# This notebook automates the deployment of Microsoft Fabric data pipelines into another Fabric tenant
----------------------------------------------------------------------------------------------------------------
- **FabricDataPipelineAutoDeployment**
- **Version V1.0**

- **PREREQUISITES**
- Ensure that the **JSON files & YAML files** are stored in the default lakehouse's `Files` folder.
- Ensure the YAML configuration values are updated.
----------------------------------------------------------------------------------------------------------------


# Install jsonpath_ng library
- The `jsonpath_ng` library queries JSON-like data structures using JSONPath expressions; it is used below to locate `linkedService` definitions.

In [None]:
%pip install jsonpath_ng

# Import statements for necessary libraries:

- `requests`: Used for making HTTP requests.
- `json`: Provides functions for encoding and decoding JSON data.
- `base64`: Used for encoding and decoding data in Base64 format.
- `sempy` and its `fabric` module: used to look up the current Fabric workspace.
- `yaml` (from the PyYAML package): used to parse the YAML configuration file.

In [None]:
# import libraries
import requests
import json
import base64
import sempy
import sempy.fabric as fabric
import yaml

# Get the current workspace ID and an access token for the Fabric REST API

In [None]:
# Resource URI the access token is requested for (the Fabric REST API).
fabric_api_resource = "https://api.fabric.microsoft.com/"

# Workspace this notebook runs in; the deployment cell creates the pipelines here.
workspace_id = fabric.get_workspace_id()
# Bearer token sent in the Authorization header of the deployment requests.
access_token = mssparkutils.credentials.getToken(fabric_api_resource)

# Read and parse the configuration file

In [None]:
base_folder_path = "/lakehouse/default/Files/"  # modify based on your environment e.g. Files/ in Fabric
config_file_name = "pipeline_deployment_config.yml"
config_file_path = base_folder_path + config_file_name
config_data = None
externalreference_id_map = {}  # Maps oldconnectionid -> newconnectionid for external connection references

# Defaults so later cells see empty collections (instead of a NameError) when the
# configuration file is missing or invalid.
pipeline_config_files = []
original_pipeline_ids = {}  # pipeline display name -> pipeline id in the source tenant

# Destination data warehouse name, id & endpoint (filled from the configuration file)
dest_dw_name = ""
dest_dw_object_id = ""
dest_dw_endpoint = ""

# Destination lakehouse name, id & endpoint (filled from the configuration file)
dest_lh_name = ""
dest_lh_object_id = ""
dest_lh_endpoint = ""

# Read the config file
try:
    with open(config_file_path, 'r', encoding='utf-8') as file:
        config_data = yaml.safe_load(file)
except yaml.YAMLError as exc:
    print(f"Error reading YAML file: {exc}")
except FileNotFoundError as fnf_error:
    print(f"Config file not found: {fnf_error}")

# Process the config data if available.
# `or []` below also covers keys that are present in the YAML with an empty (null) value.
if isinstance(config_data, dict):
    # Pipeline definitions to deploy, in deployment order
    pipeline_config_files = config_data.get("pipelinejsonfiles") or []

    # Original (source tenant) pipeline ids, keyed by display name
    for item in config_data.get('originalpipelinereferenceids') or []:
        original_pipeline_ids[item.get('pipelinedisplayname')] = item.get('originalpipelineid')

    # Destination data warehouse details (if several entries are listed, the last one wins)
    for item in config_data.get("DestinationDatawarehousedetails") or []:
        dest_dw_name = item.get("name")
        dest_dw_object_id = item.get("objectId")
        dest_dw_endpoint = item.get("endpoint")

    # Destination lakehouse details (if several entries are listed, the last one wins)
    for item in config_data.get("DestinationLakehousedetails") or []:
        dest_lh_name = item.get("name")
        dest_lh_object_id = item.get("objectId")
        dest_lh_endpoint = item.get("endpoint")

    # Collect connection id remappings from every pipeline's external references
    for pipeline in pipeline_config_files:
        for ref in pipeline.get('externalreferences') or []:
            old_id = ref.get('oldconnectionid')
            new_id = ref.get('newconnectionid')
            if old_id and new_id:  # Only map when both ids are present
                externalreference_id_map[old_id] = new_id
