# Power BI Semantic Model Refresh Schedule Frequency Analysis

This notebook processes Power BI refresh schedule data to determine frequency patterns and stores the results in a Delta Lake table.

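## Process Overview:
0. Retrieve service-principal secrets from Azure Key Vault and obtain a Power BI API access token
1. Load refresh schedule data from Delta table
2. Create frequency reference DataFrame
3. Determine semantic model frequencies (Daily if a model is scheduled on all seven days of the week, otherwise Weekly)
4. Save processed data to Delta table (only records whose Key_Frequency is not already in the table are appended)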

## Author: Data Engineering Team
## Last Modified: 2024

In [None]:
# Import required libraries
import requests
import json
import pandas as pd
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lit, array_contains, when, countDistinct, 
    collect_set, size, array_intersect, array, concat_ws
)
from pyspark.sql.types import StringType, IntegerType
from notebookutils import mssparkutils
import time
import logging

# Root logging configuration for the whole notebook: INFO and above,
# each record prefixed with its timestamp and level name.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Run-wide tracking state read by the final summary cell:
#   total_execution_start_time - epoch seconds when this cell ran
#   step_timings               - step label -> elapsed seconds
step_timings = {}
total_execution_start_time = time.time()

_separator = "=" * 80
print(_separator)
print("POWER BI SEMANTIC MODEL REFRESH SCHEDULE FREQUENCY ANALYSIS")
print(_separator)
_start_stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"Notebook execution started at: {_start_stamp}")
print("All libraries loaded successfully.")
print(_separator)

In [None]:
# Step 0: Configuration and Connection Setup
print("\nüîß Starting Step 0: Configuration and Authentication Setup...")
step_start_time = time.time()

try:
    # Every service-principal secret used by this notebook lives in this one vault.
    key_vault_url = "https://kv-pbi-techical-accounts.vault.azure.net/"

    def _get_kv_secret(secret_name):
        """Return the value of *secret_name* from the notebook's Key Vault."""
        return mssparkutils.credentials.getSecret(key_vault_url, secret_name)

    print("   üìã Retrieving connection secrets from Azure Key Vault...")
    client_id = _get_kv_secret("clientid")
    client_secret = _get_kv_secret("clientsecret")
    tenant_id = _get_kv_secret("tenantid")
    # NOTE(review): dataset_id is fetched but not referenced by any visible cell.
    dataset_id = _get_kv_secret("datasetid-fabric-capacity-metrics")

    # Audience requested for the Power BI API token in Step 0.1
    resource = 'https://analysis.windows.net/powerbi/api'
    api_version = 'v1.0'
    # OneLake ABFSS path of the source refresh-schedule Delta table (read in Step 1)
    refresh_schedule_path = (
        'abfss://14b44c0a-e3c3-41eb-b31f-c2a7d90ce593@onelake.dfs.fabric.microsoft.com/'
        'e6e52507-e786-469c-8048-c816d44f5b5a/Tables/mard_mdna_t_pbi_sm_refresh_schedule'
    )
    # Destination database/table for the frequency results (written in Step 4)
    delta_table_destination = "mard_mdna_t_pbi_sm_refresh_schedule_frequency"
    delta_table_database = "mdna_pbi_monitoring"

    print("   ‚úÖ Successfully retrieved all connection secrets.")
    print(f"   üìä Target Database: {delta_table_database}")
    print(f"   üìä Target Table: {delta_table_destination}")

except Exception as e:
    # Report, log, and stop the notebook: nothing downstream can run without secrets.
    error_msg = f"‚ùå Error retrieving secrets: {str(e)}"
    print(error_msg)
    logger.error(error_msg)
    raise

In [None]:
# Step 0.1: Obtain Access Token
print("\nüîê Starting Step 0.1: Obtaining Power BI API access token...")

try:
    # OAuth2 client-credentials grant against the Azure AD v1 token endpoint
    # (v1 takes a `resource` parameter rather than a v2 `scope`).
    auth_url = f'https://login.microsoftonline.com/{tenant_id}/oauth2/token'
    data = dict(
        grant_type='client_credentials',
        client_id=client_id,
        client_secret=client_secret,
        resource=resource,
    )

    auth_response = requests.post(auth_url, data=data, timeout=30)
    auth_response.raise_for_status()
    access_token = auth_response.json()['access_token']

    # Bearer header for Power BI REST calls.
    # NOTE(review): no visible cell issues a request with these headers.
    headers = {'Authorization': f'Bearer {access_token}'}

    print("   ‚úÖ Access token obtained successfully.")
    print(f"   üïí Token acquired at: {datetime.now().strftime('%H:%M:%S')}")

except Exception as e:
    # HTTP/transport failures get a specific message; anything else
    # (e.g. a response body without 'access_token') is reported as unexpected.
    if isinstance(e, requests.exceptions.RequestException):
        error_msg = f"‚ùå Error obtaining access token: {str(e)}"
    else:
        error_msg = f"‚ùå Unexpected error during token acquisition: {str(e)}"
    print(error_msg)
    logger.error(error_msg)
    raise

# Step 0 timing spans both the configuration cell and this token cell.
step_end_time = time.time()
step_timings['Step_0'] = step_end_time - step_start_time
print(f"\n‚úÖ Step 0 completed successfully in {step_timings['Step_0']:.2f} seconds.")
print("="*80)

In [None]:
# Step 1: Load Refresh Schedule Data
print("\nüìä Starting Step 1: Loading Refresh Schedule data from Delta table...")
step_start_time = time.time()

try:
    # Local import: the aggregate `count` used by the single-pass null scan below
    # is not among the functions imported at the top of the notebook.
    from pyspark.sql import functions as F

    print(f"   üìÅ Source Path: {refresh_schedule_path}")

    # Load data from Delta table
    df_spark_refresh_schedule = spark.read.format("delta").load(refresh_schedule_path)

    # Cached because Steps 2 and 3 read this DataFrame again
    df_spark_refresh_schedule.cache()

    # Check if DataFrame is empty
    row_count = df_spark_refresh_schedule.count()

    if row_count == 0:
        warning_msg = "‚ö†Ô∏è  Warning: Refresh Schedule Delta table is empty. No data to process."
        print(warning_msg)
        logger.warning(warning_msg)
        # Processing continues; later steps then operate on empty DataFrames.
    else:
        print("   ‚úÖ Successfully loaded refresh schedule data.")
        print(f"   üìà Total rows: {row_count:,}")

        # Display schema information
        print("\n   üìã Schema of df_spark_refresh_schedule:")
        df_spark_refresh_schedule.printSchema()

        # Show sample data
        print("\n   üîç Sample data (first 5 rows):")
        df_spark_refresh_schedule.show(5, truncate=False)

        # Additional data quality checks
        unique_semantic_models = df_spark_refresh_schedule.select("Semantic_Model_ID").distinct().count()
        print(f"   üìä Unique Semantic Models: {unique_semantic_models:,}")

        # Columns the later steps rely on. Missing ones are reported explicitly rather
        # than silently skipped (which previously still printed "No null values found").
        critical_columns = ["Semantic_Model_ID", "Day", "Time_of_the_Day", "Time_zone"]
        available_columns = set(df_spark_refresh_schedule.columns)
        present_columns = [c for c in critical_columns if c in available_columns]
        missing_columns = [c for c in critical_columns if c not in available_columns]

        if missing_columns:
            warning_msg = f"   ‚ö†Ô∏è  Warning: critical column(s) missing from source: {missing_columns}"
            print(warning_msg)
            logger.warning(warning_msg)

        # One aggregation pass counts nulls in every present column instead of one
        # Spark job per column: when() yields NULL for non-null rows and count() skips NULLs.
        null_counts = {}
        if present_columns:
            null_row = df_spark_refresh_schedule.agg(
                *[F.count(when(col(c).isNull(), 1)).alias(c) for c in present_columns]
            ).first()
            null_counts = {c: null_row[c] for c in present_columns}

        for col_name, null_count in null_counts.items():
            if null_count > 0:
                print(f"   ‚ö†Ô∏è  Warning: {null_count:,} null values found in column '{col_name}'")

        if not missing_columns and all(n == 0 for n in null_counts.values()):
            print("   ‚úÖ Data quality check: No null values found in critical columns.")

except Exception as e:
    error_msg = f"‚ùå Error loading refresh schedule data from Delta table: {str(e)}"
    print(error_msg)
    logger.error(error_msg)
    raise

step_end_time = time.time()
step_timings['Step_1'] = step_end_time - step_start_time
print(f"\n‚úÖ Step 1 completed successfully in {step_timings['Step_1']:.2f} seconds.")
print("="*80)

In [None]:
# Step 2: Create Frequency Reference DataFrame
print("\nüîß Starting Step 2: Creating Frequency Reference DataFrame...")
step_start_time = time.time()

try:
    print("   üîÑ Transforming data using optimized Spark DataFrame operations...")

    # Composite refresh key "<Semantic_Model_ID>_<Time_of_the_Day>_<Time_zone>".
    # NOTE(review): concat_ws drops NULL parts, so a row missing one of them
    # yields a shorter key that could collide with another row's key.
    refresh_key_expr = concat_ws(
        "_",
        col("Semantic_Model_ID"),
        col("Time_of_the_Day").cast("string"),
        col("Time_zone"),
    )
    df_refresh_frequency_spark = (
        df_spark_refresh_schedule
        .withColumn("Key_SM_Refresh_Day", refresh_key_expr)
        .select(
            "Key_SM_Refresh_Day",
            col("Day").alias("Refresh_Day"),
            "Semantic_Model_ID",
        )
    )

    # Cached: reused by the statistics below and by Step 3
    df_refresh_frequency_spark.cache()

    transformed_row_count = df_refresh_frequency_spark.count()

    if transformed_row_count:
        print(f"   ‚úÖ Successfully created frequency reference DataFrame.")
        print(f"   üìà Total rows after transformation: {transformed_row_count:,}")

        print("\n   üìã Schema of df_refresh_frequency_spark:")
        df_refresh_frequency_spark.printSchema()

        print("\n   üîç Sample transformed data (first 5 rows):")
        df_refresh_frequency_spark.show(5, truncate=False)

        # Cardinality of the composite key and of the day values
        unique_keys = df_refresh_frequency_spark.select("Key_SM_Refresh_Day").distinct().count()
        unique_days = df_refresh_frequency_spark.select("Refresh_Day").distinct().count()
        print(f"   üìä Unique Key_SM_Refresh_Day entries: {unique_keys:,}")
        print(f"   üìä Unique refresh days found: {unique_days}")

        # Rows per day value, most frequent first
        print("\n   üìà Distribution of refresh days:")
        day_distribution = (
            df_refresh_frequency_spark
            .groupBy("Refresh_Day")
            .count()
            .orderBy("count", ascending=False)
        )
        day_distribution.show()
    else:
        warning_msg = "‚ö†Ô∏è  Warning: df_refresh_frequency_spark is empty after transformation."
        print(warning_msg)
        logger.warning(warning_msg)

except Exception as e:
    error_msg = f"‚ùå Error creating frequency reference DataFrame: {str(e)}"
    print(error_msg)
    logger.error(error_msg)
    raise

step_end_time = time.time()
step_timings['Step_2'] = step_end_time - step_start_time
print(f"\n‚úÖ Step 2 completed successfully in {step_timings['Step_2']:.2f} seconds.")
print("="*80)

In [None]:
# Step 3: Determine Frequency for each Semantic Model
print("\nüéØ Starting Step 3: Determining Frequency for each Semantic_Model_ID...")
step_start_time = time.time()

try:
    print("   üîÑ Analyzing refresh patterns to determine frequency...")

    # A model is classified "Daily" only if its schedule covers every one of these days;
    # anything else is "Weekly".
    # NOTE(review): the match is exact and case-sensitive — confirm the source `Day`
    # values use this capitalisation.
    all_days_list = ["Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]
    print(f"   üìÖ Reference days for daily frequency check: {all_days_list}")

    # One row per model holding the distinct set of scheduled days
    print("   üìä Grouping by Semantic_Model_ID and collecting distinct refresh days...")
    df_grouped_days = (
        df_refresh_frequency_spark
        .groupBy("Semantic_Model_ID")
        .agg(collect_set("Refresh_Day").alias("Refresh_Days_Set"))
    )
    df_grouped_days.cache()

    grouped_count = df_grouped_days.count()
    print(f"   üìà Grouped {grouped_count:,} unique semantic models.")

    print("   üîç Applying frequency determination logic...")
    days_set_col = col("Refresh_Days_Set")
    week_days_array = array(*[lit(day_name) for day_name in all_days_list])
    covers_whole_week = size(array_intersect(days_set_col, week_days_array)) == lit(7)

    df_frequency_spark = (
        df_grouped_days
        .withColumn("Refresh_Days_Count", size(days_set_col))
        .withColumn("Frequency", when(covers_whole_week, "Daily").otherwise("Weekly"))
        .withColumn("Key_Frequency", concat_ws("_", col("Semantic_Model_ID"), col("Frequency")))
        .select(
            "Key_Frequency",
            "Semantic_Model_ID",
            "Frequency",
            "Refresh_Days_Set",
            "Refresh_Days_Count",
        )
    )
    df_frequency_spark.cache()

    frequency_count = df_frequency_spark.count()

    if frequency_count:
        print(f"   ‚úÖ Successfully determined frequencies.")
        print(f"   üìà Total unique semantic models processed: {frequency_count:,}")

        print("\n   üìã Schema of df_frequency_spark:")
        df_frequency_spark.printSchema()

        print("\n   üîç Sample frequency data (first 5 rows):")
        df_frequency_spark.show(5, truncate=False)

        # Number of models per frequency class
        print("\n   üìä Frequency Distribution Analysis:")
        frequency_distribution = df_frequency_spark.groupBy("Frequency").count().orderBy("Frequency")
        frequency_distribution.show()

        freq_summary = frequency_distribution.collect()
        for summary_row in freq_summary:
            print(f"       {summary_row['Frequency']}: {summary_row['count']:,} semantic models")
    else:
        warning_msg = "‚ö†Ô∏è  Warning: df_frequency_spark is empty after frequency determination."
        print(warning_msg)
        logger.warning(warning_msg)

    # Attach Key_Frequency / Frequency to every original schedule row
    print("\n   üîó Performing inner join with original refresh schedule data...")
    frequency_lookup = df_frequency_spark.select("Semantic_Model_ID", "Key_Frequency", "Frequency")
    df_merged_spark = df_spark_refresh_schedule.join(
        frequency_lookup,
        on="Semantic_Model_ID",
        how="inner",
    )
    df_merged_spark.cache()

    merged_count = df_merged_spark.count()

    if merged_count:
        print(f"   ‚úÖ Successfully merged frequency data with refresh schedule.")
        print(f"   üìà Total merged rows: {merged_count:,}")

        print("\n   üìã Schema of df_merged_spark:")
        df_merged_spark.printSchema()

        print("\n   üîç Sample merged data (first 3 rows):")
        df_merged_spark.show(3, truncate=False)
    else:
        warning_msg = "‚ö†Ô∏è  Warning: df_merged_spark is empty after the join operation."
        print(warning_msg)
        logger.warning(warning_msg)

except Exception as e:
    error_msg = f"‚ùå Error determining frequencies or merging DataFrames: {str(e)}"
    print(error_msg)
    logger.error(error_msg)
    raise

step_end_time = time.time()
step_timings['Step_3'] = step_end_time - step_start_time
print(f"\n‚úÖ Step 3 completed successfully in {step_timings['Step_3']:.2f} seconds.")
print("="*80)

In [None]:
# Step 4: Save to Delta Table
print("\nüíæ Starting Step 4: Saving processed data to Delta Table...")
step_start_time = time.time()

# Tracking values read by the final summary cell; they keep these defaults
# when nothing is inserted or the target table is never created.
loaded_rows_count = 0
unfiltered_rows_count = 0
total_records_after_load = 0

# Defined before the try so the error-context output below can always reference it
# (previously a failure in CREATE DATABASE raised a NameError inside the handler,
# masking the real error).
table_full_name = f"{delta_table_database}.{delta_table_destination}"

try:
    # Ensure target database exists
    print(f"   üèóÔ∏è  Ensuring database '{delta_table_database}' exists...")
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {delta_table_database}")
    print(f"   ‚úÖ Database '{delta_table_database}' is ready.")

    # Catalog metadata lookup instead of a full COUNT(*) scan wrapped in a bare
    # `except:` (which also swallowed KeyboardInterrupt and unrelated failures).
    table_exists = spark.catalog.tableExists(delta_table_destination, delta_table_database)
    if table_exists:
        print(f"   üìä Target table '{table_full_name}' exists.")
    else:
        print(f"   üÜï Target table '{table_full_name}' does not exist - will be created.")

    if table_exists:
        print("   üîç Reading existing Key_Frequency values for deduplication...")
        existing_keys_df = spark.sql(f"SELECT DISTINCT Key_Frequency FROM {table_full_name}")
        existing_keys_count = existing_keys_df.count()
        print(f"   üìà Found {existing_keys_count:,} distinct Key_Frequency values in existing table.")

        print("   üîÑ Filtering for new records only...")
        # Left-anti join keeps rows whose Key_Frequency is absent from the table.
        # Unlike `~isin(<collected list>)` it does not pull every key to the driver, and a
        # NULL key in the table can no longer turn every comparison into NULL (which
        # filtered out *all* rows). The select restores df_merged_spark's column order,
        # since a join on a named column places that column first.
        df_new_records = (
            df_merged_spark
            .join(existing_keys_df, on="Key_Frequency", how="left_anti")
            .select(*df_merged_spark.columns)
        )
    else:
        # If table doesn't exist, all records are new
        print("   üìù Table doesn't exist - all records will be inserted.")
        df_new_records = df_merged_spark
        existing_keys_count = 0

    # Cache new records DataFrame (counted, then written)
    df_new_records.cache()

    # Count new and existing records
    loaded_rows_count = df_new_records.count()
    total_merged_count = df_merged_spark.count()
    unfiltered_rows_count = total_merged_count - loaded_rows_count

    print(f"\n   üìä Data Analysis Summary:")
    print(f"       Total processed records: {total_merged_count:,}")
    print(f"       New records to insert: {loaded_rows_count:,}")
    print(f"       Existing records (skipped): {unfiltered_rows_count:,}")

    if loaded_rows_count > 0:
        print(f"\n   üíæ Inserting {loaded_rows_count:,} new records into '{table_full_name}'...")

        # Load time is stored as a formatted string column (driver-local clock)
        df_final = df_new_records.withColumn(
            "load_timestamp",
            lit(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        )

        write_start_time = time.time()
        df_final.write.format("delta").mode("append").saveAsTable(table_full_name)
        write_end_time = time.time()

        write_duration = write_end_time - write_start_time
        print(f"   ‚úÖ Successfully inserted new records in {write_duration:.2f} seconds.")
        # Guard against a zero duration from coarse clock resolution
        if write_duration > 0:
            print(f"   üìä Write performance: {loaded_rows_count/write_duration:.0f} rows/second")
    else:
        print("   ‚ÑπÔ∏è  No new records to insert. Delta table remains unchanged.")

    # The table exists now only if it already existed or was just created by the write;
    # querying it otherwise (first run with nothing to insert) would raise.
    if table_exists or loaded_rows_count > 0:
        print("\n   üîç Performing post-load verification...")
        verification_start_time = time.time()

        df_count_after_load = spark.sql(f"SELECT count(*) as total_count FROM {table_full_name}")
        total_records_after_load = df_count_after_load.collect()[0][0]

        verification_end_time = time.time()
        print(f"   üìä Total records in '{table_full_name}': {total_records_after_load:,}")
        print(f"   ‚è±Ô∏è  Verification completed in {verification_end_time - verification_start_time:.2f} seconds.")

        # Display count result
        print("\n   üìã Final table statistics:")
        df_count_after_load.show()

        # Additional table statistics if records were inserted
        if loaded_rows_count > 0:
            print("   üìà Final frequency distribution in table:")
            final_frequency_dist = spark.sql(f"""
                SELECT Frequency, COUNT(*) as count 
                FROM {table_full_name} 
                GROUP BY Frequency 
                ORDER BY Frequency
            """)
            final_frequency_dist.show()
    else:
        print("   ‚ÑπÔ∏è  Target table was not created (no records to insert); skipping post-load verification.")

except Exception as e:
    error_msg = f"‚ùå Error saving data to Delta table: {str(e)}"
    print(error_msg)
    logger.error(error_msg)

    # Additional error context
    print(f"   üîç Error context:")
    print(f"       Target database: {delta_table_database}")
    print(f"       Target table: {delta_table_destination}")
    print(f"       Full table name: {table_full_name}")
    raise

step_end_time = time.time()
step_timings['Step_4'] = step_end_time - step_start_time
print(f"\n‚úÖ Step 4 completed successfully in {step_timings['Step_4']:.2f} seconds.")
print("="*80)

In [None]:
# Final Summary and Cleanup
total_execution_end_time = time.time()
total_execution_time = total_execution_end_time - total_execution_start_time

print("\n" + "="*80)
print("üéâ NOTEBOOK EXECUTION COMPLETED SUCCESSFULLY")
print("="*80)

print(f"\nüìä EXECUTION SUMMARY:")
print(f"   ‚è±Ô∏è  Total execution time: {total_execution_time:.2f} seconds ({total_execution_time/60:.1f} minutes)")
print(f"   üìÖ Completed at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

print(f"\nüìà PROCESSING STATISTICS:")
print(f"   üìä Total new records inserted: {loaded_rows_count:,}")
print(f"   üìä Total existing records (skipped): {unfiltered_rows_count:,}")
print(f"   üìä Final table record count: {total_records_after_load:,}")
print(f"   üìä Target table: {delta_table_database}.{delta_table_destination}")

print(f"\n‚è±Ô∏è  STEP-BY-STEP TIMING BREAKDOWN:")
for step, duration in step_timings.items():
    # Guard against a zero total (coarse clock resolution) to avoid ZeroDivisionError
    percentage = (duration / total_execution_time) * 100 if total_execution_time > 0 else 0.0
    print(f"   {step}: {duration:.2f}s ({percentage:.1f}%)")

print(f"\nüßπ CLEANUP:")
# Unpersist each cached DataFrame independently so one failure does not leave the
# remaining ones cached. Names that were never defined (e.g. a skipped cell) are ignored.
_cached_df_names = [
    "df_spark_refresh_schedule",
    "df_refresh_frequency_spark",
    "df_grouped_days",
    "df_frequency_spark",
    "df_merged_spark",
    "df_new_records",
]
_unpersist_failures = []
for _df_name in _cached_df_names:
    _df = globals().get(_df_name)
    if _df is None:
        continue
    try:
        _df.unpersist()
    except Exception as e:
        _unpersist_failures.append(f"{_df_name}: {e}")

if _unpersist_failures:
    _failure_details = "; ".join(_unpersist_failures)
    print(f"   ‚ö†Ô∏è  Note: Some DataFrames may not have been unpersisted: {_failure_details}")
else:
    print("   ‚úÖ Successfully released cached DataFrames from memory.")

print(f"\nüèÅ All processing steps completed successfully!")
# Only claim data was saved when Step 4 actually appended rows.
if loaded_rows_count > 0:
    print(f"üíæ Data has been successfully saved to Delta table: {delta_table_database}.{delta_table_destination}")
else:
    print(f"üíæ No new records were written to Delta table: {delta_table_database}.{delta_table_destination}")
print("="*80)

# Log final summary
logger.info(f"Notebook completed successfully. Inserted {loaded_rows_count:,} new records. "
           f"Total execution time: {total_execution_time:.2f} seconds.")