# Fabric CLI and Deployment Pipelines
This notebook provides an example of how to set up a deployment pipeline using the Fabric CLI with Python.
The workflow is based on the FMD Framework (https://github.com/edkreuk/FMD_FRAMEWORK), which offers more examples and detailed guidance on deploying your own data estate with the CLI.


In [None]:
# Also make sure the notebook uses Python and not PySpark
%pip install ms-fabric-cli --quiet

In [None]:
import msal 
import os
import subprocess
import json
from time import sleep, time
import zipfile
import requests
from io import BytesIO
import re

# Parameters

In [None]:
# Do not change the parameters below; some logic/code depends on them. Or change them and also adjust the code so that it works again :)
resource_dir = "./builtin/src"
# The suffixes must start with "(": create_deployment_pipeline strips everything from "(" onwards
# so that the dev and prod workspace of the same kind end up in the same deployment pipeline.
dev_suffix = " (D)"
prod_suffix = " (P)"
empty_guid = "00000000-0000-0000-0000-000000000000"
mapping_table = []
tasks = []

# These are service principal params, no need to use them if you authenticate with a user
SERVICE_PRINCIPAL_LOGIN = False                                                                 # Set to True to authenticate as the SP below; False uses your own user account
CLIENT_SECRET = "0000~000000000000000_andprobalysometextidk"                                    # Fill in the secret value for your SP (a plain string; consider a secret store instead of hard-coding it)
CLIENT_ID = "00000000-0000-0000-0000-00000a0a0a00"                                              # The service principal (app registration) which can be found in Entra
TENANT_ID = "00ab00a0-00a0-0000-aaaa-0a0a0a0a0a0a"                                              # The tenant id (can also be found in Entra)

# Change these as you see fit
# !!!!!!!!!!! A capacity name is required if you want to actually deploy stuff. Use the free trial capacity, if you still can :) !!!!!!!!!!!!!!!!
CAPACITY_NAME = "Trial-20250602T071858Z-0a0a0a0a0a0a0a0a0a0a0a"                                # This should be the actual capacity name which you use, otherwise the code won't work
FRAMEWORK_NAME = 'DPF'                                                                         # max 6 characters for better visibility, no spaces or weird !@#$^%^ stuff
DEFAULT_ITEM_DESCRIPTION = "This item is generated by the Deployment Pipeline Fabric notebook."            # Fill in whatever you like; everyone with access will see this ;)
DEFAULT_WORKSPACE_DESCRIPTION = "The items are created by the Deployment Pipeline Fabric notebook."        # Fill in whatever you like; everyone with access will see this ;)
ENVIRONMENT_NAME = "development"                                                                # Items will only be deployed to this environment


workspace_roles = [ # Keep empty [] if you only want the current user (no extra role assignments)
                    {
                        "principal": {
                            "id": '00000000-0000-0000-0000-000000000000',
                            "displayName": "Keep it safe", # Name of the group or user to assign the role to
                            "type": "Group"
                        },
                        "role": "admin"  #(choose from 'admin', 'member', 'contributor', 'viewer')
                    },
                    {
                        "principal": {
                            "id": '00000000-0000-0000-0000-000000000000',
                            "displayName": "ThisIs MyName", # Name of the group or user to assign the role to
                            "type": "User"
                        },
                        "role": "admin"  #(choose from 'admin', 'member', 'contributor', 'viewer')
                    },
                ]

# "{workspace_name}" in displayName is replaced by the workspace name without its environment suffix.
template_deployment_pipeline = {
    "displayName": "{workspace_name} Deployment Pipeline",
    "description": "This deployment pipeline was created by Deployment Pipeline Fabric script.",
    "stages": [
        {
            "displayName": "development", # If you change this, also change the environment name such that they match
            "description": "development stage",
            "isPublic": False
        },
        {
            "displayName": "production",
            "description": "production stage",
            "isPublic": True
        }
    ]
}

# One entry per environment; each environment_name must match a stage displayName above,
# because workspaces are assigned to the pipeline stage with the same name.
environments = [
    {
        "environment_name": ENVIRONMENT_NAME,
        "workspaces": {
            "code": {
                "name": FRAMEWORK_NAME + " Code" + dev_suffix, 
                "roles": workspace_roles,
                "capacity_name": CAPACITY_NAME,
                "workspace_id": empty_guid,
                "deployment_pipeline": True # Set both to True or False, otherwise the different stages won't get added to the deployment pipeline.
            },
            "data": {
                "name": FRAMEWORK_NAME + " Data" + dev_suffix, 
                "roles": workspace_roles,
                "capacity_name": "none",
                "workspace_id": empty_guid,
                "deployment_pipeline": False
            },
        },
    },
    {
        "environment_name": "production",
        "workspaces": {
            "code": {
                "name": FRAMEWORK_NAME + " Code" + prod_suffix,
                "roles": workspace_roles,
                "capacity_name": CAPACITY_NAME,
                "workspace_id": empty_guid,
                "deployment_pipeline": True
            },
            "data": {
                "name": FRAMEWORK_NAME + " Data" + prod_suffix,
                "roles": workspace_roles,
                "capacity_name": "none",
                "workspace_id": empty_guid,
                "deployment_pipeline": False
            },
        },
    },
]

# CLI Login

In [None]:
# Authenticate the Fabric CLI. With SERVICE_PRINCIPAL_LOGIN = True a token is requested for the
# service principal configured in the parameters; otherwise the current notebook user's token is used.
# The tokens are exported as FAB_TOKEN* environment variables so the `fab` subprocesses can use them.
if SERVICE_PRINCIPAL_LOGIN:
    def get_fabric_service_principal_token(client_id, client_secret, tenant_id):
        """
        Acquire an app-only access token for the Power BI / Fabric API scope via MSAL.

        Returns the access token string, or None when MSAL returns no token
        (the error code and description are printed in that case).
        """
        app = msal.ConfidentialClientApplication(
            client_id=client_id,
            client_credential=client_secret,
            authority=f"https://login.microsoftonline.com/{tenant_id}"
        )
        fabric_scope = "https://analysis.windows.net/powerbi/api/.default"
        result = app.acquire_token_for_client(scopes=[fabric_scope])
        if "access_token" in result:
            return result["access_token"]
        print(f"Error: {result.get('error')}")
        print(f"Description: {result.get('error_description')}")
        return None


    fabric_token = get_fabric_service_principal_token(CLIENT_ID, CLIENT_SECRET, TENANT_ID)

    if not fabric_token:
        print("Failed to obtain token")
        # Stop here: assigning None to os.environ below would only fail with an opaque TypeError.
        raise RuntimeError(
            "Failed to obtain a Fabric token for the service principal; "
            "check CLIENT_ID, CLIENT_SECRET and TENANT_ID."
        )
    print("Successfully obtained Fabric token")

    # Set environment so subprocess can access the token
    os.environ['FAB_TOKEN'] = fabric_token
    os.environ['FAB_TOKEN_ONELAKE'] = fabric_token
    os.environ['FAB_TOKEN_AZURE'] = fabric_token

# Otherwise just use your current user
else: 
    # Set environment parameters for Fabric CLI
    token = notebookutils.credentials.getToken('pbi')
    os.environ['FAB_TOKEN'] = token
    os.environ['FAB_TOKEN_ONELAKE'] = token

# File download
#### We use the original FMD repository as a source to download files that can be imported into a workspace and later deployed through the pipeline.

In [None]:
##### DO NOT CHANGE UNLESS SPECIFIED OTHERWISE ####
# GitHub repository whose "src" folder provides the item definitions to import.
repo_owner, repo_name = "edkreuk", "FMD_FRAMEWORK"
branch = "main"          # branch to download; "main" is the default
folder_prefix = ""       # optional folder in the repository that contains "src" ("" = repository root)
###################################################

In [None]:
def download_folder_as_zip(repo_owner, repo_name, output_zip, branch="main", folder_to_extract="src",  remove_folder_prefix = ""):
    """
    Download a GitHub repository archive and repack one of its folders into a local zip file.

    Args:
        repo_owner (str): GitHub user or organisation owning the repository.
        repo_name (str): Repository name.
        output_zip (str): Path of the zip file to write (its directory is created if needed).
        branch (str): Branch (or other ref) to download.
        folder_to_extract (str): Folder, relative to the repository root, whose entries are kept.
        remove_folder_prefix (str): Optional path component removed from each kept entry's path.

    Notes:
        Entries are written with the archive's top-level directory stripped, so "src/x" in the
        repository becomes "src/x" in ``output_zip``. At most 21 entries are written, because
        notebook storage is limited.

    Raises:
        requests.HTTPError: when the GitHub download fails.
    """
    # Construct the URL for the GitHub API to download the repository as a zip file
    url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/zipball/{branch}"
    response = requests.get(url)
    response.raise_for_status()

    # Normalise to "/<folder>" without a trailing slash; startswith() also copes with "".
    if not folder_to_extract.startswith("/"):
        folder_to_extract = f"/{folder_to_extract}"
    folder_to_extract = folder_to_extract.rstrip("/")

    # Ensure the directory for the output zip file exists ("" means the current directory).
    output_dir = os.path.dirname(output_zip)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    with zipfile.ZipFile(BytesIO(response.content)) as zipf:
        with zipfile.ZipFile(output_zip, 'w') as output_zipf:
            file_count = 0
            for file_info in zipf.infolist():
                if file_count > 20:  # Notebook storage cannot exceed a certain limit
                    break

                # GitHub archives wrap everything in one "<owner>-<repo>-<sha>/" directory;
                # replacing it with "/" yields the path relative to the repository root.
                repo_path = re.sub(r'^.*?/', '/', file_info.filename)
                # Match the folder itself or anything below it, but not siblings such as "/srcfoo".
                if not (repo_path == folder_to_extract or repo_path.startswith(folder_to_extract + "/")):
                    continue

                parts = file_info.filename.split('/')
                # Use the parameter (not a notebook-level global) and tolerate its absence.
                if remove_folder_prefix and remove_folder_prefix in parts:
                    parts.remove(remove_folder_prefix)
                output_zipf.writestr('/'.join(parts[1:]), zipf.read(file_info.filename))
                file_count += 1

def uncompress_zip_to_folder(zip_path, extract_to):
    """
    Extract every member of ``zip_path`` into ``extract_to``, then delete the archive.

    ``extract_to`` (and any missing parents) is created first if it does not exist.
    """
    os.makedirs(extract_to, exist_ok=True)

    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(path=extract_to)

    # The archive is only a transport format; remove it once its contents are on disk.
    os.remove(zip_path)

# Fetch the repository's "src" folder as a zip, then unpack it into ./builtin so the item
# definitions end up under ./builtin/src.
download_folder_as_zip(
    repo_owner,
    repo_name,
    output_zip="./builtin/src/src.zip",
    branch=branch,
    folder_to_extract=f"{folder_prefix}/src",
    remove_folder_prefix=f"{folder_prefix}",
)
uncompress_zip_to_folder(
    zip_path="./builtin/src/src.zip",
    extract_to="./builtin",
)
 

# CLI Functions

In [None]:
# -------------------------------
# FABRIC CLI Utilities
# -------------------------------

def run_fab_command(command, capture_output=False, silently_continue=False, raw_output=False):
    """
    Run one Fabric CLI command as ``fab -c <command>``.

    Args:
        command (str): Command line handed to ``fab -c``, e.g. ``'api -X get workspaces/'``.
        capture_output (bool): Capture stdout/stderr instead of letting them print.
        silently_continue (bool): When False, raise if the command exits non-zero or
            writes anything to stderr.
        raw_output (bool): With ``capture_output``, return the whole CompletedProcess
            instead of the stripped stdout.

    Returns:
        The stripped stdout, the CompletedProcess (``raw_output=True``), or None when
        ``capture_output`` is False.

    Raises:
        Exception: when the command fails and ``silently_continue`` is False.
    """
    result = subprocess.run(["fab", "-c", command], capture_output=capture_output, text=True)
    # "!= 0" rather than "> 0": on POSIX a process killed by a signal has a negative return code.
    if not silently_continue and (result.returncode != 0 or result.stderr):
        raise Exception(
            f"Error running fab command '{command}'. exit_code: '{result.returncode}'; stderr: '{result.stderr}'"
        )
    if capture_output:
        return result if raw_output else result.stdout.strip()
    return None


# -------------------------------
# Workspace Management
# -------------------------------

def get_workspace_id_by_name(workspace_name):
    """
    Look up a workspace ID by display name.

    Matching ignores case and surrounding whitespace. Returns None when no
    workspace in the ``api -X get workspaces/`` response matches.
    """
    raw_response = run_fab_command("api -X get workspaces/", capture_output=True, silently_continue=True)
    candidates = json.loads(raw_response)["text"]["value"]
    wanted = workspace_name.strip().lower()
    for candidate in candidates:
        if candidate['displayName'].strip().lower() == wanted:
            return candidate['id']
    return None


def ensure_workspace_exists(workspace):
    """
    Make sure the workspace named ``workspace['name']`` exists.

    An existing workspace is (re)assigned to ``workspace['capacity_name']``; a missing one is
    created on that capacity.

    Returns:
        tuple: ``(workspace_id, "exists")`` or ``(workspace_id, "created")``.

    Raises:
        RuntimeError: when the workspace still cannot be found after creating it.
    """
    name = workspace['name']
    existing_id = get_workspace_id_by_name(name)

    if not existing_id:
        print(f" - Workspace '{name}' not found. Creating new workspace...")
        run_fab_command(f'mkdir "{name}.workspace" -P capacityName="{workspace["capacity_name"]}"', silently_continue=True)
        created_id = get_workspace_id_by_name(name)
        if not created_id:
            raise RuntimeError(f"Workspace '{name}' could not be created or found.")
        print(f" - Created workspace '{name}'. ID: {created_id}")
        return created_id, "created"

    print(f" - Workspace '{name}' found. Workspace ID: {existing_id}- assign capacity: {workspace['capacity_name']}")
    run_fab_command(f'assign ".capacities/{workspace["capacity_name"]}.Capacity" -W "{name}.Workspace" -f', silently_continue=True)
    return existing_id, "exists"


# -------------------------------
# Item Utilities
# -------------------------------

def fab_get_id(workspace_name, name):
    """
    Retrieve the ID of an item in a workspace.

    Args:
        workspace_name (str): Workspace display name (may contain spaces, e.g. "DPF Code (D)").
        name (str): Item path inside the workspace; Fabric CLI paths normally include the
            type suffix, e.g. "NB_Load.Notebook".

    Returns:
        The stripped stdout of ``fab get ... -q id``; failures are not raised.
    """
    # Quote the path like the other commands in this notebook: workspace names contain spaces.
    return run_fab_command(f'get "/{workspace_name}.Workspace/{name}" -q id', capture_output=True, silently_continue=True)


def fab_get_display_name(workspace_name, name):
    """
    Retrieve the display name of an item in a workspace.

    Args:
        workspace_name (str): Workspace display name (may contain spaces).
        name (str): Item path inside the workspace; Fabric CLI paths normally include the
            type suffix, e.g. "NB_Load.Notebook".

    Returns:
        The stripped stdout of ``fab get ... -q displayName``; failures are not raised.
    """
    # Quote the path: workspace names such as "DPF Code (D)" contain spaces.
    return run_fab_command(f'get "/{workspace_name}.Workspace/{name}" -q displayName', capture_output=True, silently_continue=True)


def fab_get_items(workspace_id, item_id=''):
    """
    Query workspace items through the Fabric REST API.

    Without ``item_id`` the workspace's item list is fetched (GET .../items/); with an
    ``item_id`` that item's definition is requested (POST .../getDefinition).
    Returns the stripped stdout of the CLI call.
    """
    if not item_id:
        return run_fab_command(f"api -X get workspaces/{workspace_id}/items/{item_id}", capture_output=True, silently_continue=True)
    return run_fab_command(f"api -X post workspaces/{workspace_id}/items/{item_id}/getDefinition", capture_output=True, silently_continue=True)


# -------------------------------
# Description and Identity Assignment
# -------------------------------

def assign_workspace_description(workspace, description=None):
    """
    Set a description on the workspace via ``fab set``.

    Args:
        workspace (dict): Workspace configuration; only ``workspace['name']`` is used.
        description (str | None): Text to set. Defaults to the standard FMD notice. It is
            wrapped in double quotes on the command line, so it must not contain any.
    """
    workspace_name = workspace['name']
    if description is None:
        payload = 'Important: The items in this workspace are automatically generated by the FMD Framework. Each time the setup notebook is executed, all changes will be overwritten. For more information, please visit https://github.com/edkreuk/FMD_FRAMEWORK.'
    else:
        payload = description
    # Close the quote around the path before the flags, and quote the multi-word payload.
    run_fab_command(f'set "/{workspace_name}.workspace" -q description -i "{payload}" -f', silently_continue=True)
    print(f" - Description applied to '{workspace_name}'")


def assign_item_description(workspace, item, description=None):
    """
    Set a description on an item in the workspace via ``fab set``.

    Args:
        workspace (dict): Workspace configuration; only ``workspace['name']`` is used.
        item (str): Item path inside the workspace; Fabric CLI paths normally include the
            type suffix, e.g. "NB_Load.Notebook".
        description (str | None): Text to set. Defaults to the standard FMD notice. It is
            wrapped in double quotes on the command line, so it must not contain any.
    """
    workspace_name = workspace['name']
    if description is None:
        payload = 'Note: This item was initially generated by the FMD Framework. Any modifications may introduce breaking changes. For further details, please refer to the documentation at https://github.com/edkreuk/FMD_FRAMEWORK.'
    else:
        payload = description
    # Close the quote around the item path before the flags, and quote the multi-word payload.
    run_fab_command(f'set "/{workspace_name}.workspace/{item}" -q description -i "{payload}" -f', silently_continue=True)
    print(f" - Description applied to {item} in '{workspace_name}'")


def assign_roles(workspace):
    """
    Grant every role listed in ``workspace['roles']`` on the workspace via ``fab acl set``.

    Each entry needs ``role`` and ``principal`` (with ``id`` and ``displayName``). Failures
    for one entry are printed and the remaining entries are still processed.
    """
    acl_target = f"/{workspace['name']}.workspace"
    print(" - Assigning Workspace roles")
    for assignment in workspace['roles']:
        try:
            print(f"Assigning role '{assignment['role']}' to '{assignment['principal']['displayName']}' in workspace '{workspace['name']}'")
            acl_command = f'acl set "{acl_target}" -I {assignment["principal"]["id"]} -R {assignment["role"]} -f'
            run_fab_command(acl_command, silently_continue=True)
        except Exception as err:
            print(f" - Failed to assign role: {err}")


# -------------------------------
# Deployment functions
# -------------------------------

def deploy_workspaces(workspace, environment_name, old_id, mapping_table, tasks):
    """
    Create or update one workspace and record the outcome.

    The workspace is ensured to exist, its ID is stored in ``workspace["id"]``, roles and the
    standard description are applied, two ID-mapping rows are appended (one for ``old_id``
    and one for the all-zero GUID), and a timing entry is appended to ``tasks``.

    Parameters:
    - workspace (dict): Workspace configuration including name and capacity.
    - environment_name (str): Target environment name.
    - old_id (str): Previous workspace ID to be replaced.
    - mapping_table (list): List to store ID mappings.
    - tasks (list): List to store task execution logs.
    """
    started_at = time()
    print("\n#############################################")
    print(f" - Processing: workspace {workspace['name']}")

    workspace_id, _ = ensure_workspace_exists(workspace)
    workspace["id"] = workspace_id

    print("--------------------------")
    print(f"Updating Mapping Table: {environment_name}")
    for source_id in (old_id, "00000000-0000-0000-0000-000000000000"):
        mapping_table.append({
            "Description": workspace['name'],
            "environment": environment_name,
            "old_id": source_id,
            "new_id": workspace_id
        })

    assign_roles(workspace)
    assign_workspace_description(workspace)

    elapsed_seconds = int(time() - started_at)
    tasks.append({
        "task_name": f"Update workspace {workspace['name']}",
        "task_duration": elapsed_seconds,
        "status": "success"
    })


def deploy_folder_to_workspace(workspace: dict, folder_path: str, environment_name: str,
                               mapping_table: list, lakehouse_schema_enabled: bool) -> None:
    """
    Deploy every entry found directly inside ``folder_path`` into the workspace.

    Entries are handed to ``deploy_file_to_workspace`` in ``os.listdir`` order. A missing
    folder is reported and nothing is deployed.

    Args:
        workspace (dict): Workspace configuration.
        folder_path (str): Path to the folder containing items to deploy.
        environment_name (str): Target environment.
        mapping_table (list): Deployment mapping table.
        lakehouse_schema_enabled (bool): Whether to enable lakehouse schema creation.
    """
    if os.path.isdir(folder_path):
        for entry in os.listdir(folder_path):
            deploy_file_to_workspace(
                workspace=workspace,
                file_path=os.path.join(folder_path, entry),
                environment_name=environment_name,
                mapping_table=mapping_table,
                lakehouse_schema_enabled=lakehouse_schema_enabled,
            )
        return

    print(f"Folder not found: {folder_path}")

def deploy_file_to_workspace(workspace: dict, file_path: str, environment_name: str,
                             mapping_table: list, lakehouse_schema_enabled: bool) -> None:
    """
    Deploy a single item definition into the given workspace and record its new ID.

    The item type is taken from the extension of ``file_path`` (e.g. ``NB_Load.Notebook``
    gives name ``NB_Load`` and type ``Notebook``). Notebooks, DataPipelines and
    VariableLibraries are imported from ``file_path``; Lakehouses are created empty.
    Unknown types are skipped and nothing is appended to ``mapping_table``.

    Args:
        workspace (dict): Workspace configuration; ``workspace["name"]`` is the target.
        file_path (str): Path of the item definition to deploy.
        environment_name (str): Environment recorded in the mapping row.
        mapping_table (list): Receives one mapping row per deployed item.
        lakehouse_schema_enabled (bool): Create Lakehouses with schemas enabled.
    """
    workspace_name = workspace["name"]
    file_name = os.path.basename(file_path)
    item_name, item_extension = os.path.splitext(file_name)

    print("\n#############################################")
    print(f"Deploying in {workspace_name}: {item_name}")

    # Paths are quoted because workspace names contain spaces, and item paths carry the
    # type suffix so that the import, description and ID lookup all address the same item.
    workspace_path = f"/{workspace_name}.Workspace"

    if "Notebook" in item_extension:
        item_path = f"{item_name}.Notebook"
        result = run_fab_command(
            f'import "{workspace_path}/{item_path}" -i "{file_path}" -f --format .py',
            capture_output=True, silently_continue=True, raw_output=False
        )
        assign_item_description(workspace, item_path)

    elif "Lakehouse" in item_extension:
        item_path = f"{item_name}.Lakehouse"
        # Only pass -P when there is a property to set; a bare "-P" carries no value.
        param = " -P enableschemas=true" if lakehouse_schema_enabled else ""
        result = run_fab_command(
            f'create "{workspace_path}/{item_path}"{param}',
            capture_output=True, silently_continue=True, raw_output=False
        )
        assign_item_description(workspace, item_path)

    elif "DataPipeline" in item_extension:
        item_path = f"{item_name}.DataPipeline"
        result = run_fab_command(
            f'import "{workspace_path}/{item_path}" -i "{file_path}" -f',
            capture_output=True, silently_continue=True, raw_output=False
        )
        assign_item_description(workspace, item_path)

    elif "VariableLibrary" in item_extension:
        print(f"Creating {workspace_name}: {item_name}")
        item_path = f"{item_name}.VariableLibrary"
        result = run_fab_command(
            f'import "{workspace_path}/{item_path}" -i "{file_path}" -f',
            capture_output=True, silently_continue=True, raw_output=False
        )

    else:
        print(f" Skipping {item_name} (unknown type)")
        return

    new_id = fab_get_id(workspace_name, item_path)
    print(result)

    mapping_table.append({
        "Description": item_name,
        "environment": environment_name,
        "old_id": "N/A",
        "new_id": new_id,
    })

In [None]:
def create_deployment_pipeline(workspace, environment_name, deployment_pipeline_template):
    """
    Ensure a deployment pipeline exists for the workspace and attach the workspace to a stage.

    Does nothing unless ``workspace["deployment_pipeline"]`` is truthy. The pipeline is looked
    up (or created from ``deployment_pipeline_template``) by the workspace name with its
    environment suffix removed, so "DPF Code (D)" and "DPF Code (P)" share one pipeline.
    The workspace is assigned to the stage named ``environment_name``, the workspace's roles
    are granted on the pipeline, and a stage-to-stage deployment is attempted.

    Raises:
        RuntimeError: when a new pipeline is created but the response carries no pipeline ID.
        ValueError: propagated when no stage matches ``environment_name``.
    """
    if not workspace["deployment_pipeline"]:
        return

    workspace_id, _status = ensure_workspace_exists(workspace)
    # Strip everything from the first "(" (the environment suffix) so that the same
    # workspaces of each environment get assigned to the same deployment pipeline.
    base_name = re.sub(r'\s*\(.*$', "", workspace["name"])

    print("\n#############################################")
    print(f" - Creating deployment pipeline: {base_name}")

    pipeline_id = get_existing_pipeline(base_name)

    if pipeline_id:
        print(f" - Pipeline already exists: {base_name} (ID: {pipeline_id})")
    else:
        print(f" - Creating new pipeline: {base_name}")
        template = deployment_pipeline_template.copy()
        # Use the template that was passed in, not the module-level default template.
        template["displayName"] = deployment_pipeline_template["displayName"].replace("{workspace_name}", base_name)

        payload = json.dumps(template)
        response = run_fab_command(f"api -X post deploymentPipelines  -i {payload}", capture_output=True, silently_continue=True)
        response_text = json.loads(response).get("text")
        pipeline_id = response_text.get("id") if isinstance(response_text, dict) else None
        if not pipeline_id:
            raise RuntimeError(f"Creating deployment pipeline '{template['displayName']}' failed: {response}")

    assign_user_to_deployment_pipeline(workspace, pipeline_id)
    assign_workspace_to_pipeline(pipeline_id, workspace_id, stage_name=environment_name)
    deploy_pipeline_artifacts(pipeline_id)
    print("---------------------------------------------\n")


def get_existing_pipeline(workspace_name: str) -> str | None:
    """
    Return the ID of the first deployment pipeline whose display name contains ``workspace_name``.

    Any error while listing or parsing pipelines is printed and treated as "not found" (None).
    """
    try:
        raw_response = run_fab_command("api deploymentPipelines", capture_output=True, silently_continue=True)
        candidates = json.loads(raw_response).get("text", {}).get("value", [])
        found = next(
            (candidate["id"] for candidate in candidates if workspace_name in candidate.get("displayName", "")),
            None,
        )
    except Exception as exc:
        print(exc)
        return None
    return found


def assign_user_to_deployment_pipeline(workspace, pipeline_id):
    """
    Add each entry of ``workspace['roles']`` as a role assignment on the deployment pipeline.

    The role entry is posted as-is as the request body. A response whose ``status_code`` is not
    200 has its message printed; exceptions for one entry are printed and the loop continues.
    """
    for assignment in workspace['roles']:
        try:
            print(f" - Assigning role '{assignment['role']}' to '{assignment['principal']['displayName']}' to pipeline '{pipeline_id}'")
            body = json.dumps(assignment)
            response = run_fab_command(f"api -X post deploymentPipelines/{pipeline_id}/roleAssignments -i {body}", capture_output=True, silently_continue=True)
            outcome = json.loads(response)
            if outcome.get("status_code") != 200:
                print(f'    ! {outcome.get("text", {}).get("message", "")} \n')
        except Exception as exc:
            print(f"    - Failed to assign role: {exc}")


def assign_workspace_to_pipeline(pipeline_id, workspace_id, stage_name):
    """
    Assign a workspace to the pipeline stage whose display name equals ``stage_name``
    (case-insensitive). A non-200 API response is printed, not raised. Returns None.

    Raises:
        ValueError: when the pipeline has no stage with that name.
    """
    stage_id = None
    for stage in get_pipeline_stages(pipeline_id):
        if stage.get("displayName", "").lower() == stage_name.lower():
            stage_id = stage["id"]
            break
    if not stage_id:
        raise ValueError(f"Stage '{stage_name}' not found in pipeline {pipeline_id}")

    body = json.dumps({"workspaceId": workspace_id})
    response = run_fab_command(f"api -X post deploymentPipelines/{pipeline_id}/stages/{stage_id}/assignWorkspace -i {body}", capture_output=True, silently_continue=True)
    outcome = json.loads(response)
    if outcome.get("status_code") != 200:
        print(f' ! Assigning workspace error: {outcome.get("text", {}).get("message", "")}')
        return
    print(f" - Assigned workspace (ID: {workspace_id}) to the pipeline stage (ID: {stage_id})")


def get_pipeline_stages(pipeline_id):
    """Return the stage list of a deployment pipeline ([] when the response has no stages)."""
    parsed = json.loads(
        run_fab_command(f"api deploymentPipelines/{pipeline_id}/stages", capture_output=True, silently_continue=True)
    )
    return parsed.get("text", {}).get("value", [])


def pipeline_is_running(pipeline_id):
    """
    Return the ``status`` of the pipeline's most recently updated operation.

    Despite the name this returns a status string (or None when there are no operations),
    not a boolean; callers compare it against values such as "Running".
    """
    raw_response = run_fab_command(f"api deploymentPipelines/{pipeline_id}/operations", capture_output=True, silently_continue=True)
    operations = json.loads(raw_response).get("text", {}).get("value", [])
    if not operations:
        return None
    newest = max(operations, key=lambda operation: operation.get("lastUpdatedTime", ""))
    return newest.get("status")


def deploy_pipeline_artifacts(pipeline_id):
    """
    Deploy content from each pipeline stage to the next one (e.g. development -> production).

    Nothing is deployed while any stage lacks an assigned workspace, or while the latest
    pipeline operation is "Running" or "NotStarted". Deploy requests are fired without
    waiting for them to complete.
    """
    pipeline_stages = get_pipeline_stages(pipeline_id)
    # Check if any stage is missing a workspaceId
    if any("workspaceId" not in stage for stage in pipeline_stages):
        print(" - Not all stages have a workspace assigned to them, continuing deployment...")
        return

    status = pipeline_is_running(pipeline_id)
    if status in ("Running", "NotStarted"):
        print(f" - Pipeline {pipeline_id} is currently {status}. Skipping deployment.")
        return

    # Walk the stages in their pipeline order rather than relying on the API's list order.
    stages_to_deploy = sorted(
        ({"id": s["id"], "name": s["displayName"], "order": s["order"]} for s in pipeline_stages),
        key=lambda stage: stage["order"],
    )
    for source, target in zip(stages_to_deploy, stages_to_deploy[1:]):
        payload = {
            "sourceStageId": source["id"],
            "targetStageId": target["id"],
            "note": "Deployed via DoD platform"
        }
        payload_str = json.dumps(payload)

        print(f" - Deploying all artifacts from {source['name']} to {target['name']}")
        run_fab_command(f"api -X post deploymentPipelines/{pipeline_id}/deploy -i {payload_str}", capture_output=True, silently_continue=True)

# Deployment of workspaces, items and deployment pipeline

In [None]:
# Create or update the code and data workspace of every environment.
for environment in environments:
    print("--------------------------")
    print(f"Updating Workspace: {environment['environment_name']}")
    for workspace_key in ("code", "data"):
        deploy_workspaces(
            workspace=environment['workspaces'][workspace_key],
            environment_name=environment['environment_name'],
            old_id=empty_guid,
            mapping_table=mapping_table,
            tasks=tasks,
        )

In [None]:
# Import the item definitions into the code workspace of ENVIRONMENT_NAME only; the deployment
# pipeline promotes them to the other environments later.
for environment in environments:
    # Compare case-insensitively on both sides so e.g. "Development" still matches.
    if environment["environment_name"].lower() != ENVIRONMENT_NAME.lower():
        continue

    print("\n--------------------------")
    print(f"Processing environment: {environment['environment_name']}")

    workspace = environment["workspaces"]["code"]
    deploy_folder_to_workspace(
        workspace=workspace,
        folder_path=resource_dir,
        environment_name=environment["environment_name"],
        mapping_table=mapping_table,
        lakehouse_schema_enabled=False,
    )

In [None]:
# Attach every workspace flagged with "deployment_pipeline" to its pipeline stage.
for environment in environments:
    for workspace_key in ("code", "data"):
        create_deployment_pipeline(
            workspace=environment['workspaces'][workspace_key],
            environment_name=environment['environment_name'],
            deployment_pipeline_template=template_deployment_pipeline,
        )