# 🚀 Fabric Solution Accelerator Deployment Notebook

This notebook orchestrates the end-to-end deployment of **Advanced Metering Infrastructure (AMI)** solution assets into the current Microsoft Fabric workspace using the `fabric-cicd` library.

## This notebook performs the following tasks:
1. **📦 Package Installation**: Install required libraries and dependencies
1. **⚙️ Parameter Configuration and Library Import**: Configure parameters and import required libraries
1. **📥 Source Code Download**: Download and extract solution content from the GitHub repository
1. **🚀 Asset Deployment**: Deploy Fabric items and map them to each other to preserve dependencies


## 📦 Package Installation

In [1]:
%pip install fabric-cicd --quiet
%pip install --upgrade azure-core azure-identity --quiet

Note: you may need to restart the kernel to use updated packages.


In [1]:
print("⚠️ Restarting Python kernel for installed packages to take effect")
notebookutils.session.restartPython()

⚠️ Restarting Python kernel for installed packages to take effect
sys.exit called with value 0. The interpreter will be restarted.
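
After the kernel restarts, it can help to confirm which package versions the session actually picked up. The following is a minimal sketch using only the standard library; the helper name `installed_version` is hypothetical and not part of the notebook or of `fabric-cicd`.

```python
from importlib import metadata
from typing import Optional


def installed_version(package_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(package_name)
    except metadata.PackageNotFoundError:
        return None


# Report the packages installed by the cells above.
for name in ("fabric-cicd", "azure-core", "azure-identity"):
    print(f"{name}: {installed_version(name) or 'not installed'}")
```

If any of the packages is reported as not installed, re-running the installation cell before continuing is probably the simplest fix.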


## ⚙️ Parameter Configuration and Library Import

Update these values to customize the deployment for your environment:

In [11]:
# Define user-configurable parameters
DEFAULT_API_ROOT_URL = "https://api.fabric.microsoft.com"  # Fabric REST API root URL; change only if your environment uses a different endpoint
DEBUG = False

In [10]:
# The following settings should not be modified by the user
# GitHub Repository Configuration
repo_owner = "slavatrofimov"
repo_name = "amisandbox"
branch = "main"
folder_to_extract = "workspace"

# Deployment Configuration
deployment_environment = "DEV"  # Options: DEV, TEST, PROD

# File System Paths
path_prefix = '.lakehouse/default/Files'

extract_to_directory = path_prefix + "/src"
repository_directory = extract_to_directory + "/" + folder_to_extract
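
As a sanity check on the settings above, the sketch below validates the environment name and shows how the path pieces combine. It is illustrative only: `ALLOWED_ENVIRONMENTS` and `resolve_repository_directory` are hypothetical names, and the allowed values are assumed from the `Options: DEV, TEST, PROD` comment.

```python
from pathlib import PurePosixPath

# Assumption: these mirror the "Options: DEV, TEST, PROD" comment in the cell above.
ALLOWED_ENVIRONMENTS = {"DEV", "TEST", "PROD"}


def resolve_repository_directory(path_prefix: str, folder_to_extract: str,
                                 deployment_environment: str) -> str:
    """Validate the environment name and return the repository directory path."""
    if deployment_environment not in ALLOWED_ENVIRONMENTS:
        raise ValueError(
            f"Unknown environment '{deployment_environment}'; "
            f"expected one of {sorted(ALLOWED_ENVIRONMENTS)}"
        )
    # Same composition as the cell above: <prefix>/src/<folder>
    return str(PurePosixPath(path_prefix) / "src" / folder_to_extract)


print(resolve_repository_directory(".lakehouse/default/Files", "workspace", "DEV"))
# .lakehouse/default/Files/src/workspace
```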

In [8]:
### Import libraries
import subprocess
import os
import json
import shutil
import re
import requests
import zipfile
from io import BytesIO
import yaml
import sempy.fabric as fabric
import base64
from pathlib import Path
from typing import Optional, Any
from datetime import datetime, timezone
from azure.core.credentials import TokenCredential, AccessToken
import fabric_cicd.constants
from fabric_cicd import FabricWorkspace, publish_all_items

## 📥 Source Code Download

Download and extract solution source files and configuration from the GitHub repository:

In [16]:
# Download a GitHub repository and extract one of its folders to a local directory

def download_and_extract_folder(repo_owner, repo_name, extract_to, branch="main", 
                               folder_to_extract="workspace", remove_folder_prefix=""):
    """
    Download a GitHub repository and extract a specific folder directly to disk
    without saving the zip file.
    
    Args:
        repo_owner: GitHub repository owner
        repo_name: GitHub repository name
        extract_to: Local directory to extract files to
        branch: Git branch to download (default: "main")
        folder_to_extract: Folder path within the repo to extract
        remove_folder_prefix: Prefix to remove from extracted file paths
    """
    try:
        # Construct the URL for the GitHub API to download the repository as a zip file
        url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/zipball/{branch}"
        
        # Make a request to the GitHub API (the timeout keeps the cell from hanging indefinitely)
        response = requests.get(url, timeout=60)
        response.raise_for_status()

        # Delete target directory if exists
        if os.path.exists(extract_to) and os.path.isdir(extract_to):
            shutil.rmtree(extract_to)
            print(f'Deleted existing directory: {extract_to}')
        
        # Ensure the extraction directory exists
        os.makedirs(extract_to, exist_ok=True)
        
        # Process the zip file directly from memory
        with zipfile.ZipFile(BytesIO(response.content)) as zipf:
            for file_info in zipf.infolist():
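                # Check if the file is in the folder we want to extract.
                # Strip the repo root folder, then match on a path boundary so that
                # a sibling such as "workspace2/" is not mistaken for "workspace/".
                normalized_path = re.sub(r'^.*?/', '/', file_info.filename)
                
                if normalized_path.startswith(f"/{folder_to_extract.strip('/')}/"):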
                    # Calculate the output path
                    parts = file_info.filename.split('/')
                    relative_path = '/'.join(parts[1:])  # Remove repo root folder
                    
                    # Remove the specified prefix if provided
                    if remove_folder_prefix:
                        relative_path = relative_path.replace(remove_folder_prefix, "", 1)
                    
                    output_path = os.path.join(extract_to, relative_path)
                    
                    # Skip if it's a directory entry
                    if file_info.filename.endswith('/'):
                        os.makedirs(output_path, exist_ok=True)
                        continue
                    
                    # Ensure the directory for the file exists
                    os.makedirs(os.path.dirname(output_path), exist_ok=True)
                    
                    # Extract and write the file
                    with zipf.open(file_info) as source_file:
                        with open(output_path, 'wb') as target_file:
                            target_file.write(source_file.read())
                            
        print(f"Successfully extracted {folder_to_extract} from {repo_owner}/{repo_name} to {extract_to}")
        
    except Exception as e:
        error_msg = (
            f"A {type(e).__name__} error occurred: {e}. This error may be intermittent. "
            "Consider stopping the current notebook session and re-running the notebook."
        )
        print(error_msg)
        # Re-raise the exception so the notebook run stops here
        raise

# Execute repo download and extraction using configured parameters
print(f"📥 Downloading {repo_name} from {repo_owner}/{repo_name}:{branch}")
print(f"📁 Extracting '{folder_to_extract}' folder to '{extract_to_directory}'")

download_and_extract_folder(
    repo_owner=repo_owner,
    repo_name=repo_name,
    extract_to=extract_to_directory,
    branch=branch,
    folder_to_extract=folder_to_extract,
    remove_folder_prefix=""
)

print("✅ Source code download and extraction completed successfully")

📥 Downloading amisandbox from slavatrofimov/amisandbox:main
📁 Extracting 'workspace' folder to '.lakehouse/default/Files/src'
Successfully extracted workspace from slavatrofimov/amisandbox to .lakehouse/default/Files/src
✅ Source code download and extraction completed successfully
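
The folder filtering performed by `download_and_extract_folder` can be tried without network access. The sketch below builds a small archive in memory that imitates the layout the function expects (a single root folder wrapping the repository content) and selects only the files under one folder. The helper `members_in_folder` and the root folder name are made up for illustration.

```python
import zipfile
from io import BytesIO


def members_in_folder(archive: zipfile.ZipFile, folder: str) -> dict:
    """Map archive member names to root-relative paths for files under `folder`."""
    selected = {}
    prefix = folder.strip("/") + "/"
    for info in archive.infolist():
        if info.is_dir():
            continue  # directory entries carry no content
        # Drop the single top-level folder that wraps the repository content.
        _, _, relative_path = info.filename.partition("/")
        if relative_path.startswith(prefix):
            selected[info.filename] = relative_path
    return selected


# Build a test archive in memory; "owner-repo-abc123" is a made-up root folder name.
buffer = BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
    zf.writestr("owner-repo-abc123/README.md", "readme")
    zf.writestr("owner-repo-abc123/workspace/item.json", "{}")
    zf.writestr("owner-repo-abc123/workspace2/other.json", "{}")

buffer.seek(0)
with zipfile.ZipFile(buffer) as zf:
    selected = members_in_folder(zf, "workspace")

print(selected)
# {'owner-repo-abc123/workspace/item.json': 'workspace/item.json'}
```

Matching on `workspace/` rather than `workspace` keeps a sibling folder such as `workspace2` out of the result.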


## 🚀 Fabric Asset Deployment

Deploy solution assets to the current Fabric workspace:

In [14]:
# Custom Token Credential class for authentication in a Fabric Notebook
# This class enables the fabric-cicd library to be run in a Fabric notebook

class FabricNotebookTokenCredential(TokenCredential):
    """Token credential for Fabric Notebooks using notebookutils authentication."""
    
    def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                  enable_cae: bool = False, **kwargs: Any) -> AccessToken:
        """Get access token from Fabric notebook environment."""
        access_token = notebookutils.credentials.getToken("pbi")       
        expiration = self._extract_jwt_expiration(access_token)
        return AccessToken(token=access_token, expires_on=expiration)
    
    def _extract_jwt_expiration(self, token: str) -> int:
        """Extract expiration timestamp from JWT token."""
        try:
            # Split JWT and get payload (middle part)
            payload_b64 = token.split(".")[1]           
            # Add padding if needed for base64 decoding
            payload_b64 += "=" * (-len(payload_b64) % 4)
            # Decode and parse payload
            payload_bytes = base64.urlsafe_b64decode(payload_b64.encode("utf-8"))
            payload = json.loads(payload_bytes.decode("utf-8"))
            # Extract expiration claim
            exp = payload.get("exp")
            if exp is None:
                raise ValueError("JWT missing expiration claim")
            return exp
        except (IndexError, json.JSONDecodeError, ValueError) as e:
            raise ValueError(f"Invalid JWT token format: {e}")
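
The expiration parsing in `_extract_jwt_expiration` can be exercised outside Fabric with a token-shaped test string. This is a minimal sketch: `jwt_expiration` and `make_unsigned_token` are hypothetical helpers, the token carries a placeholder instead of a real signature, and nothing here verifies a signature.

```python
import base64
import json


def jwt_expiration(token: str) -> int:
    """Read the `exp` claim from a JWT payload without verifying the signature."""
    payload_b64 = token.split(".")[1]
    # JWT segments omit base64 padding, so restore it before decoding.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    payload = json.loads(base64.urlsafe_b64decode(payload_b64))
    return int(payload["exp"])


def make_unsigned_token(claims: dict) -> str:
    """Build a JWT-shaped string for testing; the signature is a placeholder."""
    def encode(obj: dict) -> str:
        raw = json.dumps(obj).encode("utf-8")
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")

    return f"{encode({'alg': 'none'})}.{encode(claims)}.signature"


token = make_unsigned_token({"exp": 1700000000})
print(jwt_expiration(token))
# 1700000000
```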

In [17]:
# Configure constants for fabric-cicd library
fabric_cicd.constants.DEFAULT_API_ROOT_URL = DEFAULT_API_ROOT_URL

# Get current workspace information
client = fabric.FabricRestClient()
workspace_id = fabric.get_workspace_id()
print(f"Target workspace ID: {workspace_id}")

# Enable debugging for more verbose execution logging
if DEBUG:
    from fabric_cicd import change_log_level
    change_log_level("DEBUG")

# Function to execute deployment of all specified item types
def deploy_artifacts(target_workspace):
    print("🚀 Starting deployment of Fabric items...")
    print(f"📋 Item types in scope: {', '.join(target_workspace.item_type_in_scope)}")
    publish_all_items(target_workspace)
    print("✅ Deployment completed successfully!")

# Initialize the FabricWorkspace object with configured parameters
target_workspace = FabricWorkspace(
    workspace_id=workspace_id,
    environment=deployment_environment,
    repository_directory=repository_directory,
    #item_type_in_scope=item_types_in_scope,
    token_credential=FabricNotebookTokenCredential()
)

Target workspace ID: 670345c3-e916-40a8-b57d-67e6c901cd2a

####################################################################################################
########## Validating Parameter File ###############################################################
####################################################################################################

In [20]:
# Start by deploying data stores
item_types_in_scope = [
    "Eventhouse", 
    "KQLDatabase", 
    "Lakehouse"
]

target_workspace.item_type_in_scope=item_types_in_scope

# Execute deployment of all specified item types
deploy_artifacts(target_workspace)
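
The notebook deploys in two stages: data stores in this cell and the remaining item types in the next, presumably so that later items can resolve references to stores that already exist. The same pattern can be written as a loop. The sketch below uses a stub object so it runs anywhere; `StubWorkspace` and `deploy_in_stages` are hypothetical names, and the `publish` callable stands in for the real publishing call.

```python
from typing import Callable, Iterable, List


class StubWorkspace:
    """Hypothetical stand-in for the workspace object; it only holds the scope."""

    def __init__(self) -> None:
        self.item_type_in_scope: List[str] = []


def deploy_in_stages(workspace, stages: Iterable[List[str]],
                     publish: Callable[[object], None]) -> None:
    """Publish each stage in order, narrowing the scope before every call."""
    for item_types in stages:
        workspace.item_type_in_scope = list(item_types)
        publish(workspace)


stages = [
    ["Eventhouse", "KQLDatabase", "Lakehouse"],  # data stores first
    ["Eventstream", "Notebook", "KQLDashboard", "SemanticModel",
     "Report", "Reflex", "DataAgent"],           # then everything else
]

published = []  # records the scope seen by each publish call
deploy_in_stages(StubWorkspace(), stages,
                 lambda ws: published.append(list(ws.item_type_in_scope)))
print(published)
```

In the notebook itself, `target_workspace` would take the place of the stub and `deploy_artifacts` would be passed as `publish`.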

In [12]:
# Deploy remaining items
item_types_in_scope = [
    "Eventstream", 
    "Notebook", 
    "KQLDashboard", 
    "SemanticModel", 
    "Report", 
    "Reflex", 
    "DataAgent"
]

target_workspace.item_type_in_scope=item_types_in_scope

# Execute deployment of all specified item types
deploy_artifacts(target_workspace)

####################################################################################################
########## Publishing Workspace Folders ############################################################
####################################################################################################

####################################################################################################
########## Publishing Data Agents ##################################################################
####################################################################################################

✅ Deployment completed successfully!

[info]   18:53:15 - Publishing SemanticModel 'Tag - Time Series Analysis'
         18:53:17 - Operation in progress. Checking again in 1 second (Attempt 1)...
         18:53:18 - Operation in progress. Checking again in 2 seconds (Attempt 2)...
         18:53:20 - Operation in progress. Checking again in 4 seconds (Attempt 3)...
         18:53:24 - Published
         18:53:25 - API is throttled. Checking again in 20 seconds (Attempt 1)...
         18:53:45 - API is throttled. Checking again in 2 seconds (Attempt 2)...
[info]   18:53:51 - Publishi

## ✅ Post-Deployment Tasks

### Inspect Deployed Resources
Verify the deployment was successful by examining deployed assets:

In [13]:
'''
import ipywidgets as widgets
from IPython.display import display

def input_value(description = '', placeholder = ''):
    # Create a text input widget
    textbox_widget = widgets.Text(
        value='',  # Initial value
        placeholder=placeholder,
        description=description,
        style={'description_width': 'initial'},
        layout=widgets.Layout(width='600px')
    )

    # Create a button to capture the value
    capture_button = widgets.Button(
        description='Submit',
        button_style='success',
        tooltip='Click to submit the value',
        icon='check'
    )

    # Variable to store the captured value
    captured_value = None

    # Function to handle button click
    def on_capture_click(b):
        global captured_value
        captured_value = textbox_widget.value
        print(f"✅ Captured value: '{captured_value}'")
        
        # Optional: Disable the widget after capture
        textbox_widget.disabled = True
        capture_button.description = 'Captured!'
        capture_button.icon = 'check-circle'

    # Attach the function to the button
    capture_button.on_click(on_capture_click)

    # Display the widgets
    display(widgets.VBox([
        textbox_widget,
        capture_button
    ]))

print("📝 Enter the value below and click the 'Submit' button to store it in the global 'captured_value' variable.")
input_value(description = 'my description', placeholder = 'sfdfas://slkfjdalkjf.akswjlfslkjf.slkjfls')

'''


### 🔧 Additional Deployment Utilities
Helper functions for advanced deployment scenarios and KQL database management:

### ⚙️ Manual Deployment Logic
Alternative deployment approach for advanced scenarios:

### 🔄 Post-Deployment Tasks
Additional configuration and table initialization for the AMI solution: