In [1]:
# DO NOT DELETE THIS CELL

# API Name: Admin - Dataflows GetDataflowDatasourcesAsAdmin
# Command:  GET https://api.powerbi.com/v1.0/myorg/admin/dataflows/{dataflowId}/datasources
# Doc:      https://learn.microsoft.com/en-us/rest/api/power-bi/admin/dataflows-get-dataflow-datasources-as-admin

# Loads table: pbi_dataflows_datasources
# Loads table: pbi_dataflows_datasources_conn_details

# Note: this queries the pbi_dataflows table to get a list of dataflowId values for the API calls.

StatementMeta(, 257437a1-650c-4e5a-92b0-49e9cf2ed27f, 3, Finished, Available, Finished)

In [2]:
# CELL 1 - Title and Introduction
# ==================================
# Power BI Dataflow Datasources to Delta Lake - PySpark Notebook
# This notebook retrieves Power BI dataflow datasources using the GetDataflowDatasourcesAsAdmin API
# and loads them into Delta Lake tables with optimization for analytics workloads
# 
# Tables created:
# 1. pbi_dataflows_datasources - Main datasource information
# 2. pbi_dataflows_datasources_conn_details - Connection details (key-value pairs)
# ==================================


# CELL 2 - Import Libraries
# ==================================
# Import required libraries
import requests
import json
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit, from_json, explode, when
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, ArrayType, MapType
import logging
from typing import Dict, List, Optional, Tuple
from delta.tables import DeltaTable
import random
import time
# ==================================