else:
    print("No valid configuration data found.")

# Function to encode input string to base64

In [None]:
def b64EncodeString(json_str):
    """Return *json_str* encoded as Base64 text.

    The string is UTF-8 encoded to bytes, Base64 encoded, and the resulting
    ASCII bytes are decoded back into a ``str``.
    """
    return base64.b64encode(json_str.encode('utf-8')).decode('utf-8')

# Read JSON file from Lakehouse location

In [None]:
def readJsonFile(file_path):
    """Load a pipeline JSON file and drop its top-level 'name' and 'objectId'.

    Parameters:
        file_path: path of the pipeline JSON file (read as UTF-8).

    Returns:
        The parsed JSON with the top-level 'name' and 'objectId' keys removed
        (when it is an object), or None after printing a message when the file
        is missing or cannot be decoded as JSON.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            json_data = json.load(file)
    except FileNotFoundError:
        print(f"File '{file_path}' not found.")
        return None
    except (json.JSONDecodeError, UnicodeDecodeError):
        print(f"Error decoding JSON in file '{file_path}'.")
        return None

    if isinstance(json_data, dict):
        # These keys identify the item exported from the source; they are not sent to the destination.
        json_data.pop('name', None)
        json_data.pop('objectId', None)
    return json_data

# Read pipeline JSON and replace embedded (invoked) pipeline IDs

In [None]:
def readandReplaceEmbeddedPipelineIDsJson(filepath,invoked_pipeline_list,successfully_created_pipelines):
    """Load a pipeline JSON file and repoint its invoked-pipeline ids to the newly created pipelines.

    For each display name in *invoked_pipeline_list*, every JSON value equal to
    that pipeline's original id (looked up in the module-level
    ``original_pipeline_ids`` built from the YAML config) is replaced with the id
    recorded for it in *successfully_created_pipelines*.

    Parameters:
        filepath: path of the pipeline JSON file (read as UTF-8).
        invoked_pipeline_list: display names of the pipelines this one invokes (None is treated as empty).
        successfully_created_pipelines: display name -> id of pipelines created in this run.

    Returns:
        The updated JSON, or None after printing a message when the file is
        missing or cannot be decoded as JSON.
    """
    # NOTE(review): unlike readJsonFile, the top-level 'name'/'objectId' keys are
    # not removed here — confirm whether that difference is intended.
    try:
        with open(filepath, 'r', encoding='utf-8') as file:
            json_data = json.load(file)
    except FileNotFoundError:
        print(f"File '{filepath}' not found.")
        return None
    except (json.JSONDecodeError, UnicodeDecodeError):
        print(f"Error decoding JSON in file '{filepath}'.")
        return None

    for invoked_pipeline in invoked_pipeline_list or []:
        new_pipeline_id = successfully_created_pipelines.get(invoked_pipeline)
        original_pipeline_id = original_pipeline_ids.get(invoked_pipeline)
        # Without both ids a replacement would write None into the definition
        # (or, with a missing original id, overwrite every null value), so skip it.
        if original_pipeline_id is None or new_pipeline_id is None:
            print(f"Skipping invoked pipeline '{invoked_pipeline}': original id={original_pipeline_id!r}, "
                  f"new id={new_pipeline_id!r}. Check 'originalpipelinereferenceids' and the deployment order.")
            continue
        json_data = replace_property(json_data, original_pipeline_id, new_pipeline_id)
    return json_data

# Replace a property value recursively
- Helper used by `readandReplaceEmbeddedPipelineIDsJson`.

In [None]:
def replace_property(json_obj, old_value, new_value):
    """Recursively replace every scalar equal to *old_value* with *new_value*, in place.

    Walks nested dicts and lists. Matching dict values and matching list
    elements are both replaced (dicts/lists themselves are only descended into).

    Returns:
        *json_obj* itself (mutated), so the call can be chained or reassigned.
    """
    if isinstance(json_obj, dict):
        for key, value in json_obj.items():
            if isinstance(value, (dict, list)):
                replace_property(value, old_value, new_value)
            elif value == old_value:
                # Reassigning an existing key does not resize the dict, so it is safe during iteration.
                json_obj[key] = new_value
    elif isinstance(json_obj, list):
        for index, item in enumerate(json_obj):
            if isinstance(item, (dict, list)):
                replace_property(item, old_value, new_value)
            elif item == old_value:
                json_obj[index] = new_value
    return json_obj

# Replace external references connection value

In [None]:
def replace_externalreferences_connection_value(json_data, old_value, new_value):
    """Replace every occurrence of a connection id in *json_data* with a new id.

    The JSON is serialized and the id is replaced textually, so it is found in
    any string value or key, including as a substring. Both ids are JSON-escaped
    first so they match how they appear in the serialized text and the result
    stays valid JSON.

    Returns:
        A new JSON object with the replacement applied. On error, a message is
        printed and *json_data* is returned unchanged; returning None would make
        the caller overwrite the pipeline definition.
    """
    if not isinstance(old_value, str) or not old_value or not isinstance(new_value, str):
        print(f"Replace error for external references: ids must be non-empty strings "
              f"(got {old_value!r} -> {new_value!r})")
        return json_data
    try:
        json_str = json.dumps(json_data)
        old_escaped = json.dumps(old_value)[1:-1]  # strip the surrounding quotes
        new_escaped = json.dumps(new_value)[1:-1]
        return json.loads(json_str.replace(old_escaped, new_escaped))
    except (TypeError, ValueError) as e:
        print(f"Replace error for external references {e}")
        return json_data

# Replace linked service details

In [None]:
from jsonpath_ng import parse 
def replace_linked_service_details(json_data):
    """Repoint Lakehouse and DataWarehouse linked services at the destination items, in place.

    Every ``linkedService`` object anywhere in *json_data* (JSONPath
    ``$..linkedService``) is updated according to ``properties.type``:
      - 'DataWarehouse': sets name and objectId, plus typeProperties.artifactId
        and typeProperties.endpoint, from the dest_dw_* configuration values.
      - 'Lakehouse': sets name and typeProperties.artifactId from the dest_lh_*
        values, and typeProperties.workspaceId to the current workspace.
    Other types, and matches that are not objects with a 'properties' object,
    are left untouched.

    Returns:
        *json_data* (mutated in place).
    """
    # NOTE(review): dest_lh_endpoint is read from the config but not applied here —
    # confirm whether Lakehouse linked services need it.
    matches = parse('$..linkedService').find(json_data)
    if not matches:
        print("No matches found")
        return json_data

    for match in matches:
        linked_service = match.value  # same object as inside json_data, so edits apply in place
        properties = linked_service.get('properties') if isinstance(linked_service, dict) else None
        if not isinstance(properties, dict):
            continue
        service_type = properties.get('type')
        if service_type == 'DataWarehouse':
            type_properties = properties.setdefault('typeProperties', {})
            linked_service['name'] = dest_dw_name
            linked_service['objectId'] = dest_dw_object_id
            type_properties['artifactId'] = dest_dw_object_id
            type_properties['endpoint'] = dest_dw_endpoint
        elif service_type == 'Lakehouse':
            type_properties = properties.setdefault('typeProperties', {})
            linked_service['name'] = dest_lh_name
            type_properties['artifactId'] = dest_lh_object_id
            type_properties['workspaceId'] = workspace_id
    return json_data

# Deploy Fabric Data Pipelines

In [None]:
# Fabric "create item" endpoint for the current workspace
url = f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items"

# The same bearer token is used for every request made by this cell
headers = {
    'Content-Type': 'application/json',
    'Authorization': f'Bearer {access_token}'
}

# Pipeline display name -> id of the item created in this run. Pipelines that
# invoke earlier ones read the new ids from here, so deployment order matters.
successfully_created_pipelines = {}


def _is_true(flag):
    """Return True when a YAML flag means true (the string "true" in any case, or boolean true)."""
    return str(flag).strip().lower() == "true"


def _response_json(response):
    """Return the response body as a dict, or {} when it is empty or not a JSON object."""
    if not response.text:
        return {}
    try:
        body = response.json()
    except ValueError:
        return {}
    return body if isinstance(body, dict) else {}


def _retry_after_seconds(response, default=5):
    """Return the Retry-After header as whole seconds (at least 1), falling back to *default*."""
    try:
        return max(1, int(response.headers.get('Retry-After', default)))
    except (TypeError, ValueError):
        return default


def _wait_for_create_operation(response, max_wait_seconds=600):
    """Poll the long-running operation behind a 202 response; return the created item as a dict.

    Raises RuntimeError if the operation fails, cannot be polled, or does not
    finish within *max_wait_seconds*.
    """
    import time

    operation_url = response.headers.get('Location')
    if not operation_url:
        raise RuntimeError("Item creation was accepted (202) but the response has no 'Location' header to poll.")

    deadline = time.monotonic() + max_wait_seconds
    poll = response
    while True:
        time.sleep(_retry_after_seconds(poll))
        poll = requests.get(operation_url, headers=headers, timeout=60)
        status = _response_json(poll).get('status')
        if not poll.ok or status == 'Failed':
            raise RuntimeError(f"Create operation '{operation_url}' failed: {poll.status_code}: {poll.text}")
        if status == 'Succeeded':
            break
        if time.monotonic() > deadline:
            raise RuntimeError(f"Create operation '{operation_url}' did not finish within {max_wait_seconds} seconds.")

    # A succeeded operation exposes the created item at '<operation URL>/result'
    result = requests.get(f"{operation_url.rstrip('/')}/result", headers=headers, timeout=60)
    if not result.ok:
        raise RuntimeError(f"Could not read result of '{operation_url}': {result.status_code}: {result.text}")
    return _response_json(result)


for item in pipeline_config_files:
    display_name = item.get("pipelinedisplayname")
    file_path = item.get("pipelinejsonfilepath")
    # Flags may be quoted strings ("true") or YAML booleans (true); accept both.
    invoke_pipeline = _is_true(item.get("invokepipeline"))
    externalreferencesflg = _is_true(item.get("externalreferencesflg"))
    internalreferencesflg = _is_true(item.get("internalreferencesflg"))

    if invoke_pipeline:
        invoked_pipeline_list = item.get("invokedpipelinelist") or []
        json_file_value = readandReplaceEmbeddedPipelineIDsJson(file_path, invoked_pipeline_list, successfully_created_pipelines)
    else:
        invoked_pipeline_list = None
        json_file_value = readJsonFile(file_path)

    # The readers return None for a missing/invalid file; never deploy a "null" definition.
    if json_file_value is None:
        raise RuntimeError(f"Pipeline '{display_name}' creation failed: could not read '{file_path}'.")

    if internalreferencesflg:
        json_file_value = replace_linked_service_details(json_file_value)

    if externalreferencesflg:
        for old_connection_id, new_connection_id in externalreference_id_map.items():
            json_file_value = replace_externalreferences_connection_value(json_file_value, old_connection_id, new_connection_id)

    # The definition part is the pipeline JSON, Base64-encoded
    json_str = json.dumps(json_file_value)
    base64_encoded_str = b64EncodeString(json_str)

    payload = json.dumps({
        "displayName": display_name,
        "type": "DataPipeline",
        "definition": {
            "parts": [
                {
                    "path": "pipeline-content.json",
                    "payload": base64_encoded_str,
                    "payloadType": "InlineBase64"
                }
            ]
        }
    })
    response = requests.post(url, headers=headers, data=payload, timeout=120)
    if not response.ok:
        raise RuntimeError(f"Pipeline '{display_name}' creation failed: {response.status_code}: {response.text}")

    # 202 means creation continues as a long-running operation with an empty body;
    # otherwise the body is the created item.
    if response.status_code == 202:
        response_dict = _wait_for_create_operation(response)
    else:
        response_dict = _response_json(response)
    print(f">> Pipeline '{display_name}' deployed.")

    if "id" in response_dict:
        successfully_created_pipelines[response_dict.get('displayName', display_name)] = response_dict['id']