# CELL 3 - Configure Logging and Initialize Spark
# ==================================
# Logging: timestamped, INFO-level messages used to trace each step of the run
# and to debug API/Delta issues.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Spark session with the Delta Lake SQL extension and catalog registered.
# getOrCreate() reuses an already-active session if there is one (as in a
# Fabric notebook, where Spark comes pre-configured with Delta support).
spark = (
    SparkSession.builder
    .appName("PowerBIDataflowDatasourcesToDelta")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Delta write tuning: enable optimized writes and auto-compaction.
# NOTE(review): Fabric documents optimize-write under a "spark.microsoft.delta.*"
# key; confirm these "spark.databricks.delta.*" keys are honoured on this runtime.
for _delta_write_setting in (
    "spark.databricks.delta.optimizeWrite.enabled",
    "spark.databricks.delta.autoCompact.enabled",
):
    spark.conf.set(_delta_write_setting, "true")
# ==================================


# CELL 4 - Configuration Parameters
# ==================================
# Configuration Parameters
# These are the settings we'll use throughout the notebook
CONFIG = dict(
    # --- Power BI REST API ---
    API_BASE_URL="https://api.powerbi.com/v1.0/myorg",
    DATASOURCES_ENDPOINT="/admin/dataflows/{dataflowId}/datasources",  # formatted with each dataflow ID
    # --- Retry / rate-limit behaviour ---
    MAX_RETRIES=5,            # total attempts made per API call
    INITIAL_BACKOFF_SEC=1,    # first backoff delay, in seconds
    MAX_BACKOFF_SEC=60,       # upper bound on the backoff delay, in seconds
    BACKOFF_FACTOR=2,         # multiplier applied to the delay after each backoff
    JITTER_FACTOR=0.1,        # random jitter, as a fraction of the current delay
    TIMEOUT=30,               # per-request timeout, in seconds
    # --- Target Delta tables ---
    MAIN_TABLE_NAME="pbi_dataflows_datasources",                 # one row per datasource
    DETAILS_TABLE_NAME="pbi_dataflows_datasources_conn_details",  # connection-detail key/value rows
    LAKEHOUSE_PATH="Tables",  # default Tables folder in a Fabric Lakehouse
    # --- Run controls ---
    DEBUG_MODE=True,          # True enables extra diagnostic logging
    TEST_LIMIT=None,          # None = no limit, process every dataflow
    DATAFLOWS_SOURCE_TABLE="pbi_dataflows",  # table that supplies the dataflow IDs
)
# ==================================


# CELL 5 - Authentication Function
# ==================================
def get_access_token():
    """
    Obtain an Azure AD access token scoped to the Power BI REST API.

    Uses Fabric's ``mssparkutils.credentials.getToken`` helper (from the
    ``notebookutils`` package available inside Fabric notebooks).

    Returns:
        str: The token returned by ``getToken``.

    Raises:
        Exception: Whatever the import or the token request raised; it is
            logged first and then re-raised unchanged.
    """
    power_bi_resource = "https://analysis.windows.net/powerbi/api"
    try:
        # Imported lazily: notebookutils only exists inside the Fabric runtime.
        from notebookutils import mssparkutils
        return mssparkutils.credentials.getToken(power_bi_resource)
    except Exception as err:
        logger.error(f"Failed to get access token: {str(err)}")
        raise
# ==================================


# CELL 6 - Get Dataflow IDs Function
# ==================================
def get_dataflow_ids(limit: Optional[int] = None) -> List[str]:
    """
    Retrieve dataflow IDs from the dataflows source table.

    Queries ``CONFIG['DATAFLOWS_SOURCE_TABLE']`` (``pbi_dataflows`` by default),
    skips rows whose ``dataflowId`` is NULL, and orders the result by
    ``workspaceId`` so IDs from the same workspace are adjacent.

    Args:
        limit: Optional cap on the number of IDs returned (useful for testing).
            ``None`` means no limit; ``0`` returns no IDs.

    Returns:
        list: The ``dataflowId`` values, in ``workspaceId`` order.

    Raises:
        ValueError: If ``limit`` is negative or cannot be converted to ``int``.
        Exception: Any Spark error from the query; it is logged and re-raised.
    """
    try:
        # Build the SQL query
        sql_query = f"""
        SELECT dataflowId 
        FROM {CONFIG['DATAFLOWS_SOURCE_TABLE']}
        WHERE dataflowId IS NOT NULL
        ORDER BY workspaceId
        """

        # `is not None` (not truthiness) so limit=0 means "no rows", not "no limit".
        if limit is not None:
            # int() guarantees that only a number is ever spliced into the SQL text.
            limit_value = int(limit)
            if limit_value < 0:
                raise ValueError(f"limit must be a non-negative integer, got {limit!r}")
            sql_query += f" LIMIT {limit_value}"

        logger.info(f"Executing query: {sql_query}")

        # Execute the query and pull the IDs back to the driver as plain values.
        dataflow_ids_df = spark.sql(sql_query)
        dataflow_ids = [row.dataflowId for row in dataflow_ids_df.collect()]

        logger.info(f"Retrieved {len(dataflow_ids)} dataflow IDs")

        if CONFIG['DEBUG_MODE'] and dataflow_ids:
            logger.info(f"First few dataflow IDs: {dataflow_ids[:5]}")

        return dataflow_ids

    except Exception as e:
        logger.error(f"Failed to retrieve dataflow IDs: {str(e)}")
        raise
# ==================================


# CELL 7 - API Call Function
# ==================================
def call_powerbi_api(endpoint: str, access_token: str, params: Optional[Dict] = None) -> Optional[Dict]:
    """
    Make a GET request to the Power BI REST API with retry and rate-limit handling.

    Status handling:
    - 404: logged as a warning; returns ``None`` (the dataflow is treated as not found).
    - 429: waits ``Retry-After`` seconds when that header is an integer, otherwise
      an exponential backoff with jitter, then retries. If the final attempt is
      still throttled, an ``HTTPError`` is raised. Callers never see ``None`` for
      a throttled request, because ``None`` is reserved for 404.
    - Other 4xx except 408: raised immediately. Retrying an auth, permission or
      bad-request error with the same request cannot succeed.
    - 5xx, 408, timeouts and connection errors: retried with exponential backoff
      and jitter until ``CONFIG['MAX_RETRIES']`` attempts have been made.

    The Authorization header is redacted whenever request headers are logged.

    Args:
        endpoint: API path appended to ``CONFIG['API_BASE_URL']``
            (e.g. "/admin/dataflows/{id}/datasources").
        access_token: The Azure AD bearer token.
        params: Optional query-string parameters.

    Returns:
        dict: The parsed JSON response, or ``None`` on 404.

    Raises:
        requests.exceptions.RequestException: On a non-retryable error status, or
            when every attempt fails (including persistent 429 throttling).
        ValueError: If a successful response body is not valid JSON. Newer
            ``requests`` versions raise a RequestException subclass here, which
            is retried like a transport error.
    """
    url = f"{CONFIG['API_BASE_URL']}{endpoint}"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    max_attempts = CONFIG['MAX_RETRIES']
    backoff_time = CONFIG['INITIAL_BACKOFF_SEC']
    throttled_response = None  # most recent 429 response, attached to the final error

    for attempt in range(max_attempts):
        last_attempt = attempt == max_attempts - 1
        try:
            if CONFIG['DEBUG_MODE']:
                logger.info(f"Making API call to: {url} with params: {params} (Attempt {attempt + 1})")

            response = requests.get(
                url,
                headers=headers,
                params=params,
                timeout=CONFIG['TIMEOUT']
            )

            if CONFIG['DEBUG_MODE']:
                logger.info(f"Response status: {response.status_code}")

            # 404 - dataflow not found (expected for some dataflows)
            if response.status_code == 404:
                logger.warning(f"Dataflow not found (404) for endpoint: {endpoint}")
                return None

            # 429 - throttled
            if response.status_code == 429:
                throttled_response = response
                if last_attempt:
                    # No attempts left: don't sleep; the error is raised after the loop.
                    break

                retry_after = response.headers.get('Retry-After')
                if retry_after and retry_after.isdigit():
                    # The server told us how long to wait.
                    wait_time = int(retry_after)
                else:
                    jitter = random.uniform(0, CONFIG['JITTER_FACTOR'] * backoff_time)
                    wait_time = backoff_time + jitter
                    backoff_time = min(backoff_time * CONFIG['BACKOFF_FACTOR'], CONFIG['MAX_BACKOFF_SEC'])

                logger.warning(f"Rate limit exceeded (429). Waiting {wait_time:.2f} seconds before retry.")
                time.sleep(wait_time)
                continue

            if response.status_code >= 400:
                logger.error(f"API error: Status {response.status_code}, Response: {response.text}")
                logger.error(f"Request URL: {response.request.url}")
                # Never write the bearer token to the logs.
                safe_headers = {
                    name: ("<redacted>" if name.lower() == "authorization" else value)
                    for name, value in response.request.headers.items()
                }
                logger.error(f"Request headers: {safe_headers}")

            # Raises requests.exceptions.HTTPError for any remaining 4xx/5xx.
            response.raise_for_status()

            try:
                response_json = response.json()
            except ValueError as e:  # json.JSONDecodeError and requests' variant both subclass ValueError
                logger.error(f"Failed to parse response as JSON: {str(e)}")
                logger.error(f"Response content: {response.text[:1000]}")  # Log first 1000 chars of response
                raise

            if CONFIG['DEBUG_MODE'] and isinstance(response_json, dict):
                if isinstance(response_json.get("value"), list):
                    logger.info(f"Response contains {len(response_json['value'])} datasources")
            return response_json

        except requests.exceptions.RequestException as e:
            error_response = getattr(e, 'response', None)
            status = error_response.status_code if error_response is not None else None
            # Only transport failures (no response), request timeouts (408) and
            # server errors (5xx) can plausibly succeed on a retry.
            retryable = status is None or status == 408 or status >= 500

            logger.warning(f"API call failed (Attempt {attempt + 1}): {str(e)}")

            if not retryable:
                logger.error(f"Non-retryable error (status {status}) for endpoint: {endpoint}")
                raise

            if last_attempt:
                logger.error(f"All retry attempts failed for endpoint: {endpoint}")
                logger.error(f"Final error: {str(e)}")
                raise

            jitter = random.uniform(0, CONFIG['JITTER_FACTOR'] * backoff_time)
            wait_time = backoff_time + jitter
            backoff_time = min(backoff_time * CONFIG['BACKOFF_FACTOR'], CONFIG['MAX_BACKOFF_SEC'])

            logger.info(f"Waiting {wait_time:.2f} seconds before retry.")
            time.sleep(wait_time)

    # Reached only when the final attempt was throttled (or MAX_RETRIES < 1).
    # Raise rather than fall through to an implicit None, which callers read as 404.
    last_status = throttled_response.status_code if throttled_response is not None else None
    logger.error(f"All retry attempts failed for endpoint: {endpoint} (last status: {last_status})")
    raise requests.exceptions.HTTPError(
        f"Giving up on {url} after {max_attempts} attempt(s); last status: {last_status}",
        response=throttled_response,
    )
# ==================================


# CELL 8 - Get Dataflow Datasources Function
# ==================================
def get_dataflow_datasources(dataflow_id: str, access_token: str) -> List[Dict]:
    """
    Fetch the datasources of a single dataflow via GetDataflowDatasourcesAsAdmin.

    Errors are deliberately not propagated. They are logged and turned into an
    empty list so that one failing dataflow does not stop the overall run.

    Args:
        dataflow_id: ID substituted into ``CONFIG['DATASOURCES_ENDPOINT']``.
        access_token: The Azure AD bearer token.

    Returns:
        list: The ``value`` array of the response. It is ``[]`` when the API
        returned 404, when the call raised, or when the response has no
        ``value`` key.
    """
    try:
        endpoint = CONFIG['DATASOURCES_ENDPOINT'].format(dataflowId=dataflow_id)
        logger.info(f"Retrieving datasources for dataflow: {dataflow_id}")

        payload = call_powerbi_api(endpoint, access_token)

        # call_powerbi_api signals 404 with None.
        if payload is None:
            logger.warning(f"Dataflow {dataflow_id} not found - skipping")
            return []

        datasources = payload.get("value", [])

        if not datasources:
            logger.info(f"No datasources found for dataflow {dataflow_id}")
            return datasources

        logger.info(f"Retrieved {len(datasources)} datasources for dataflow {dataflow_id}")
        if CONFIG['DEBUG_MODE']:
            # Dump one datasource so the payload shape can be inspected.
            logger.info(f"Sample datasource structure: {json.dumps(datasources[0], indent=2)}")
        return datasources

    except Exception as failure:
        logger.error(f"Failed to retrieve datasources for dataflow {dataflow_id}: {str(failure)}")
        return []
# ==================================


# CELL 9 - Process All Dataflows Function
# ==================================
def _append_datasource_records(dataflow_id: str, datasource: Dict,
                               main_records: List[Dict], detail_records: List[Dict]) -> None:
    """Append one datasource's main-table row, then its connection-detail rows, to the given lists in place."""
    datasource_id = datasource.get("datasourceId")

    main_records.append({
        "dataflowId": dataflow_id,
        "datasourceId": datasource_id,
        **{field: datasource.get(field)
           for field in ("name", "datasourceType", "connectionString", "gatewayId")},
    })

    # connectionDetails is a nested object; flatten it to one row per key.
    details = datasource.get("connectionDetails", {})
    if not (details and isinstance(details, dict)):
        return
    for key, value in details.items():
        detail_records.append({
            "dataflowId": dataflow_id,
            "datasourceId": datasource_id,
            "detail_key": key,
            "detail_value": None if value is None else str(value),
        })


def process_all_dataflows(dataflow_ids: List[str], access_token: str) -> Tuple[List[Dict], List[Dict]]:
    """
    Fetch datasources for every dataflow and flatten them into table-ready records.

    Each dataflow is requested in turn, with a 100 ms pause between requests.
    A dataflow that raises during processing is logged and skipped.

    Args:
        dataflow_ids: Dataflow IDs to process, in order.
        access_token: The Azure AD bearer token used for every request.

    Returns:
        tuple: ``(main_records, detail_records)``, two lists of dicts.
        ``main_records`` feeds pbi_dataflows_datasources.
        ``detail_records`` feeds pbi_dataflows_datasources_conn_details.
    """
    main_records: List[Dict] = []
    detail_records: List[Dict] = []

    total_dataflows = len(dataflow_ids)
    processed_count = successful_count = 0  # "successful" = returned at least one datasource

    logger.info(f"Starting to process {total_dataflows} dataflows")

    for position, dataflow_id in enumerate(dataflow_ids, start=1):
        try:
            logger.info(f"Processing dataflow {position}/{total_dataflows}: {dataflow_id}")

            datasources = get_dataflow_datasources(dataflow_id, access_token)
            processed_count += 1

            if datasources:
                successful_count += 1
                for datasource in datasources:
                    _append_datasource_records(dataflow_id, datasource, main_records, detail_records)

            # Throttle politely between requests, but not after the final one.
            if position < total_dataflows:
                time.sleep(0.1)

        except Exception as err:
            logger.error(f"Error processing dataflow {dataflow_id}: {str(err)}")
            processed_count += 1

    logger.info(f"Completed processing. Processed: {processed_count}, Successful: {successful_count}")
    logger.info(f"Total main records: {len(main_records)}")
    logger.info(f"Total detail records: {len(detail_records)}")

    return main_records, detail_records
# ==================================


# CELL 10 - Create DataFrames Function
# ==================================
def _records_to_spark_df(records: List[Dict], columns: List[str], base_schema, full_schema, empty_message: str):
    """Build a Spark DataFrame from dict records plus an extraction_timestamp column, or an empty one typed by full_schema."""
    if not records:
        logger.warning(empty_message)
        return spark.createDataFrame(spark.sparkContext.emptyRDD(), full_schema)

    frame = pd.DataFrame(records)
    for column in columns:
        # A missing key becomes an all-None column. Every column is cast to
        # pandas' string dtype so Spark never infers a VOID type.
        if column not in frame.columns:
            frame[column] = None
        frame[column] = frame[column].astype('string')

    # The explicit schema (without the timestamp) pins column types; the
    # timestamp is added afterwards.
    return (
        spark.createDataFrame(frame[columns], schema=base_schema)
        .withColumn("extraction_timestamp", current_timestamp())
    )


def create_dataframes(main_records: List[Dict], detail_records: List[Dict]) -> Tuple["DataFrame", "DataFrame"]:
    """
    Convert the flattened records into the two Spark DataFrames to be loaded.

    Both DataFrames use explicit StringType schemas and gain a non-nullable
    ``extraction_timestamp`` column. An empty record list yields an empty
    DataFrame with the full schema, including the timestamp column.

    Args:
        main_records: Rows for the main datasources table.
        detail_records: Rows for the connection-details table.

    Returns:
        tuple: ``(main_df, details_df)``.
    """
    extraction_ts_field = StructField("extraction_timestamp", TimestampType(), False)

    # Key columns are declared non-nullable; the remaining attributes may be NULL.
    main_fields = [
        StructField("dataflowId", StringType(), False),
        StructField("datasourceId", StringType(), False),
        StructField("name", StringType(), True),                # deprecated API field
        StructField("datasourceType", StringType(), True),
        StructField("connectionString", StringType(), True),    # deprecated API field
        StructField("gatewayId", StringType(), True),
    ]
    detail_fields = [
        StructField("dataflowId", StringType(), False),
        StructField("datasourceId", StringType(), False),
        StructField("detail_key", StringType(), False),
        StructField("detail_value", StringType(), True),
    ]

    main_df = _records_to_spark_df(
        main_records,
        [field.name for field in main_fields],
        StructType(main_fields),
        StructType(main_fields + [extraction_ts_field]),
        "No main records found. Creating empty main DataFrame.",
    )
    details_df = _records_to_spark_df(
        detail_records,
        [field.name for field in detail_fields],
        StructType(detail_fields),
        StructType(detail_fields + [extraction_ts_field]),
        "No detail records found. Creating empty details DataFrame.",
    )

    logger.info(f"Created main DataFrame with {main_df.count()} rows")
    logger.info(f"Created details DataFrame with {details_df.count()} rows")

    return main_df, details_df
# ==================================


# CELL 11 - Delta Lake Operations Functions
# ==================================
def ensure_delta_table_exists(table_name: str, df_schema):
    """
    Create an empty table with ``df_schema`` if ``table_name`` does not exist.

    Existence is probed with ``DESCRIBE TABLE``. The table is created with save
    mode ``"ignore"``, which does nothing when the table already exists. So if
    the probe fails for a reason other than a missing table (e.g. a transient
    error), existing data is left intact instead of being overwritten with an
    empty DataFrame.

    Args:
        table_name: Name of the table to check and, if needed, create.
        df_schema: StructType for the new, empty table.
    """
    try:
        spark.sql(f"DESCRIBE TABLE {table_name}")
        logger.info(f"Delta table '{table_name}' already exists")
        return
    except Exception as e:
        logger.info(f"Creating Delta table '{table_name}' (lookup failed: {str(e)})")

    empty_df = spark.createDataFrame([], df_schema)

    # "ignore" can never replace an existing table, unlike "overwrite".
    empty_df.write \
        .mode("ignore") \
        .saveAsTable(table_name)

    logger.info(f"Delta table '{table_name}' is present (created if it was missing)")


def merge_main_data_to_delta(source_df, table_name: str):
    """
    Upsert main datasource rows into a Delta table keyed on (dataflowId, datasourceId).

    - If the target table cannot be read (e.g. it does not exist yet), it is
      created by overwriting with ``source_df``.
    - If the target table is empty or has any NullType ("void") column, it is
      overwritten with ``source_df`` and its schema is replaced.
    - Otherwise a SQL MERGE updates matching rows and inserts new ones. Rows in
      the table that are absent from ``source_df`` are left untouched.
    - If the MERGE fails, the table is overwritten with ``source_df`` as a fallback.

    Args:
        source_df: DataFrame with dataflowId, datasourceId, name, datasourceType,
            connectionString, gatewayId and extraction_timestamp columns.
        table_name: Name of the target Delta table.
    """
    # The notebook's import cell does not import NullType, so import it locally.
    from pyspark.sql.types import NullType

    def _overwrite_target():
        # Replace both the data and the schema of the target table with source_df.
        source_df.write \
            .mode("overwrite") \
            .option("overwriteSchema", "true") \
            .saveAsTable(table_name)

    logger.info(f"Starting merge operation for main table {table_name}")

    try:
        target_df = spark.table(table_name)
        # Only emptiness matters, so read at most one row instead of counting all of them.
        target_is_empty = target_df.limit(1).count() == 0

        # Detect void columns by type, not by str(dataType). Newer PySpark renders
        # NullType as "NullType()", so a "NullType"/"void" string match never fires.
        void_columns = [field.name for field in target_df.schema.fields
                        if isinstance(field.dataType, NullType)]
        for column_name in void_columns:
            logger.warning(f"Found VOID/NULL type column in target table: {column_name}")
    except Exception as e:
        logger.error(f"Error checking target table: {str(e)}")
        # Table missing or unreadable: (re)create it from the source data.
        _overwrite_target()
        logger.info("New table created successfully")
        return

    # Kept outside the try above so a failed write is reported, not silently re-attempted.
    if void_columns or target_is_empty:
        logger.info("Handling schema evolution due to VOID columns or empty table")
        _overwrite_target()
        logger.info("Table schema updated and data loaded successfully")
        return

    source_df.createOrReplaceTempView("main_datasource_updates")

    # Explicit casts guard against any remaining type mismatches.
    merge_query = f"""
    MERGE INTO {table_name} AS target
    USING main_datasource_updates AS source
    ON target.dataflowId = source.dataflowId AND target.datasourceId = source.datasourceId
    WHEN MATCHED THEN
        UPDATE SET 
            target.name = CAST(source.name AS STRING),
            target.datasourceType = CAST(source.datasourceType AS STRING),
            target.connectionString = CAST(source.connectionString AS STRING),
            target.gatewayId = CAST(source.gatewayId AS STRING),
            target.extraction_timestamp = source.extraction_timestamp
    WHEN NOT MATCHED THEN
        INSERT (dataflowId, datasourceId, name, datasourceType, connectionString, gatewayId, extraction_timestamp)
        VALUES (source.dataflowId, source.datasourceId, CAST(source.name AS STRING), 
                CAST(source.datasourceType AS STRING), CAST(source.connectionString AS STRING), 
                CAST(source.gatewayId AS STRING), source.extraction_timestamp)
    """

    try:
        spark.sql(merge_query)
        logger.info("Main table merge operation completed successfully")
    except Exception as e:
        logger.error(f"Merge operation failed: {str(e)}")
        logger.info("Attempting fallback: overwriting table with new schema")
        _overwrite_target()
        logger.info("Table overwritten successfully with new schema")


def merge_details_data_to_delta(source_df, table_name: str):
    """
    Upsert connection-detail rows into a Delta table keyed on
    (dataflowId, datasourceId, detail_key).

    - If the target table cannot be read (e.g. it does not exist yet), it is
      created by overwriting with ``source_df``.
    - If the target table is empty or has any NullType ("void") column, it is
      overwritten with ``source_df`` and its schema is replaced.
    - Otherwise a SQL MERGE updates matching rows and inserts new ones. Rows in
      the table that are absent from ``source_df`` are left untouched.
    - If the MERGE fails, the table is overwritten with ``source_df`` as a fallback.

    Args:
        source_df: DataFrame with dataflowId, datasourceId, detail_key,
            detail_value and extraction_timestamp columns.
        table_name: Name of the target Delta table.
    """
    # The notebook's import cell does not import NullType, so import it locally.
    from pyspark.sql.types import NullType

    def _overwrite_target():
        # Replace both the data and the schema of the target table with source_df.
        source_df.write \
            .mode("overwrite") \
            .option("overwriteSchema", "true") \
            .saveAsTable(table_name)

    logger.info(f"Starting merge operation for details table {table_name}")

    try:
        target_df = spark.table(table_name)
        # Only emptiness matters, so read at most one row instead of counting all of them.
        target_is_empty = target_df.limit(1).count() == 0

        # Detect void columns by type, not by str(dataType). Newer PySpark renders
        # NullType as "NullType()", so a "NullType"/"void" string match never fires.
        void_columns = [field.name for field in target_df.schema.fields
                        if isinstance(field.dataType, NullType)]
        for column_name in void_columns:
            logger.warning(f"Found VOID/NULL type column in target table: {column_name}")
    except Exception as e:
        logger.error(f"Error checking target table: {str(e)}")
        # Table missing or unreadable: (re)create it from the source data.
        _overwrite_target()
        logger.info("New table created successfully")
        return

    # Kept outside the try above so a failed write is reported, not silently re-attempted.
    if void_columns or target_is_empty:
        logger.info("Handling schema evolution due to VOID columns or empty table")
        _overwrite_target()
        logger.info("Table schema updated and data loaded successfully")
        return

    source_df.createOrReplaceTempView("details_datasource_updates")

    # Explicit casts guard against any remaining type mismatches.
    merge_query = f"""
    MERGE INTO {table_name} AS target
    USING details_datasource_updates AS source
    ON target.dataflowId = source.dataflowId 
       AND target.datasourceId = source.datasourceId 
       AND target.detail_key = source.detail_key
    WHEN MATCHED THEN
        UPDATE SET 
            target.detail_value = CAST(source.detail_value AS STRING),
            target.extraction_timestamp = source.extraction_timestamp
    WHEN NOT MATCHED THEN
        INSERT (dataflowId, datasourceId, detail_key, detail_value, extraction_timestamp)
        VALUES (source.dataflowId, source.datasourceId, source.detail_key, 
                CAST(source.detail_value AS STRING), source.extraction_timestamp)
    """

    try:
        spark.sql(merge_query)
        logger.info("Details table merge operation completed successfully")
    except Exception as e:
        logger.error(f"Merge operation failed: {str(e)}")
        logger.info("Attempting fallback: overwriting table with new schema")
        _overwrite_target()
        logger.info("Table overwritten successfully with new schema")


def optimize_delta_tables(main_table_name: str, details_table_name: str):
    """
    Refresh query-planner statistics for the main and details Delta tables.

    Despite the function name, no OPTIMIZE / file compaction is performed:
    each table only gets ``ANALYZE TABLE <name> COMPUTE STATISTICS``.
    Any exception raised while analyzing a table is logged as a warning and
    swallowed, so one failing table never stops the other from being
    processed and nothing propagates to the caller.

    Args:
        main_table_name: Name of the main datasource Delta table.
        details_table_name: Name of the connection-details Delta table.
    """
    targets = (main_table_name, details_table_name)
    for target in targets:
        logger.info(f"Optimizing Delta table '{target}'")
        analyze_stmt = f"ANALYZE TABLE {target} COMPUTE STATISTICS"

        try:
            spark.sql(analyze_stmt)
            logger.info(f"Table statistics updated successfully for {target}")
        except Exception as err:
            # Statistics only help query planning; a failure here is not fatal.
            logger.warning(f"Table optimization step encountered an issue for {target}: {str(err)}")
            logger.info("Continuing with process - optimization is not critical for functionality")
# ==================================


# CELL 12 - Main Execution Function
# ==================================
def _log_main_table_summary(main_table_name: str) -> None:
    """Log aggregate statistics and the datasource-type distribution of the main table."""
    main_summary_stats = spark.sql(f"""
        SELECT 
            COUNT(*) as total_datasources,
            COUNT(DISTINCT dataflowId) as unique_dataflows,
            COUNT(DISTINCT datasourceType) as datasource_types,
            COUNT(DISTINCT gatewayId) as unique_gateways,
            MAX(extraction_timestamp) as last_updated
        FROM {main_table_name}
    """)

    logger.info("Main table summary statistics:")
    main_summary_stats.show(truncate=False)

    # Show distribution by datasource type
    type_distribution = spark.sql(f"""
        SELECT 
            datasourceType,
            COUNT(*) as count
        FROM {main_table_name}
        WHERE datasourceType IS NOT NULL
        GROUP BY datasourceType
        ORDER BY count DESC
    """)

    logger.info("Datasource distribution by type:")
    type_distribution.show(truncate=False)


def _log_details_table_summary(details_table_name: str) -> None:
    """Log aggregate statistics and the 10 most frequent detail keys of the details table."""
    details_summary_stats = spark.sql(f"""
        SELECT 
            COUNT(*) as total_connection_details,
            COUNT(DISTINCT dataflowId) as unique_dataflows,
            COUNT(DISTINCT detail_key) as unique_detail_keys,
            MAX(extraction_timestamp) as last_updated
        FROM {details_table_name}
    """)

    logger.info("Details table summary statistics:")
    details_summary_stats.show(truncate=False)

    # Show most common connection detail keys
    key_distribution = spark.sql(f"""
        SELECT 
            detail_key,
            COUNT(*) as count
        FROM {details_table_name}
        GROUP BY detail_key
        ORDER BY count DESC
        LIMIT 10
    """)

    logger.info("Top connection detail keys:")
    key_distribution.show(truncate=False)


def main():
    """
    Main execution function that orchestrates the entire process.

    Steps:
    1. Get the authentication token.
    2. Retrieve dataflow IDs from the source table (limited by CONFIG['TEST_LIMIT']).
    3. Call the API for each dataflow to collect its datasources.
    4. Build the main and connection-details DataFrames (and show samples).
    5. Ensure both target Delta tables exist.
    6. Merge the new data into both Delta tables (skipped when a DataFrame is empty).
    7. Compute table statistics via optimize_delta_tables().
    8. Log table details, row counts and summary statistics.

    Returns:
        Tuple ``(main_df, details_df)`` of the DataFrames built in this run,
        or ``(None, None)`` when no dataflow IDs were found.

    Raises:
        Re-raises any exception after logging it.
    """
    try:
        logger.info("Starting Power BI Dataflow Datasources to Delta Lake process")

        # Step 1: Get authentication token
        logger.info("Getting access token...")
        access_token = get_access_token()
        logger.info("Successfully obtained access token")

        # Step 2: Get dataflow IDs from source table
        logger.info(f"Retrieving all dataflow IDs from {CONFIG['DATAFLOWS_SOURCE_TABLE']}...")
        dataflow_ids = get_dataflow_ids(CONFIG['TEST_LIMIT'])

        if not dataflow_ids:
            logger.warning("No dataflow IDs found. Please check the source table and permissions.")
            return None, None

        # Step 3: Process all dataflows to get their datasources
        logger.info(f"Processing {len(dataflow_ids)} dataflows to retrieve datasources...")
        main_records, detail_records = process_all_dataflows(dataflow_ids, access_token)

        # Step 4: Create DataFrames
        logger.info("Creating DataFrames...")
        main_df, details_df = create_dataframes(main_records, detail_records)

        # count() runs a full Spark job; compute each once and reuse it below
        # (the original counted each DataFrame twice).
        main_row_count = main_df.count()
        details_row_count = details_df.count()

        # Show sample data
        if main_row_count > 0:
            logger.info("Sample of main datasources data:")
            main_df.show(5, truncate=False)

        if details_row_count > 0:
            logger.info("Sample of connection details data:")
            details_df.show(10, truncate=False)

        # Step 5: Prepare Delta tables
        main_table_name = CONFIG["MAIN_TABLE_NAME"]
        details_table_name = CONFIG["DETAILS_TABLE_NAME"]

        ensure_delta_table_exists(main_table_name, main_df.schema)
        ensure_delta_table_exists(details_table_name, details_df.schema)

        # Step 6: Merge data into Delta tables
        if main_row_count > 0:
            merge_main_data_to_delta(main_df, main_table_name)
        else:
            logger.info("No main datasource data to merge")

        if details_row_count > 0:
            merge_details_data_to_delta(details_df, details_table_name)
        else:
            logger.info("No connection details data to merge")

        # Step 7: Optimize the Delta tables
        optimize_delta_tables(main_table_name, details_table_name)

        # Step 8: Display final statistics
        logger.info("Loading completed successfully!")

        # Show table information for both tables, remembering each row count
        # so the summary section below does not re-scan the tables.
        table_row_counts = {}
        for table_name in (main_table_name, details_table_name):
            logger.info(f"\n=== {table_name} Table Information ===")
            spark.sql(f"DESCRIBE DETAIL {table_name}").show(truncate=False)

            table_row_counts[table_name] = spark.table(table_name).count()
            logger.info(f"Total rows in {table_name}: {table_row_counts[table_name]}")

        if table_row_counts[main_table_name] > 0:
            _log_main_table_summary(main_table_name)

        if table_row_counts[details_table_name] > 0:
            _log_details_table_summary(details_table_name)

        return main_df, details_df

    except Exception as e:
        logger.error(f"Error in main execution: {str(e)}")
        raise
# ==================================


# CELL 13 - Execute Main Function
# ==================================
# Run the whole pipeline when executed as the top-level module (a notebook
# kernel runs cells with __name__ == "__main__") and keep both DataFrames
# in the notebook namespace. Both are None when no dataflow IDs were found.
if __name__ == "__main__":
    (main_df, details_df) = main()
# ==================================

StatementMeta(, 257437a1-650c-4e5a-92b0-49e9cf2ed27f, 4, Finished, Available, Finished)

2025-07-17 15:27:12,696 - INFO - Starting Power BI Dataflow Datasources to Delta Lake process
2025-07-17 15:27:12,699 - INFO - Getting access token...
2025-07-17 15:27:12,716 - INFO - Successfully obtained access token
2025-07-17 15:27:12,720 - INFO - Retrieving all dataflow IDs from pbi_dataflows...
2025-07-17 15:27:12,720 - INFO - Executing query: 
        SELECT dataflowId 
        FROM pbi_dataflows
        WHERE dataflowId IS NOT NULL
        ORDER BY workspaceId
        
2025-07-17 15:27:26,490 - INFO - Retrieved 258 dataflow IDs
2025-07-17 15:27:26,492 - INFO - First few dataflow IDs: ['ff64d204-5c47-46cb-89de-9116d9d0704e', 'bb7c4b9f-b7ac-4bc1-b512-6a940f3dedb8', 'a34e7f23-d77f-418a-889e-cba639a3889b', 'b217e7ab-3aac-483a-b99a-cf8b91ac15b6', '8d54f573-dfc4-41ff-bc4f-7efd384d00eb']
2025-07-17 15:27:26,493 - INFO - Processing 258 dataflows to retrieve datasources...
2025-07-17 15:27:26,494 - INFO - Starting to process 258 dataflows
2025-07-17 15:27:26,496 - INFO - Processing data

+------------------------+----------------+------------------+--------------------------+
|total_connection_details|unique_dataflows|unique_detail_keys|last_updated              |
+------------------------+----------------+------------------+--------------------------+
|1047                    |255             |7                 |2025-07-17 15:30:28.531678|
+------------------------+----------------+------------------+--------------------------+

+-----------------------+-----+
|detail_key             |count|
+-----------------------+-----+
|server                 |568  |
|database               |245  |
|extensionDataSourceKind|79   |
|extensionDataSourcePath|79   |
|url                    |39   |
|sharePointSiteUrl      |36   |
|emailAddress           |1    |
+-----------------------+-----+



In [3]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one (getOrCreate) so this cell
# can also be run on its own after the load cell has finished.
spark = SparkSession.builder.appName("Refresh SQL Endpoint Metadata").getOrCreate()

# Tables written by this notebook (see the header cell); keep in sync with
# CONFIG["MAIN_TABLE_NAME"] and CONFIG["DETAILS_TABLE_NAME"].
TABLES_TO_REFRESH = (
    "pbi_dataflows_datasources",
    "pbi_dataflows_datasources_conn_details",
)

for table_name in TABLES_TO_REFRESH:
    # REFRESH TABLE invalidates Spark's cached data/metadata for the table so
    # subsequent reads see the latest Delta state.
    # NOTE(review): confirm whether this also refreshes the Fabric SQL
    # analytics endpoint, as the session app name implies.
    spark.sql(f"REFRESH TABLE {table_name}")
    print("Metadata refresh triggered successfully.")

StatementMeta(, 257437a1-650c-4e5a-92b0-49e9cf2ed27f, 5, Finished, Available, Finished)

Metadata refresh triggered successfully.
Metadata refresh triggered successfully.
