## Fabric Semantic Model Audit

### Overview

This notebook audits Fabric semantic models by collecting logs and metadata on a schedule and keeping the history. Tracking model performance, usage patterns, and metadata changes over time can help you:

- **Identify Unused or Obsolete Columns:** Determine which columns may be removed from the model or underlying Delta tables.
- **Monitor Performance:** Evaluate DAX query performance over time.
- **Track Model Usage:** Collect historical query logs and usage statistics.

### Key Components and Functionality

1. **Initial Setup and Requirements:**
   - **Workspace Monitoring:** The notebook requires that Workspace Monitoring is enabled. See [this blog post](https://blog.fabric.microsoft.com/blog/announcing-public-preview-of-workspace-monitoring) for guidance.
   - **Scheduled Execution:** To capture detailed usage statistics, it is recommended to schedule this notebook to run multiple times per day (e.g., 6 times per day).
   - **Configure Run Parameters:** Configure the run parameters at the top of the notebook based on your models and other requirements.
   - **Logging Lakehouse:** Attach a lakehouse so log tables can be saved. 

1. **Core Functionality:**
   - **Metadata Capture:** Functions to retrieve and save semantic model objects (columns and measures) along with their dependencies, using Semantic Link (`sempy.fabric`) calls.
   - **Query Log Collection:** Modules to capture query counts and detailed logs, which help track model usage and performance over specified time intervals.
   - **Unused Columns and Source Mapping:** Compares lakehouse metadata with model usage to detect columns that are no longer utilized.
   - **Cold Cache Performance:** Deploys a cloned version of the model to measure cold-cache performance via parallel DAX queries and trace log analysis.
   - **Resident Statistics:** Captures statistics about column residency (e.g., memory load, sizes) to further evaluate model performance.

1. **Star Schema Generation:**
   - The notebook constructs several star schema tables for in-depth analysis:
      - **DIM_ModelObject:** Latest definitions for columns, measures, and unused columns.
      - **DIM_Model:** Basic model details.
      - **DIM_Report:** Report information with optional known ID mapping.
      - **DIM_User:** Standardized user info from logs.
      - **FACT_ModelObjectQueryCount:** Ties query counts to model objects and their dependencies.
      - **FACT_ModelLogs:** Detailed logs for performance tracking.
      - **FACT_ModelObjectStatistics:** Combines daily statistics such as cold cache performance and memory size for columns.

1. **Orchestration and Execution:**
   - The main orchestration function (`collect_model_statistics`) processes each model sequentially, performing all capture steps (metadata, logs, unused columns, cold cache, resident statistics) and finally marking each run as completed or failed.
   - The notebook concludes by writing the star schema tables to Delta format, ready for import into a Fabric semantic model for further analysis.
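
The per-model flow described in the last item can be sketched as follows. This is a minimal illustration, not the notebook's actual `collect_model_statistics`: `run_steps_for_model`, `capture_metadata`, and `failing_step` are hypothetical names, status is kept in a plain dictionary rather than the `run_history` Delta table, and continuing with the next model after a failure is an assumption of the sketch.

```python
from uuid import uuid4


def run_steps_for_model(model_name, steps, run_log):
    """Run each capture step in order and record the final status (hypothetical helper)."""
    run_uuid = str(uuid4())
    run_log[run_uuid] = {"model": model_name, "status": "started"}
    try:
        for step in steps:
            step(model_name)
        run_log[run_uuid]["status"] = "completed"
    except Exception as exc:
        # Assumption of this sketch: a failing step marks the run as failed
        # and the caller moves on to the next model.
        run_log[run_uuid]["status"] = "failed"
        run_log[run_uuid]["error"] = str(exc)
    return run_uuid


def capture_metadata(model_name):
    # Stand-in for a real capture step.
    pass


def failing_step(model_name):
    raise RuntimeError(f"could not query {model_name}")


run_log = {}
ok_run = run_steps_for_model("Sales", [capture_metadata], run_log)
bad_run = run_steps_for_model("Finance", [capture_metadata, failing_step], run_log)
```

Each run ends in exactly one of the two terminal states, which is what makes it possible to find and clean up runs that never finished.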

### Usage Notes

- **Scheduling and Monitoring:** To capture granular historical data, consider scheduling this notebook to run at regular intervals throughout the day.
- **Configuration:** Adjust the parameters (e.g., `max_queries_daily`, `max_workers`) to suit your environment and workload.
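
As a rough illustration of what `max_workers` controls, the sketch below fans a list of column queries out over a bounded thread pool. `run_column_query` and `measure_columns` are hypothetical stand-ins; the notebook's real cold-cache code issues DAX queries instead of sleeping.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_column_query(column_name):
    """Hypothetical stand-in for one cold-cache DAX query; returns elapsed seconds."""
    started = time.perf_counter()
    time.sleep(0.01)  # Simulate query latency.
    return time.perf_counter() - started


def measure_columns(columns, max_workers):
    """Time every column, running at most `max_workers` queries concurrently."""
    timings = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(run_column_query, c): c for c in columns}
        for future in as_completed(futures):
            timings[futures[future]] = future.result()
    return timings


timings = measure_columns(["Sales[Amount]", "Sales[Date]", "Customer[Name]"], max_workers=2)
```

A lower `max_workers` likely reduces concurrent load on the capacity at the cost of a longer run, so it is worth tuning against your own environment.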


### Install the Semantic Link Labs package
See the [PyPI page](https://pypi.org/project/semantic-link-labs/) for the latest version.

In [None]:
%pip install semantic-link-labs

### Import Required Packages

In [None]:
# Standard Library Imports
import builtins
import functools
import math
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import contextmanager
from datetime import datetime, timedelta
from uuid import uuid4

# Third-Party Libraries
import pandas as pd
import pyspark
from dateutil.relativedelta import relativedelta
from pyspark.sql.functions import col, lit, collect_set, udf
from pyspark.sql.types import StringType

# Microsoft Fabric Libraries (Semantic Link and Semantic Link Labs)
import sempy.fabric as fabric
import sempy_labs as labs

### Set Initial Parameters
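The `user_groups` and `default_user_group` settings in the cell below bucket executing users into named groups. How the notebook applies them is not shown in this section, so the following is only a sketch of one way such a lookup could work. `resolve_user_group` is a hypothetical helper, and the case-insensitive match is an assumption.

```python
user_groups = {
    "Engineers (Example)": ["engineer1@microsoft.com", "engineer2@microsoft.com"],
    "Project Managers (Example)": ["pm1@microsoft.com"],
}
default_user_group = "Other Users"

# Invert the mapping once so each lookup is a single dictionary access.
# Lower-casing the keys is an assumption of this sketch.
group_by_user = {
    user.lower(): group for group, users in user_groups.items() for user in users
}


def resolve_user_group(executing_user):
    """Return the configured group for a user, or the default group."""
    return group_by_user.get(executing_user.lower(), default_user_group)


known = resolve_user_group("PM1@microsoft.com")
unknown = resolve_user_group("someone@contoso.com")
```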

In [None]:
# Models to collect statistics for
models = [
    {
        "model_name": "Semantic Model Name", # The name of the target model
        "model_workspace_name": "Semantic Model Workspace Name", # The name of the target model workspace
        "lakehouse_name": "Semantic Model Lakehouse Name", # Only needed for Direct Lake models
        "lakehouse_workspace_name": "Semantic Model Lakehouse Workspace Name", # Only needed for Direct Lake models
        "log_analytics_kusto_uri": "",     # Optional: Kusto URI; if left empty, get_workspace_monitoring_info is used to look it up
        "log_analytics_kusto_database_uuid": "", # Optional: Kusto database UUID; if left empty, get_workspace_monitoring_info is used to look it up
    },
]

# Settings for collecting cold cache performance measurements.
collect_cold_cache_measurements = True # Only recommended for Direct Lake or small Import models
max_queries_daily = 1                  # Maximum cold cache performance queries per column per day
max_workers = 50                       # Number of concurrent column queries

# Determine target workspace for the cloned model (used in cold cache measurements)
cold_cache_target_workspace_name = fabric.resolve_workspace_name()

# Settings for model query collection (how many days back to collect data)
max_days_ago_to_collect = 30  # Collect data from 1 to 30 days ago (only days with no data are collected)

# Adjustment for recent logs: exclude intervals within this many hours of the as-of datetime for the most recent day
min_hours_before_current = 3

# Principal name collection mode:
#   0 = Keep original ExecutingUser,
#   1 = Anonymize using historical mapping,
#   2 = Always set to "Masked"
collect_principal_names = 0
mask_principal_names_after_days = 30  # Set to 0 if masking is not required

# Define user groups to bucket executing users found in the object count and detailed logs.
user_groups = {  
    "Engineers (Example)": [
        "engineer1@microsoft.com",
        "engineer2@microsoft.com",
    ],
    "Project Managers (Example)": [
        "pm1@microsoft.com",
        "pm2@microsoft.com",
    ],
}
default_user_group = "Other Users"

# Define report mappings for cases where monitoring workspace assigns unexplained UUIDs
report_uuid_mapping = [
    {"ReportUuid": "Unmappable uuid in workspace monitoring", "MapToReportUuid": "Main uuid for report to map to"},
]

# Replace with your target schema name or leave as an empty string for default behavior.
lakehouse_target_schema = "dbo"

# Delta table names for historical data (new records are appended each run)
historical_table_names = {
    "run_history": "run_history",
    "model_columns": "model_columns",
    "model_measures": "model_measures",
    "object_query_count": "model_object_query_count",
    "detailed_logs": "model_detailed_logs",
    "object_mapping": "model_object_mapping",
    "dependencies": "model_dependencies",
    "unused_columns": "unused_delta_table_columns",
    "source_mapping": "model_column_source_mapping",
    "cold_cache_measurements": "model_column_cold_cache_measurement",
    "resident_statistics": "model_column_resident_statistics",
    "source_reports": "source_reports",
}

# Delta table names for star schema (these tables are overwritten each run)
star_schema_table_names = {
    "dim_model_object": "DIM_ModelObject",
    "dim_model": "DIM_Model",
    "dim_report": "DIM_Report",
    "dim_user": "DIM_User",
    "fact_model_object_query_count": "FACT_ModelObjectQueryCount",
    "fact_detailed_logs": "FACT_ModelLogs",
    "fact_model_statistics": "FACT_ModelObjectStatistics",
}

# Flags for table management
force_delete_historical_tables = False
force_delete_incomplete_runs = True

# ABFSS base path (OneLake DFS endpoint host)
abfss_base_path = "onelake.dfs.fabric.microsoft.com"

# Ensure Spark uses case-sensitive SQL
spark.conf.set("spark.sql.caseSensitive", True)

### Helper Functions: Logging, Retry, and Saving DataFrames
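To make the retry timing concrete, the sketch below computes the waits produced by an exponential backoff like the one in the `retry` decorator in the next cell. `backoff_delays` is a hypothetical helper, and it assumes the loop makes `num_retries` calls in total, which is how the `while attempts > 1` loop plus the final call behave.

```python
def backoff_delays(num_retries=3, initial_delay=5, backoff_factor=2):
    """Return the sleep durations (seconds) between attempts.

    Assumes `num_retries` total calls, so there are `num_retries - 1` waits.
    """
    delays = []
    delay = initial_delay
    for _ in range(max(num_retries - 1, 0)):
        delays.append(delay)
        delay *= backoff_factor
    return delays


default_schedule = backoff_delays()        # [5, 10]
longer_schedule = backoff_delays(5, 2, 3)  # [2, 6, 18, 54]
```

With the defaults, a call that keeps failing waits 15 seconds in total before the final attempt lets the exception propagate.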

In [None]:
# Thread-local storage to track call depth (used for logging indentation)
# This allows each thread to maintain its own "call depth" counter independently.
_thread_local = threading.local()


@contextmanager
def indented_print(indent_level: int):
    """
    A context manager that temporarily replaces the built-in print function.
    It prepends a specific indent (based on indent_level) to every print output,
    which makes nested function calls easier to trace visually.

    Note: builtins.print is swapped process-wide, so prints from other threads
    are also indented while the block is active.
    """
    # Save the current print function so it can be restored later.
    original_print = builtins.print

    def custom_print(*args, **kwargs):
        # Create an indent by repeating four spaces per indent level.
        indent = "    " * indent_level
        # Honor a caller-supplied separator instead of always joining with a space.
        sep = kwargs.pop("sep", None)
        sep = " " if sep is None else sep
        # Call the saved print with the indented message.
        original_print(indent + sep.join(map(str, args)), **kwargs)

    # Replace the built-in print with our custom_print.
    builtins.print = custom_print
    try:
        # Yield control back to the caller.
        yield
    finally:
        # Restore the original print function after exiting the block.
        builtins.print = original_print


def log_function_calls(func):
    """
    Decorator that logs the start and end of a function call using indented printing.
    It uses a thread-local counter to indent log messages, so nested calls are visually offset.
    
    Example:
        @log_function_calls
        def my_func():
            ...
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Initialize call_depth for the thread if not already set.
        if not hasattr(_thread_local, "call_depth"):
            _thread_local.call_depth = 0

        # Capture the current call depth to determine the indent.
        indent = _thread_local.call_depth

        # Log the start message using the indented_print context manager.
        with indented_print(indent):
            print(f"✅ {func.__name__} - Starting")

        # Increase the call depth as we enter the function.
        _thread_local.call_depth += 1
        try:
            # Log any output inside the function with increased indentation.
            with indented_print(_thread_local.call_depth):
                result = func(*args, **kwargs)
        finally:
            # Decrease the call depth on function exit.
            _thread_local.call_depth -= 1
            with indented_print(_thread_local.call_depth):
                print(f"✅ {func.__name__} - Ending")
        return result

    return wrapper


def retry(exceptions, num_retries=3, initial_delay=5, backoff_factor=2, logger=None):
    """
    Decorator factory that returns a decorator to automatically retry a function call if it raises
    one of the specified exceptions. It uses exponential backoff between retries.
    
    Parameters:
        exceptions (tuple or Exception): Exception(s) that trigger a retry.
        num_retries (int): Total number of attempts, including the first call (at least one call is always made).
        initial_delay (int): Initial delay in seconds before the first retry.
        backoff_factor (int): Factor by which the delay is multiplied after each retry.
        logger (callable, optional): Logger function for reporting retries (defaults to print).
    
    Usage:
        @retry((ValueError,), num_retries=3, initial_delay=2, backoff_factor=2)
        def my_func():
            ...
    """
    def decorator_retry(func):
        @functools.wraps(func)
        def wrapper_retry(*args, **kwargs):
            attempts, delay = num_retries, initial_delay
            # Retry loop: try the function until attempts are exhausted.
            while attempts > 1:
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    msg = f"⚠️ {func.__name__} failed with {e}, retrying in {delay} seconds..."
                    if logger:
                        logger(msg)
                    else:
                        print(msg)
                    # Pause execution for 'delay' seconds before retrying.
                    time.sleep(delay)
                    attempts -= 1
                    # Increase the delay for the next attempt.
                    delay *= backoff_factor
            # Final attempt: if previous retries failed, let any exception propagate.
            return func(*args, **kwargs)
        return wrapper_retry
    return decorator_retry


@log_function_calls
def save_dataframe_to_delta_table(data, table_name: str, context: dict, **extra_columns) -> None:
    """
    Appends a pandas or Spark DataFrame to a Delta table with additional contextual columns.
    Extra columns provided as keyword arguments are added to the DataFrame before writing.
    
    Parameters:
        data (pandas.DataFrame or pyspark.sql.DataFrame): The input data.
        table_name (str): The target Delta table name.
        context (dict): A context dictionary that must contain keys:
            - 'as_of_datetime'
            - 'as_of_date'
            - 'run_uuid'
            - 'source_model_uuid'
        **extra_columns: Any additional columns to add to the DataFrame.
    
    The function converts a pandas DataFrame to a Spark DataFrame if necessary and ensures
    that column names have no spaces. It then writes the DataFrame to the specified Delta table.
    """
    # Default columns added to every record from the context.
    default_cols = {
        "AsOfDateTime": context["as_of_datetime"],
        "AsOfDate": context["as_of_date"],
        "RunUuid": context["run_uuid"],
        "ModelUuid": context["source_model_uuid"],
    }
    # Merge default columns with any extra columns provided.
    all_extra_cols = {**default_cols, **extra_columns}

    def add_columns(df, cols: dict):
        """
        Helper function to add extra columns to a DataFrame.
        Works for both pandas and Spark DataFrames.
        """
        for col_name, value in cols.items():
            if isinstance(df, pd.DataFrame):
                # Direct assignment for pandas DataFrame.
                df[col_name] = value
            else:
                # For Spark DataFrame, use the withColumn method and lit() to add constant columns.
                df = df.withColumn(col_name, lit(value))
        return df

    if isinstance(data, pd.DataFrame):
        # For pandas DataFrame, remove spaces in column names for consistency.
        data.columns = data.columns.str.replace(" ", "", regex=True)
        data = add_columns(data, all_extra_cols)
        # Convert the cleaned pandas DataFrame into a Spark DataFrame.
        spark_df = spark.createDataFrame(data)
    elif isinstance(data, pyspark.sql.DataFrame):
        # For Spark DataFrame, rename columns by removing any spaces.
        for c in data.columns:
            data = data.withColumnRenamed(c, c.replace(" ", ""))
        spark_df = add_columns(data, all_extra_cols)
    else:
        # Raise error if data is not a recognized DataFrame type.
        raise TypeError("❌ Unsupported data type. Expected pandas or Spark DataFrame.")

    try:
        # Write the DataFrame to the specified Delta table.
        # The "mergeSchema" option allows the schema to evolve if needed.
        spark_df.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable(table_name)
        print(f"✅ Table `{table_name}` updated successfully.")
    except Exception as e:
        print(f"❌ Failed to save table `{table_name}`. Error: {e}")
        raise


### Run History & Cleanup Functions
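The cleanup code in the next cell builds SQL `IN` lists from run UUIDs by doubling single quotes before interpolating them. A minimal standalone sketch of that pattern follows; `sql_in_list` is a hypothetical helper name, not part of the notebook, and the sample UUID strings are invented.

```python
def sql_in_list(values):
    """Quote each value as a SQL string literal and join them for an IN clause."""
    escaped = [str(v).replace("'", "''") for v in values]
    return ", ".join(f"'{v}'" for v in escaped)


run_uuids = ["1f0c", "ab'cd"]  # Invented sample values.
delete_query = f"DELETE FROM model_columns WHERE RunUuid IN ({sql_in_list(run_uuids)})"
```

Doubling the quote keeps a stray `'` inside a value from terminating the literal early.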

In [None]:
@log_function_calls
def record_run_start(context: dict) -> None:
    """
    Records the start of a run by inserting a new record into the run_history table.
    
    It creates a DataFrame that includes the current start time, a placeholder for the end time,
    and a status of 'started'. Additional context data (except for keys that are handled elsewhere)
    is included to help identify the run.
    """
    # Filter out keys that are specific to run identification and handled separately,
    # ensuring that the DataFrame only includes the general context information.
    context_filtered = {
        k: v for k, v in context.items() if k not in ["run_uuid", "source_model_uuid"]
    }
    # Construct a pandas DataFrame with a single row representing the run start.
    # Both StartTime and EndTime are set to the current timestamp; EndTime will be updated upon completion.
    run_start_df = pd.DataFrame(
        [
            {
                **context_filtered,
                "StartTime": datetime.now(),  # Capture the current time as the start time.
                "EndTime": datetime.now(),    # Placeholder for end time; to be updated later.
                "Status": "started",          # Set initial status as 'started'.
            }
        ]
    )
    # Write the run start DataFrame to the Delta table designated for run history.
    save_dataframe_to_delta_table(
        data=run_start_df,
        table_name=historical_table_names["run_history"],
        context=context,
    )
    # Log a confirmation message including the run's unique identifier.
    print(f"✅ Recorded run start for UUID: {context['run_uuid']}")


@log_function_calls
def record_run_completion(context: dict, status: str) -> None:
    """
    Updates the run_history table to mark the run as completed or failed.
    
    It sets the EndTime to the current timestamp and updates the run's Status.
    The function ensures that the run_uuid is present and safely escapes it for SQL usage.
    """
    # Retrieve the unique run identifier from the context.
    run_uuid = context.get("run_uuid")
    if not run_uuid:
        raise ValueError("❌ 'run_uuid' missing from context.")
    # Escape single quotes in the interpolated values to prevent SQL injection or syntax issues.
    escaped_uuid = str(run_uuid).replace("'", "''")
    escaped_status = str(status).replace("'", "''")
    # Format the current datetime as a string suitable for SQL TIMESTAMP.
    end_time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Construct the SQL update query to set EndTime and Status for the given run.
    update_query = f"""
        UPDATE {historical_table_names["run_history"]}
        SET EndTime = CAST('{end_time_str}' AS TIMESTAMP),
            Status = '{escaped_status}'
        WHERE RunUuid = '{escaped_uuid}'
    """
    try:
        # Execute the SQL update query using Spark's SQL interface.
        spark.sql(update_query)
        # Log a success message indicating the run has been updated.
        print(f"✅ Run UUID: {run_uuid} updated with status '{status}'.")
    except Exception as e:
        # Log the error if the update fails and re-raise the exception.
        print(f"❌ Failed to update run UUID: {run_uuid}. Error: {e}")
        raise


@log_function_calls
def cleanup_incomplete_runs() -> None:
    """
    Removes records associated with runs that did not complete successfully.
    
    The function performs two main operations:
      1. Deletes records from all historical tables (except run_history) corresponding to incomplete runs.
      2. Updates the run_history table to mark those incomplete runs as 'removed'.
      
    This cleanup helps maintain data consistency by removing or flagging partially recorded runs.
    """
    # Check if the run_history table exists; if not, there is nothing to clean up.
    if not spark.catalog.tableExists(historical_table_names["run_history"]):
        print("✅ run_history table does not exist yet. No cleanup necessary.")
        return

    try:
        # Retrieve all run_uuids from run_history where the Status is not 'completed' or 'removed'.
        incomplete_df = (
            spark.table(historical_table_names["run_history"])
            .filter(~col("Status").isin("completed", "removed"))
            .select("RunUuid")
        )
        # Collect the run uuids from the DataFrame to a list.
        incomplete_uuids = [row["RunUuid"] for row in incomplete_df.collect()]

        # If there are no incomplete runs, log the info and exit.
        if not incomplete_uuids:
            print("✅ No incomplete runs to clean.")
            return

        print(f"✅ Found {len(incomplete_uuids)} incomplete run(s); proceeding with cleanup.")
        # Escape each run_uuid for safe SQL query usage.
        escaped_uuids = [uuid.replace("'", "''") for uuid in incomplete_uuids]
        # Create a comma-separated string of escaped run_uuids for use in SQL IN clause.
        uuid_list_str = ", ".join(f"'{uuid}'" for uuid in escaped_uuids)

        # Iterate over each historical table (except run_history) to remove incomplete run records.
        for logical_name, table in historical_table_names.items():
            if logical_name == "run_history":
                continue  # Skip the run_history table in this deletion loop.
            try:
                # Only attempt deletion if the table exists.
                if not spark.catalog.tableExists(table):
                    print(f"✅ Table {table} not found. Skipping deletion.")
                    continue
                # Construct and execute the deletion query for the current table.
                delete_query = f"DELETE FROM {table} WHERE RunUuid IN ({uuid_list_str})"
                spark.sql(delete_query)
                print(f"✅ Deleted records in table {table} for incomplete runs.")
            except Exception as e:
                # Log the error for the current table and continue with the next one.
                print(f"❌ Failed to clean table {table}. Error: {e}")
                continue

        # After cleaning other tables, update the run_history table to mark incomplete runs as 'removed'.
        update_query = f"""
            UPDATE {historical_table_names["run_history"]}
            SET Status = 'removed'
            WHERE RunUuid IN ({uuid_list_str})
        """
        spark.sql(update_query)
        print(f"✅ Marked {len(incomplete_uuids)} incomplete run(s) as removed.")
    except Exception as e:
        # Log and re-raise any exception encountered during the cleanup process.
        print(f"❌ Cleanup failed. Error: {e}")
        raise


def drop_historical_tables() -> None:
    """
    Drops all historical Delta tables.
    
    Use this function with caution as it permanently deletes all historical audit data
    stored in the tables defined in the 'historical_table_names' mapping.
    """
    # Loop through each historical table and attempt to drop it.
    for table in historical_table_names.values():
        try:
            print(f"🗑️ Dropping table: {table}")
            # Execute the DROP TABLE command; IF EXISTS ensures no error is thrown if the table doesn't exist.
            spark.sql(f"DROP TABLE IF EXISTS {table}")
            print(f"✅ Dropped table: {table}")
        except Exception as e:
            # Log any failure to drop a table.
            print(f"❌ Failed to drop table `{table}`. Error: {e}")

### Capturing Semantic Model Objects & Dependencies
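The recursive dependency walk in the next cell can be easier to follow on toy data. The sketch below uses plain dictionaries instead of the DataFrame returned by the DAX query; `direct_deps`, `walk`, and all measure and column names are invented for illustration.

```python
# Direct dependencies: (table, object) -> list of (type, table, object).
# All names here are made up for illustration.
direct_deps = {
    ("Sales", "Margin %"): [("MEASURE", "Sales", "Margin"), ("MEASURE", "Sales", "Revenue")],
    ("Sales", "Margin"): [("MEASURE", "Sales", "Revenue"), ("COLUMN", "Sales", "Cost")],
    ("Sales", "Revenue"): [("COLUMN", "Sales", "Amount")],
}


def walk(root, current, level, accum):
    """Record every dependency of `root`, following measures recursively."""
    for ref_type, ref_table, ref_object in direct_deps.get(current, []):
        accum.append((root[1], ref_type, ref_object, level))
        if ref_type == "MEASURE":
            walk(root, (ref_table, ref_object), level + 1, accum)


found = []
walk(("Sales", "Margin %"), ("Sales", "Margin %"), 1, found)
```

In this toy graph `Amount` is recorded at both level 2 and level 3, because it is reachable through two different paths. Every entry keeps the root measure, so a query that touches `Margin %` can be tied back to each underlying column.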

In [None]:
@log_function_calls
def capture_semantic_model_objects(context: dict) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Retrieves columns and measures for the specified semantic model.

    It uses Fabric API calls to capture:
      - Columns (with extended metadata) from the model.
      - Measures from the model.
    The captured data is saved to Delta tables for historical tracking.

    Returns:
      A tuple (model_columns, model_measures) as pandas DataFrames.
    """
    # Check that the context contains the required keys for API calls.
    for key in ["source_model_uuid", "source_model_workspace_uuid"]:
        if key not in context:
            raise KeyError(f"❌ Missing context key: '{key}'")
    try:
        # Refresh the Tabular Object Model (TOM) cache to ensure up-to-date metadata.
        fabric.refresh_tom_cache(context["source_model_workspace_uuid"])

        # Retrieve model columns with extended metadata via SemPy.
        model_columns = fabric.list_columns(
            dataset=context["source_model_uuid"],
            extended=True,
            workspace=context["source_model_workspace_uuid"],
        )
        # Clean column names by removing spaces for consistency.
        model_columns.columns = model_columns.columns.str.replace(" ", "", regex=True)
        # Save the captured columns to the Delta table for historical tracking.
        save_dataframe_to_delta_table(
            data=model_columns,
            table_name=historical_table_names["model_columns"],
            context=context,
        )

        # Similarly, capture model measures via SemPy.
        model_measures = fabric.list_measures(
            dataset=context["source_model_uuid"],
            workspace=context["source_model_workspace_uuid"],
        )
        # Remove spaces from measure column names.
        model_measures.columns = model_measures.columns.str.replace(" ", "", regex=True)
        # Save the captured measures to the corresponding Delta table.
        save_dataframe_to_delta_table(
            data=model_measures,
            table_name=historical_table_names["model_measures"],
            context=context,
        )
    except Exception as e:
        print(
            f"❌ Failed to capture model objects for model `{context['source_model_uuid']}`. Error: {e}"
        )
        raise
    return model_columns, model_measures


def find_dependencies_recursive(
    dependencies: pd.DataFrame,
    root_table: str,
    root_object: str,
    ref_table: str,
    ref_object: str,
    level: int,
    accum: list,
) -> None:
    """
    Recursively finds dependencies for a measure.

    It traverses the dependency DataFrame (obtained from a DAX query) to:
      - Append each dependency with the current recursion level.
      - Recursively follow further dependencies if the referenced object is a measure.

    Parameters:
      dependencies (pd.DataFrame): DataFrame containing dependency data.
      root_table (str): The table name of the original measure.
      root_object (str): The measure name for which dependencies are being traced.
      ref_table (str): The table name of the current referenced object.
      ref_object (str): The name of the current referenced object.
      level (int): The current recursion depth (starting at 1).
      accum (list): List used to accumulate dependency entries.
    """
    # Filter the dependency DataFrame to only include rows matching the current reference.
    refs = dependencies[
        (dependencies["[TABLE]"] == ref_table) &
        (dependencies["[OBJECT]"] == ref_object)
    ]
    for _, row in refs.iterrows():
        # Append dependency details to the accumulator.
        accum.append(
            {
                "ObjectType": "MEASURE",  # All entries here are measures.
                "TableName": root_table,  # Original measure's table.
                "ObjectName": root_object,  # Original measure's name.
                "ReferencedObjectType": row["[REFERENCED_OBJECT_TYPE]"],
                "ReferencedTableName": row["[REFERENCED_TABLE]"],
                "ReferencedObjectName": row["[REFERENCED_OBJECT]"],
                "Level": level,  # Record the current recursion level.
            }
        )
        # If the referenced object is itself a measure, continue recursion.
        if row["[REFERENCED_OBJECT_TYPE]"] == "MEASURE":
            find_dependencies_recursive(
                dependencies,
                root_table,
                root_object,
                row["[REFERENCED_TABLE]"],
                row["[REFERENCED_OBJECT]"],
                level + 1,  # Increase recursion level.
                accum,
            )


@log_function_calls
def capture_semantic_model_dependencies(
    context: dict, model_measures: pd.DataFrame
) -> None:
    """
    Captures dependencies for model measures.

    It runs a DAX query to retrieve dependency data for measures,
    then recursively traces dependencies for each measure using the provided model_measures DataFrame.
    The complete dependency mapping is saved to the Delta table for dependencies.
    """
    # Execute a DAX query to fetch dependency data for measures.
    model_measure_deps = fabric.evaluate_dax(
        dataset=context["source_model_uuid"],
        workspace=context["source_model_workspace_uuid"],
        dax_string="""
            EVALUATE
            FILTER(
                INFO.CALCDEPENDENCY("OBJECT_TYPE", "MEASURE"),
                [REFERENCED_OBJECT_TYPE] IN { "COLUMN", "MEASURE" }
            )
        """,
    )
    accum = []  # Initialize an empty list to accumulate dependency entries.
    # Iterate over each measure to start the recursive dependency search.
    for _, row in model_measures.iterrows():
        # Begin recursion for each measure using its own table and name as the root.
        find_dependencies_recursive(
            dependencies=model_measure_deps,
            root_table=row["TableName"],
            root_object=row["MeasureName"],
            ref_table=row["TableName"],
            ref_object=row["MeasureName"],
            level=1,  # Start at level 1.
            accum=accum,
        )
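    # Nothing to save when no measure has column or measure dependencies;
    # spark.createDataFrame cannot infer a schema from an empty list.
    if not accum:
        print("✅ No measure dependencies found. Skipping save.")
        return
    # Convert the accumulated dependency records to a Spark DataFrame and save to Delta table.
    save_dataframe_to_delta_table(
        data=spark.createDataFrame(accum),
        table_name=historical_table_names["dependencies"],
        context=context,
    )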

### Processing Semantic Model Objects & Saving Mappings
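As an illustration of the matching strings produced in the next cell, the sketch below builds the column variants for one table and column, then applies the notebook's measure-name regular expression to a sample string. `column_variants` is a hypothetical helper, and the sample report measure text is invented; real log entries may look different.

```python
import re


def column_variants(table, column):
    """Return the reference styles a column may appear under in query text."""
    quoted = f"'{table}'[{column}]"
    unquoted = f"{table}[{column}]"
    bracketed = f"[{table}].[{column}]"
    # An unquoted reference is not valid DAX when the table name contains a space.
    return [quoted, bracketed] if " " in table else [quoted, unquoted, bracketed]


simple = column_variants("Sales", "Amount")
spaced = column_variants("Order Lines", "Qty")

# Same pattern the notebook uses to pull a report measure's name.
measure_pattern = re.compile(r"\[([^\]]+)\]\s*=\s*\(\/\* USER DAX BEGIN \*\/")
sample = "'Sales'[Total Qty] = (/* USER DAX BEGIN */ SUM(Sales[Qty]) /* USER DAX END */)"
match = measure_pattern.search(sample)
measure_name = match.group(1) if match else None
```

Generating every variant up front means later matching against query text can be a plain string comparison rather than a DAX parse.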

In [None]:
@log_function_calls
def process_semantic_model_objects(
    model_objects: pd.DataFrame, object_type: str
) -> pd.DataFrame:
    """
    Standardizes the metadata for model objects (columns or measures) into a mapping DataFrame.

    For columns, it produces multiple formatting variants (e.g., quoted, unquoted, bracketed)
    to facilitate later matching. For measures, only a single representation is generated.

    Returns:
      A DataFrame with standardized object mapping information, including:
         - TableName: Original table name.
         - ObjectName: The base name (column or measure name).
         - ObjectType: 'COLUMN' or 'MEASURE'.
         - ModelObject: The variant string representation for matching.
    """

    def map_row(row: pd.Series) -> list:
        # Process based on whether the object is a column or a measure.
        if object_type == "COLUMN":
            tbl = row["TableName"]
            col_name = row["ColumnName"]
            # Generate different string variants for the column.
            quoted_variant = f"'{tbl}'[{col_name}]"   # e.g., 'TableName'[ColumnName]
            unquoted_variant = f"{tbl}[{col_name}]"  # No table quotes, e.g., TableName[ColumnName]; apostrophes in names are preserved
            bracket_variant = f"[{tbl}].[{col_name}]"   # e.g., [TableName].[ColumnName]
            # If the table name includes spaces, avoid the unquoted variant.
            variants = (
                [quoted_variant, bracket_variant]
                if " " in tbl
                else [quoted_variant, unquoted_variant, bracket_variant]
            )
            obj_name = col_name
        else:
            # For measures, generate only one variant.
            measure = row["MeasureName"]
            variants = [f"[{measure}]"]  # Format measure as [MeasureName]
            obj_name = measure

        # Base dictionary holds common mapping fields.
        base = {
            "TableName": row["TableName"],
            "ObjectName": obj_name,
            "ObjectType": object_type,
        }
        # Return a list of dictionaries, one for each variant.
        return [{**base, "ModelObject": variant} for variant in variants]

    # For each row in the input DataFrame, apply map_row to produce a list of mapping dictionaries.
    mapped = [item for _, row in model_objects.iterrows() for item in map_row(row)]
    # Convert the list of mapping dictionaries into a pandas DataFrame.
    return pd.DataFrame(mapped)


@log_function_calls
def save_report_measure_mappings(distinct_objects: set, context: dict) -> None:
    """
    Saves new mappings for REPORT MEASURE objects.

    It parses each report measure string using regular expressions to extract:
      - The table name (from content within square brackets or single quotes).
      - The measure name (from a specific DAX pattern).
    These mappings are then saved to the object_mapping Delta table.
    """
    rows = []
    # Regular expression to capture table names enclosed in either square brackets or single quotes.
    table_pattern = re.compile(r"(?:\[(?P<name_bracket>[^\]]+)\]|'(?P<name_quote>[^']+)')")
    
    # Regular expression to capture the measure name from the expression pattern.
    measure_pattern = re.compile(r"\[([^\]]+)\]\s*=\s*\(\/\* USER DAX BEGIN \*\/")

    # Iterate over each report measure string in the distinct_objects set.
    for model_object in distinct_objects:
        tbl_match = table_pattern.search(model_object)
        measure_match = measure_pattern.search(model_object)
        # Extract the table name from the regex groups if a match is found; otherwise, set as None.
        if tbl_match:
            table_name = tbl_match.group("name_bracket") or tbl_match.group("name_quote")
        else:
            table_name = None

        # Append a mapping dictionary with the extracted values.
        rows.append(
            {
                "TableName": table_name,
                "ObjectName": measure_match.group(1) if measure_match else None,
                "ModelObject": model_object,
                "ObjectType": "REPORT MEASURE",
            }
        )
    # If mappings were found, convert them to a DataFrame and save to the Delta table.
    if rows:
        df_report = pd.DataFrame(rows)
        save_dataframe_to_delta_table(
            data=df_report,
            table_name=historical_table_names["object_mapping"],
            context=context,
        )
        print(f"✅ Saved {len(rows)} REPORT MEASURE mappings.")

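The parsing performed by `save_report_measure_mappings` can be tried in isolation. The sketch below reuses the two regular expressions from the function above on a made-up report measure string. `parse_report_measure` and the sample text are illustrative only, and the exact shape of the strings in your logs may differ.

```python
import re

# Same patterns as in save_report_measure_mappings.
TABLE_PATTERN = re.compile(r"(?:\[(?P<name_bracket>[^\]]+)\]|'(?P<name_quote>[^']+)')")
MEASURE_PATTERN = re.compile(r"\[([^\]]+)\]\s*=\s*\(\/\* USER DAX BEGIN \*\/")


def parse_report_measure(model_object: str) -> dict:
    """Hypothetical helper: build one mapping row from a report measure string."""
    tbl_match = TABLE_PATTERN.search(model_object)
    measure_match = MEASURE_PATTERN.search(model_object)
    table_name = None
    if tbl_match:
        table_name = tbl_match.group("name_bracket") or tbl_match.group("name_quote")
    return {
        "TableName": table_name,
        "ObjectName": measure_match.group(1) if measure_match else None,
        "ModelObject": model_object,
        "ObjectType": "REPORT MEASURE",
    }


# Assumed sample: the text that follows "MEASURE " in a logged query.
sample = "'Sales'[Total Sales] = (/* USER DAX BEGIN */\nSUM('Sales'[Amount])\n/* USER DAX END */)"
parsed = parse_report_measure(sample)
print(parsed["TableName"], "|", parsed["ObjectName"])
```

Note that the table pattern takes the first bracketed or quoted token it finds, so the table name is only extracted correctly when it appears quoted before the measure name, as in this sample.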
### Define Query Log Collector Class
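The class defined in this section retries a failed interval at progressively finer granularity. A minimal sketch of that subdivision logic follows, using integer hours in place of datetimes. `collect` and `fake_query` are illustrative names that are not part of the notebook, and `fake_query` is a hypothetical stand-in for a KQL call whose main row count does not match its test count on windows longer than 6 hours.

```python
ALLOWED_INTERVALS = list(range(24, 0, -1))  # 24h down to 1h, as in the class


def find_starting_index(block_hours: float) -> int:
    """Index of the largest allowed interval that fits into block_hours."""
    for i, hrs in enumerate(ALLOWED_INTERVALS):
        if block_hours >= hrs:
            return i
    return len(ALLOWED_INTERVALS) - 1


def collect(start: int, end: int, idx: int, run_query, done: list) -> bool:
    """Try [start, end); on failure, retry in chunks one granularity step smaller."""
    try:
        run_query(start, end)
        done.append((start, end))
        return True
    except Exception:
        if idx == len(ALLOWED_INTERVALS) - 1:
            return False  # Already at 1h granularity; give up on this chunk.
        step = ALLOWED_INTERVALS[idx + 1]
        ok = True
        current = start
        while current < end:
            sub_end = min(current + step, end)
            sub_idx = find_starting_index(sub_end - current)
            if not collect(current, sub_end, sub_idx, run_query, done):
                ok = False
            current = sub_end
        return ok


def fake_query(start: int, end: int) -> None:
    # Assumption for illustration: any window longer than 6 hours fails.
    if end - start > 6:
        raise RuntimeError("result truncated")


completed = []
success = collect(0, 24, find_starting_index(24), fake_query, completed)
print(success, completed)
```

In this toy run the first chunk that succeeds is `(0, 6)`. Because the granularity drops by one hour per retry, the rest of the day is then collected in one-hour chunks.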

In [None]:
class QueryLogCollector:
    def __init__(self, context: dict):
        # Save the context dictionary, which contains configuration parameters needed for queries.
        self.context = context

    @staticmethod
    def find_starting_index(block_hours: float) -> int:
        """
        Determines the starting index for allowed interval granularity based on the block's length in hours.
        
        It compares the given block_hours against a list of allowed intervals (24 down to 1 hour).
        Returns the index of the largest allowed interval that is less than or equal to block_hours.
        """
        ALLOWED_INTERVALS = list(range(24, 0, -1))  # Allowed granularity from 24h down to 1h.
        for i, hrs in enumerate(ALLOWED_INTERVALS):
            if block_hours >= hrs:
                return i
        # If block_hours is shorter than the smallest allowed interval (1h), fall back to that interval's index.
        return len(ALLOWED_INTERVALS) - 1

    @staticmethod
    def group_missing_hours(missing_hours: list) -> list:
        """
        Groups a list of missing hour integers into continuous intervals.
        
        For example, if missing_hours = [2,3,4,6,7], it returns [(2,5), (6,8)],
        where each tuple represents a half-open interval [start, end).
        """
        if not missing_hours:
            return []
        intervals = []
        start = missing_hours[0]
        end = missing_hours[0]
        # Iterate over the missing hours and group consecutive hours together.
        for h in missing_hours[1:]:
            if h == end + 1:
                end = h  # Extend the current interval.
            else:
                # Append the current interval as (start, end+1) and start a new interval.
                intervals.append((start, end + 1))
                start = h
                end = h
        # Append the last interval.
        intervals.append((start, end + 1))
        return intervals

    @staticmethod
    def get_missing_hours_for_day(log_table, day_date) -> list:
        """
        Determines which hours (0-23) have missing data for a given day by comparing against the log table.
        
        Parameters:
          log_table: A Spark DataFrame containing log records.
          day_date: A date object representing the day to check.
        
        Returns:
          A list of hour integers for which no log data is found.
        """
        existing = set()
        if log_table is not None:
            try:
                # Filter the log table for the given day, select distinct 'AsOfHour', and collect them into a set.
                existing = {row["AsOfHour"] for row in log_table.filter(col("AsOfDate") == day_date)
                            .select("AsOfHour").distinct().collect()}
            except Exception as e:
                print(f"⚠️ Error retrieving existing hours for {day_date}: {e}")
        # Return hours from 0 to 23 that are not present in the existing set.
        return [hour for hour in range(24) if hour not in existing]

    def build_user_groups(self) -> tuple:
        """
        Constructs dynamic arrays and case conditions for user groups for use in KQL queries.
        
        For each user group, it creates:
          - A 'let' statement that defines a dynamic array of user emails.
          - A case condition to map an executing user to the group name.
        
        Returns:
          A tuple with two strings: one for the combined let statements and one for the combined case conditions.
        """
        let_statements = []
        case_conditions = []
        for group, emails in user_groups.items():
            # Normalize the group name to be used as a variable (remove spaces and dashes, and lower-case).
            grp_var = group.replace(" ", "").replace("-", "").lower()
            # Construct a comma-separated string of email addresses, each enclosed in single quotes.
            emails_str = ", ".join("'{0}'".format(email) for email in emails)
            let_statements.append("let {0} = dynamic([{1}]);".format(grp_var, emails_str))
            # Build the case condition that assigns the group name if the ExecutingUser is in the group.
            case_conditions.append('ExecutingUser in ({0}), "{1}"'.format(grp_var, group))
        # Join all let statements and case conditions into single strings.
        return "\n".join(let_statements), ",\n".join(case_conditions)

    def generate_object_count_query(self, object_type: str, model_objects_df: pd.DataFrame,
                                    start_ts: datetime, end_ts: datetime,
                                    include_summary: bool = False) -> str:
        """
        Generates a KQL query to count queries for the given object type.
        
        For "REPORT MEASURE" objects, a specific extraction is applied to retrieve the measure.
        For other object types, it builds a dynamic array of model objects to check for matches in log text.
        
        Parameters:
          object_type (str): The type of object ("REPORT MEASURE", "COLUMN", "MEASURE", etc.).
          model_objects_df (pd.DataFrame): DataFrame containing standardized model objects.
          start_ts (datetime): Start timestamp for filtering logs.
          end_ts (datetime): End timestamp for filtering logs.
          include_summary (bool): If True, adds a summarization clause to the query.
        
        Returns:
          A string representing the constructed KQL query.
        """
        let_statements, case_conditions = self.build_user_groups()
        if object_type == "REPORT MEASURE":
            # For report measures, extract the measure from EventText using a regex pattern.
            model_extend = """
            | extend ModelObject = extract_all(@"MEASURE (.*?\\/\\* USER DAX END \\*\\/\\))", EventText)
            | mv-expand ModelObject
            | where isnotempty(ModelObject)
            """
        else:
            if model_objects_df is not None:
                # Build a dynamic array of model objects to match against the EventText.
                objs_list = ",\n".join('"{}"'.format(obj) for obj in model_objects_df["ModelObject"])
            else:
                # No model objects supplied: use an empty array so the query stays valid and matches nothing.
                objs_list = ""
            let_statements += "\nlet modelObjects = dynamic([{0}]);".format(objs_list)
            # Use mv-apply to check for presence of any model object in the EventText.
            model_extend = """
            | mv-apply ModelObject = modelObjects to typeof(string) on (
                extend Matched = iff(EventText contains_cs ModelObject, true, false)
                | where Matched
                | project-away Matched
            )
            """
        # Format the start and end timestamps in the required datetime format for KQL.
        start_str = start_ts.strftime("datetime(%Y-%m-%dT%H:%M:%S)")
        end_str = end_ts.strftime("datetime(%Y-%m-%dT%H:%M:%S)")
        # Construct the full query string using f-string formatting.
        query = f'''
        let model_name = "{self.context["source_model_name"]}";
        {let_statements}
        SemanticModelLogs
            | where Timestamp >= {start_str} and Timestamp < {end_str}
            | where ItemName == model_name
            | where OperationName == "QueryBegin"
            | extend ReportId = extract_json("$.Sources[0].ReportId", tostring(parse_xml(XmlaProperties)["PropertyList"]["ApplicationContext"]), typeof(string))
            | extend AsOfHour = datetime_part("hour", Timestamp)
            | project AsOfHour, ExecutingUser, EventText, ReportId
            {model_extend}
            | extend ExecutingUserGroup =
                case(
                    {case_conditions},
                    "{default_user_group}"
                )
            | summarize QueryCount = count() by tostring(ModelObject), AsOfHour, ExecutingUserGroup, ReportId
        '''.replace("\n", " ").strip()
        if include_summary:
            query += " | summarize totalCount = count()"
        return query

    def generate_detailed_query(self, start_ts: datetime, end_ts: datetime,
                                include_summary: bool = False) -> str:
        """
        Generates a KQL query to capture detailed DAX query metrics.

        It builds separate subqueries to retrieve query begin and query end events,
        then joins them to combine detailed performance metrics and user information.

        Parameters:
          start_ts (datetime): Start timestamp for the query window.
          end_ts (datetime): End timestamp for the query window.
          include_summary (bool): If True, includes a final summarization clause.
        
        Returns:
          A string representing the complete KQL query for detailed logs.
        """
        let_statements, case_conditions = self.build_user_groups()
        start_iso = start_ts.isoformat(timespec="seconds")
        end_iso = end_ts.isoformat(timespec="seconds")
        query_begin_start_iso = (start_ts - timedelta(days=1)).isoformat(timespec="seconds")
        query = f'''
        let model_name = "{self.context["source_model_name"]}";
        {let_statements}
        let base_data =
            SemanticModelLogs
            | where ItemName == model_name;
        let query_end = base_data
            | where Timestamp >= datetime({start_iso}) and Timestamp < datetime({end_iso})
            | where OperationName in ("Error", "QueryEnd")
            | project Timestamp, OperationName, OperationDetailName, OperationId, XmlaSessionId, ExecutingUser, DurationMs, CpuTimeMs, EventText, Status, StatusCode;
        let query_begin = base_data
            | where Timestamp between (datetime({query_begin_start_iso}) .. datetime({end_iso}))
            | where OperationName == "QueryBegin"
            | extend ActivityId = tostring(parse_xml(XmlaProperties)["PropertyList"]["DbpropMsmdActivityID"])
            | extend RequestId = tostring(parse_xml(XmlaProperties)["PropertyList"]["DbpropMsmdRequestID"])
            | extend CurrentActivityId = tostring(parse_xml(XmlaProperties)["PropertyList"]["DbpropMsmdCurrentActivityID"])
            | extend ReportId = extract_json("$.Sources[0].ReportId", tostring(parse_xml(XmlaProperties)["PropertyList"]["ApplicationContext"]), typeof(string))
            | distinct ActivityId, RequestId, CurrentActivityId, ReportId, OperationId, XmlaSessionId;
        query_end
        | join kind=leftouter (query_begin) on OperationId, XmlaSessionId
        | extend ExecutingUserGroup =
            case(
                    {case_conditions},
                    "{default_user_group}"
                )
        | extend AsOfHour = datetime_part("hour", Timestamp)
        | project Timestamp, AsOfHour, OperationName, OperationDetailName, ReportId, ExecutingUser, ExecutingUserGroup, DurationMs, CpuTimeMs, EventText, OperationId, XmlaSessionId, ActivityId, RequestId, CurrentActivityId, Status, StatusCode
        '''.replace("\n", " ").strip()
        if include_summary:
            query += " | summarize totalCount = count()"
        return query

    @staticmethod
    @log_function_calls
    @retry(exceptions=(Exception,), num_retries=2, initial_delay=30, backoff_factor=2, logger=print)
    def execute_query(context: dict, kql_query: str) -> pyspark.sql.DataFrame:
        """
        Executes a KQL query against the Log Analytics Kusto database.

        It obtains an access token via mssparkutils, sets up the Kusto Spark connector with the proper options,
        and returns the result as a Spark DataFrame. The retry decorator ensures that transient failures are retried.
        """
        try:
            # Retrieve an access token required for Kusto authentication.
            access_token = mssparkutils.credentials.getToken(context["log_analytics_kusto_uri"])
            # Configure and run the Kusto query via the Spark data source.
            result = (
                spark.read.format("com.microsoft.kusto.spark.synapse.datasource")
                .option("accessToken", access_token)
                .option("kustoCluster", context["log_analytics_kusto_uri"])
                .option("kustoDatabase", context["log_analytics_kusto_database"])
                .option("kustoQuery", kql_query)
                .load()
            )
            print("✅ KQL query executed successfully.")
            return result
        except Exception:
            print("❌ Failed to execute KQL query.")
            raise

    def attempt_interval_query(self, start_ts: datetime, end_ts: datetime, current_idx: int,
                               generate_test_query, generate_main_query, process_result) -> bool:
        """
        Attempts to run a query for the specified interval and, if necessary,
        subdivides the interval until complete data is returned.

        This method works by:
          1. Running a test query to get the expected row count.
          2. If the test query reports no rows, treating the interval as complete and skipping it.
          3. Otherwise, running the main query and checking that its row count matches the test count.
          4. On a mismatch or any other error, recursively retrying with smaller sub-intervals.

        Parameters:
          start_ts (datetime): Start timestamp for the query interval.
          end_ts (datetime): End timestamp for the query interval.
          current_idx (int): Current index into ALLOWED_INTERVALS, representing the current granularity.
          generate_test_query (callable): Function that generates a test query for the given interval.
          generate_main_query (callable): Function that generates the main query for the interval.
          process_result (callable): Callback function to process the main query result.
        
        Returns:
          bool: True if the interval or its sub-intervals were processed successfully, False otherwise.
        """
        try:
            print(f"ℹ️ Sending test query for interval {start_ts} to {end_ts}...")
            # Generate and execute the test query.
            test_query = generate_test_query(start_ts, end_ts)
            test_result = self.execute_query(self.context, test_query)
            test_row = test_result.first()
            test_count = 0 if test_row is None else test_row["totalCount"]

            # If the test query returns 0 rows, skip processing this interval.
            if test_count == 0:
                print(f"⚠️ Test query returned 0 rows for interval {start_ts} to {end_ts}. Skipping...")
                print(f"✅ Completed empty interval {start_ts} to {end_ts}.")
                return True

            print(f"ℹ️ Sending main query for interval {start_ts} to {end_ts}...")
            # Generate and execute the main query.
            main_query = generate_main_query(start_ts, end_ts)
            main_result = self.execute_query(self.context, main_query)
            main_count = main_result.count()

            # Validate that the main query returns the same row count as the test query.
            if main_count != test_count:
                raise Exception(f"Query result truncated: main_count ({main_count}) != test_count ({test_count})")
            else:
                print(f"✅ Query results match: {main_count} rows.")
            # Process the query result via the provided callback.
            process_result(main_result, start_ts)
            return True
        except Exception as e:
            # If an exception occurs, attempt to subdivide the interval.
            ALLOWED_INTERVALS = list(range(24, 0, -1))
            print(f"❌ Interval {start_ts} to {end_ts} at granularity {ALLOWED_INTERVALS[current_idx]}h failed. Error: {e}")
            if current_idx == len(ALLOWED_INTERVALS) - 1:
                # If the smallest granularity is reached, log and skip the interval.
                print(f"❌ Minimum granularity reached for {start_ts} to {end_ts}; skipping interval.")
                return False
            else:
                new_idx = current_idx + 1
                new_granularity = ALLOWED_INTERVALS[new_idx]
                print(f"⚠️ Retrying with smaller interval ({new_granularity}h) for {start_ts} to {end_ts}...")
                success = True
                current = start_ts
                # Divide the interval into sub-intervals based on the new granularity.
                while current < end_ts:
                    sub_end = current + pd.Timedelta(hours=new_granularity)
                    if sub_end > end_ts:
                        sub_end = end_ts
                    sub_length = (sub_end - current).total_seconds() / 3600.0
                    sub_idx = self.find_starting_index(sub_length)
                    print(f"🔍 Querying sub-interval {current} to {sub_end}...")
                    # Recursively attempt to process each sub-interval.
                    if not self.attempt_interval_query(current, sub_end, sub_idx,
                                                       generate_test_query, generate_main_query, process_result):
                        success = False
                    current = sub_end
                return success

### Capture Query Counts & Detailed Logs
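The helpers in this section decide which hour ranges still need to be queried for a given day. Below is a minimal sketch of that planning step, assuming the hours already present in the log table are known as a plain set (the notebook reads them from a Spark DataFrame). `plan_intervals` is a hypothetical name, and `min_hours_before_current` mirrors the notebook parameter of the same name.

```python
from datetime import datetime, timedelta


def group_missing_hours(missing_hours: list) -> list:
    """Group sorted hour integers into half-open (start, end) runs."""
    intervals = []
    for h in missing_hours:
        if intervals and intervals[-1][1] == h:
            # Hour h directly follows the current run, so extend it.
            intervals[-1] = (intervals[-1][0], h + 1)
        else:
            intervals.append((h, h + 1))
    return intervals


def plan_intervals(as_of: datetime, days_ago: int, existing_hours: set,
                   min_hours_before_current: int, apply_cutoff: bool) -> list:
    """Return the (start, end) datetime ranges that still need to be collected."""
    day_start = as_of.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=days_ago)
    missing = [h for h in range(24) if h not in existing_hours]
    if apply_cutoff:
        # Ignore hours that are too close to the run time to be complete yet.
        elapsed = as_of - timedelta(hours=min_hours_before_current) - day_start
        cutoff_hour = int(elapsed.total_seconds() // 3600)
        missing = [h for h in missing if h < cutoff_hour]
    return [
        (day_start + timedelta(hours=start), day_start + timedelta(hours=end))
        for start, end in group_missing_hours(missing)
    ]


# Made-up run: hours 0-9 and 12 of the previous day were already collected.
plan = plan_intervals(
    as_of=datetime(2024, 5, 2, 1, 30),
    days_ago=1,
    existing_hours=set(range(10)) | {12},
    min_hours_before_current=3,
    apply_cutoff=True,
)
for start, end in plan:
    print(start, "->", end)
```

Each resulting range is what the notebook would pass to `attempt_interval_query`; the cutoff only applies to the most recent day, where the latest hours may not be fully logged yet.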

In [None]:
def process_intervals_for_day_generic(
    context: dict,
    log_tbl,
    qcollector: QueryLogCollector,
    days_ago: int,
    apply_cutoff: bool,
    generate_test_query,
    generate_main_query,
    process_result,
    label: str,
) -> None:
    """
    Generic helper to process missing intervals for a given day.

    Parameters:
      context (dict): The run context containing the current datetime and other metadata.
      log_tbl: Spark DataFrame of previously collected log data, or None if unavailable.
      qcollector (QueryLogCollector): Collector used to generate and execute queries.
      days_ago (int): How many days before the run date the target day lies.
      apply_cutoff (bool): If True, skips hours within min_hours_before_current of the run time.
      generate_test_query (callable): Generates the test (row count) query for an interval.
      generate_main_query (callable): Generates the main query for an interval.
      process_result (callable): Callback that processes the main query result.
      label (str): Label used in log messages (e.g., "query counts" or "detailed logs").
    """
    # Set the start of the day for the target day by resetting time and subtracting days_ago.
    day_start = context["as_of_datetime"].replace(hour=0, minute=0, second=0, microsecond=0) - relativedelta(days=days_ago)
    # Get the date portion to use for filtering.
    day_date = day_start.date()
    # Identify which hours in the day are missing from the log table.
    missing_hours = QueryLogCollector.get_missing_hours_for_day(log_tbl, day_date)
    if apply_cutoff:
        # Calculate the cutoff hour by subtracting a minimum number of hours (min_hours_before_current)
        # from the current context time relative to the start of the day.
        cutoff_hour = int(((context["as_of_datetime"] - timedelta(hours=min_hours_before_current) - day_start).total_seconds()) // 3600)
        # Only consider missing hours that are before the cutoff.
        missing_hours = [h for h in missing_hours if h < cutoff_hour]
        if not missing_hours:
            print(f"✅ All eligible {label} collected for {day_date} after applying min_hours_before_current filter.")
            return
    # If there are no missing hours at all, log that the day is fully collected.
    if not missing_hours:
        print(f"✅ All {label} collected for {day_date}.")
        return
    # Group the missing hours into continuous intervals.
    intervals = QueryLogCollector.group_missing_hours(missing_hours)
    for start_hr, end_hr in intervals:
        # Calculate the actual start and end datetime for the current interval.
        interval_start = day_start + pd.Timedelta(hours=start_hr)
        interval_end = day_start + pd.Timedelta(hours=end_hr)
        # Compute the length of this interval in hours.
        block_length = (interval_end - interval_start).total_seconds() / 3600.0
        # Determine the starting index for allowed intervals based on the block length.
        idx = qcollector.find_starting_index(block_length)
        print(f"🔍 Collecting {label} for {day_date} from hour {start_hr} to {end_hr} (block length: {block_length}h)...")
        # Attempt to query for this interval; this may subdivide further if needed.
        qcollector.attempt_interval_query(interval_start, interval_end, idx, generate_test_query, generate_main_query, process_result)
        

@log_function_calls
def capture_query_counts_by_object(context: dict, object_type: str, model_objects_df) -> None:
    """
    Captures query count data for a given object type.
    
    It retrieves prior log data (if available) from the object_query_count table,
    then creates a QueryLogCollector to build KQL queries for the given object type.
    The results of the queries are processed and saved to the historical Delta table.
    For REPORT MEASURE objects, it collects distinct model object strings for mapping.
    """
    distinct_report_measures = set()
    try:
        # Attempt to access prior log data for the given model and object type.
        log_tbl = spark.table(historical_table_names["object_query_count"]).filter(
            (col("ModelUuid") == context["source_model_uuid"]) & (col("ObjectType") == object_type)
        )
    except Exception:
        log_tbl = None
        print("⚠️ Could not access object_query_count table; proceeding without prior data.")
    
    # Instantiate the QueryLogCollector with the current context.
    qcollector = QueryLogCollector(context)
    
    # Build lambda functions to generate the test and main queries.
    generate_test_query = lambda s, e: qcollector.generate_object_count_query(object_type, model_objects_df, s, e, include_summary=True)
    generate_main_query = lambda s, e: qcollector.generate_object_count_query(object_type, model_objects_df, s, e, include_summary=False)
    
    def process_interval_result(main_result, start_ts):
        # Save the result of the query to the object_query_count Delta table.
        save_dataframe_to_delta_table(
            data=main_result,
            table_name=historical_table_names["object_query_count"],
            context=context,
            AsOfDate=start_ts.date(),
            AsOfDateTime=start_ts,
            ObjectType=object_type,
        )
        # For report measures, collect distinct model object strings for mapping.
        if object_type == "REPORT MEASURE":
            objs = main_result.select("ModelObject").distinct().collect()
            for row in objs:
                if row["ModelObject"]:
                    distinct_report_measures.add(row["ModelObject"])
    
    # Process intervals for each day from 1 to max_days_ago_to_collect.
    for days_ago in range(1, max_days_ago_to_collect + 1):
        process_intervals_for_day_generic(context, log_tbl, qcollector, days_ago,
                                          apply_cutoff=(days_ago == 1),
                                          generate_test_query=generate_test_query,
                                          generate_main_query=generate_main_query,
                                          process_result=process_interval_result,
                                          label=f"{object_type} query counts")
    
    # If processing REPORT MEASURE objects, save new mappings if discovered.
    if object_type == "REPORT MEASURE":
        if distinct_report_measures:
            print(f"📝 Saving {len(distinct_report_measures)} new REPORT MEASURE mappings...")
            try:
                save_report_measure_mappings(distinct_report_measures, context)
            except Exception as e:
                print(f"❌ Failed to save REPORT MEASURE mappings: {e}")
        else:
            print("ℹ️ No new REPORT MEASUREs discovered.")


@log_function_calls
def capture_detailed_logs(context: dict) -> set:
    """
    Captures detailed DAX query logs, including performance metrics.
    
    It retrieves prior detailed logs (if available), builds dynamic queries via QueryLogCollector,
    and processes the results. The function also handles historical user mapping for principal names.
    Finally, it updates the Delta table for detailed logs and masks user names for old records if needed.
    
    Returns:
      A set of ReportIds collected from the detailed logs.
    """
    report_ids = set()
    try:
        # Attempt to access the detailed_logs table for the current model.
        log_tbl = spark.table(historical_table_names["detailed_logs"]).filter(
            (col("ModelUuid") == context["source_model_uuid"])
        )
    except Exception:
        log_tbl = None
        print("⚠️ detailed_logs table not accessible; proceeding without prior data.")
    
    # Create a QueryLogCollector instance for detailed queries.
    qcollector = QueryLogCollector(context)
    generate_test_query = lambda s, e: qcollector.generate_detailed_query(s, e, include_summary=True)
    generate_main_query = lambda s, e: qcollector.generate_detailed_query(s, e, include_summary=False)
    
    def normalize_user(user):
        # Normalize user strings for consistency in mapping (trim and lower-case).
        return user.strip().lower() if user else None
    
    historical_mapping = {}
    if collect_principal_names == 1:
        try:
            # Define historical time window for building user mapping (e.g., 30 days ago to yesterday).
            start_hist = (context["as_of_datetime"] - relativedelta(days=30)).replace(minute=0, second=0, microsecond=0)
            end_hist = (context["as_of_datetime"] - relativedelta(days=1)).replace(minute=0, second=0, microsecond=0)
            # Helper functions to generate test and main queries for historical data.
            def gen_hist_test(s, e):
                return f"""
                    let startTime = {s.strftime("datetime(%Y-%m-%dT%H:%M:%S)")};
                    let endTime = {e.strftime("datetime(%Y-%m-%dT%H:%M:%S)")};
                    let data = SemanticModelLogs
                        | where Timestamp between (startTime .. endTime)
                        | where OperationName in ("Error", "QueryEnd")
                        | distinct XmlaSessionId, ExecutingUser;
                    data | summarize totalCount = count()
                """
            def gen_hist_main(s, e):
                return f"""
                    let startTime = {s.strftime("datetime(%Y-%m-%dT%H:%M:%S)")};
                    let endTime = {e.strftime("datetime(%Y-%m-%dT%H:%M:%S)")};
                    SemanticModelLogs
                    | where Timestamp between (startTime .. endTime)
                    | where OperationName in ("Error", "QueryEnd")
                    | distinct XmlaSessionId, ExecutingUser
                """
            hist_results = []
            # Callback to process historical data results.
            def process_hist(main_result, start_ts):
                hist_results.append(main_result.toPandas())
            total_hours = (end_hist - start_hist).total_seconds() / 3600.0
            start_idx = qcollector.find_starting_index(total_hours)
            # Attempt to collect historical mapping data over the defined period.
            success = qcollector.attempt_interval_query(start_hist, end_hist, start_idx, gen_hist_test, gen_hist_main, process_hist)
            if not success:
                print("⚠️ Failed to fully collect historical mapping data.")
            # Concatenate all historical query results into a single DataFrame.
            hist_kql_pd = pd.concat(hist_results, ignore_index=True) if hist_results else pd.DataFrame(columns=["XmlaSessionId", "ExecutingUser"])
            try:
                hist_logs_df = spark.table(historical_table_names["detailed_logs"]).select("XmlaSessionId", "ExecutingUser").distinct()
                hist_logs_pd = hist_logs_df.toPandas()
            except Exception:
                print("⚠️ detailed_logs table missing; using empty historical data.")
                hist_logs_pd = pd.DataFrame(columns=["XmlaSessionId", "ExecutingUser"])
            if not hist_kql_pd.empty and not hist_logs_pd.empty:
                # Join actual user names (from KQL) to previously stored masked names (from detailed_logs) by session.
                hist_kql = hist_kql_pd.rename(columns={"ExecutingUser": "ActualUser"})
                hist_kql["NormalizedActualUser"] = hist_kql["ActualUser"].apply(normalize_user)
                hist_logs = hist_logs_pd.rename(columns={"ExecutingUser": "MaskedUser"})
                merged = pd.merge(hist_kql, hist_logs, on="XmlaSessionId", how="inner")
                # Keep one masked value per normalized user and drop rows without a user name.
                merged = merged.dropna(subset=["NormalizedActualUser"]).drop_duplicates(subset=["NormalizedActualUser"])
                historical_mapping = dict(zip(merged["NormalizedActualUser"], merged["MaskedUser"]))
            print(f"✅ Built historical mapping with {len(historical_mapping)} entries.")
        except Exception as e:
            print(f"⚠️ Historical mapping build failed: {e}")
            historical_mapping = {}
    
    # A lock to ensure thread-safe updates to the historical mapping.
    mapping_lock = threading.Lock()
    
    def process_detail_result(main_result, start_ts):
        nonlocal historical_mapping
        # Mask user names based on the collection mode.
        if collect_principal_names == 2:
            main_result = main_result.withColumn("ExecutingUser", lit("Masked"))
        elif collect_principal_names == 1:
            try:
                new_users = [normalize_user(row[0]) for row in main_result.select("ExecutingUser").distinct().collect()]
            except Exception as e:
                print(f"❌ Error retrieving distinct users: {e}")
                new_users = []
            with mapping_lock:
                for user in new_users:
                    if user and user not in historical_mapping:
                        # Generate a new masked value for unseen users.
                        historical_mapping[user] = str(uuid4())
                # Copy under the lock so other threads cannot mutate the dict while it is being broadcast.
                mapping_snapshot = dict(historical_mapping)
            # Broadcast the snapshot for efficient usage in Spark transformations.
            broadcast_map = spark.sparkContext.broadcast(mapping_snapshot)
            def mask_user(actual):
                # Replace the actual user with the masked version if available.
                return broadcast_map.value.get(normalize_user(actual), actual)
            mask_udf = udf(mask_user, StringType())
            main_result = main_result.withColumn("ExecutingUser", mask_udf(col("ExecutingUser")))
        # Save the detailed log result along with metadata about the interval.
        save_dataframe_to_delta_table(
            data=main_result,
            table_name=historical_table_names["detailed_logs"],
            context=context,
            AsOfDate=start_ts.date(),
            AsOfDateTime=start_ts,
        )
        try:
            # Extract distinct ReportIds from the result and update the report_ids set.
            distinct_ids = set(main_result.select("ReportId").distinct().rdd.flatMap(lambda x: x).collect())
            report_ids.update(distinct_ids)
            print(f"✅ Collected {len(distinct_ids)} ReportIds for interval starting at {start_ts}.")
        except Exception as e:
            print(f"❌ Failed to extract ReportIds for interval starting at {start_ts}: {e}")
    
    # Process intervals for each day within the specified range.
    for days_ago in range(1, max_days_ago_to_collect + 1):
        process_intervals_for_day_generic(context, log_tbl, qcollector, days_ago,
                                          apply_cutoff=(days_ago == 1),
                                          generate_test_query=generate_test_query,
                                          generate_main_query=generate_main_query,
                                          process_result=process_detail_result,
                                          label="detailed logs")
    
    if mask_principal_names_after_days > 0:
        # Calculate cutoff date for masking older user names.
        cutoff_date = context["as_of_date"] - timedelta(days=mask_principal_names_after_days)
        update_query = f"""
            UPDATE {historical_table_names["detailed_logs"]}
            SET ExecutingUser = 'Masked'
            WHERE AsOfDate < '{cutoff_date}'
        """
        try:
            spark.sql(update_query)
            print(f"✅ Masked user names for records older than {mask_principal_names_after_days} days (before {cutoff_date}).")
        except Exception as e:
            print(f"❌ Failed to mask user names for historical detailed logs: {e}")
    
    print(f"✅ capture_detailed_logs complete. Total ReportIds: {len(report_ids)}")
    return report_ids


@log_function_calls
def capture_all_query_counts_and_mappings(context: dict, model_columns: pd.DataFrame, model_measures: pd.DataFrame) -> list:
    """
    Captures query counts for columns, measures, and report measures, and processes them into mappings.
    
    It performs the following steps:
      1. Validates that all required context keys are present.
      2. Processes columns and measures separately to standardize their metadata.
      3. Saves the standardized mappings to Delta tables.
      4. Captures query counts for each object type using the helper functions.
      5. Processes detailed logs to extract ReportIds and updates mappings for report measures.
    
    Returns:
      A list of ReportIds extracted from the detailed logs.
    """
    # Verify that all required keys exist in the context dictionary.
    required = {"source_model_uuid", "source_model_workspace_uuid", "source_model_name", "log_analytics_kusto_uri", "log_analytics_kusto_database"}
    missing = required - context.keys()
    if missing:
        raise KeyError(f"❌ Missing context keys: {', '.join(sorted(missing))}")
    
    report_ids = set()
    try:
        print("📊 Processing columns...")
        # Process columns to generate standardized mappings.
        processed_cols = process_semantic_model_objects(model_columns, "COLUMN")
        save_dataframe_to_delta_table(
            data=processed_cols,
            table_name=historical_table_names["object_mapping"],
            context=context,
        )
        capture_query_counts_by_object(context, "COLUMN", processed_cols)
    except Exception as e:
        print(f"❌ Columns processing failed. Error: {e}")
    
    try:
        print("📊 Processing measures...")
        # Process measures to generate standardized mappings.
        processed_measures = process_semantic_model_objects(model_measures, "MEASURE")
        save_dataframe_to_delta_table(
            data=processed_measures,
            table_name=historical_table_names["object_mapping"],
            context=context,
        )
        capture_query_counts_by_object(context, "MEASURE", processed_measures)
    except Exception as e:
        print(f"❌ Measures processing failed. Error: {e}")
    
    try:
        print("📊 Processing REPORT MEASUREs...")
        # Process report measures (without a DataFrame of model objects).
        capture_query_counts_by_object(context, "REPORT MEASURE", None)
    except Exception as e:
        print(f"❌ REPORT MEASURE processing failed. Error: {e}")
    
    try:
        # Capture detailed logs and extract ReportIds.
        detailed_ids = capture_detailed_logs(context)
        if detailed_ids:
            report_ids.update(detailed_ids)
            print(f"✅ Updated ReportIds with {len(detailed_ids)} detailed ReportIds.")
        else:
            print("⚠️ No detailed ReportIds captured.")
    except Exception as e:
        print(f"❌ Detailed log capture failed. Error: {e}")
    # Return the collected ReportIds as a list.
    return list(report_ids)
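
When `collect_principal_names == 1`, `capture_detailed_logs` keeps one stable pseudonym per normalized user and guards updates with a lock. The idea can be sketched without Spark as follows. This is a minimal illustration, not the notebook's implementation: `PseudonymMap` is a hypothetical helper, and its trim-and-lowercase normalization is an assumption standing in for `normalize_user`.

```python
import threading
from uuid import uuid4


class PseudonymMap:
    """Hypothetical helper: one stable random pseudonym per normalized user."""

    def __init__(self, known=None):
        self._map = dict(known or {})
        self._lock = threading.Lock()

    @staticmethod
    def normalize(user):
        # Assumed normalization (trim + lowercase); the notebook's normalize_user may differ.
        return user.strip().lower() if user else None

    def register(self, users):
        """Add pseudonyms for unseen users and return a snapshot copy of the mapping."""
        with self._lock:
            for user in users:
                key = self.normalize(user)
                if key and key not in self._map:
                    self._map[key] = str(uuid4())
            # Copy under the lock so callers can read it without racing writers.
            return dict(self._map)


pseudonyms = PseudonymMap()
snapshot = pseudonyms.register(["Alice@Contoso.com", " alice@contoso.com ", "bob@contoso.com"])
masked = [snapshot[PseudonymMap.normalize(u)] for u in ["ALICE@contoso.com", "bob@contoso.com"]]
```

Returning a copy rather than the live dictionary means a reader (for example, a Spark broadcast) never sees the mapping change while another thread registers new users.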

### Capturing Unused Delta Table Columns & Source Mappings

In [None]:
@log_function_calls
def capture_unused_delta_columns(context: dict) -> None:
    """
    Captures unused columns from the Delta tables by comparing lakehouse metadata with model usage.

    If lakehouse information is missing (i.e., no lakehouse name provided), the function inserts placeholder rows.
    Otherwise, it connects to the lakehouse to retrieve column metadata from Delta tables, compares it to the
    source columns referenced by the model (obtained via the Tabular Object Model (TOM)), and saves the
    differences (unused columns) to designated Delta tables.
    """
    # Define the required context keys for retrieving lakehouse and model metadata.
    required_keys = [
        "source_lakehouse_name",
        "source_lakehouse_workspace_uuid",
        "source_lakehouse_uuid",
        "source_model_uuid",
        "source_model_workspace_uuid",
    ]
    # Ensure all required keys are present; if not, raise a KeyError.
    for key in required_keys:
        if key not in context:
            raise KeyError(f"❌ Missing required context key: '{key}'")
            
    # If lakehouse details are missing, insert placeholder rows into the target Delta tables and exit.
    if not context["source_lakehouse_name"]:
        save_dataframe_to_delta_table(
            data=spark.createDataFrame(
                [
                    {
                        "TableName": "N/A",
                        "SourceTableName": "N/A",
                        "SourceColumnName": "N/A",
                    }
                ]
            ),
            table_name=historical_table_names["unused_columns"],
            context=context,
        )
        save_dataframe_to_delta_table(
            data=spark.createDataFrame(
                [
                    {
                        "TableName": "N/A",
                        "ColumnName": "N/A",
                        "SourceTableName": "N/A",
                        "SourceColumnName": "N/A",
                    }
                ]
            ),
            table_name=historical_table_names["source_mapping"],
            context=context,
        )
        return

    tom_tables_info = []
    # Connect to the semantic model using the TOM API to retrieve table metadata.
    with labs.tom.connect_semantic_model(
        dataset=context["source_model_uuid"],
        readonly=True,
        workspace=context["source_model_workspace_uuid"],
    ) as tom:
        # Iterate over tables in the semantic model.
        for tbl in tom.model.Tables:
            # Get the first partition (if any) to extract source information.
            partition = next(iter(tbl.Partitions), None)
            if partition and partition.Source:
                tom_tables_info.append(
                    {
                        "tom_table": tbl,
                        "schema_name": getattr(partition.Source, "SchemaName", None),
                        "source_table_name": getattr(partition.Source, "EntityName", None),
                    }
                )

    def get_lakehouse_columns():
        """
        Retrieves columns from lakehouse Delta tables by manually reading data from known storage paths.
        
        For each table from the TOM metadata, constructs the path and loads the Delta table,
        then extracts the column names.
        """
        data = []
        # Loop through each table metadata record obtained from TOM.
        for item in tom_tables_info:
            schema = item["schema_name"]
            entity = item["source_table_name"]
            if entity:
                # Build a folder path using schema and table name; schema is appended if present.
                schema_part = f"{schema}/" if schema else ""
                path = (
                    f"abfss://{context['source_lakehouse_workspace_uuid']}"
                    f"@{abfss_base_path}/"
                    f"{context['source_lakehouse_uuid']}/Tables/{schema_part}{entity}/"
                )
                try:
                    # Load the Delta table from the specified path.
                    df = spark.read.format("delta").load(path)
                    # For each column in the Delta table schema, append its name along with the table.
                    for col_name in df.schema.fieldNames():
                        data.append({"Table Name": entity, "Column Name": col_name})
                except Exception as ex:
                    print(f"⚠️ Unable to read from {path}: {ex}")
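        # Use an explicit schema so an empty result does not fail schema inference.
        from pyspark.sql.types import StringType, StructField, StructType
        lakehouse_schema = StructType(
            [
                StructField("Table Name", StringType(), True),
                StructField("Column Name", StringType(), True),
            ]
        )
        return spark.createDataFrame(data, schema=lakehouse_schema)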

    # Retrieve all columns from the lakehouse using the defined helper function.
    all_cols_df = get_lakehouse_columns()

    # Ensure the result is a Spark DataFrame (convert if necessary).
    if not isinstance(all_cols_df, pyspark.sql.DataFrame):
        all_cols_df = spark.createDataFrame(all_cols_df)

    # Group the retrieved columns by table name and collect them into sets.
    grouped_df = (
        all_cols_df.groupBy("Table Name")
        .agg(collect_set("Column Name").alias("columns"))
        .collect()
    )
    # Create a dictionary mapping each table to its set of columns.
    table_columns = {row["Table Name"]: set(row["columns"]) for row in grouped_df}

    remaining_columns = []  # To store columns present in the lakehouse but unused in the model.
    source_mapping = []     # To store mapping between model columns and source columns as defined in TOM.

    for info in tom_tables_info:
        tbl = info["tom_table"]
        src_table = info["source_table_name"]
        if not src_table:
            continue
        # Get the set of columns available in the lakehouse for this source table.
        delta_cols = table_columns.get(src_table, set())
        # Get the set of source columns referenced by the model from TOM metadata.
        # The loop variable is named tom_col to avoid shadowing the PySpark col() function.
        used_cols = {tom_col.SourceColumn for tom_col in tbl.Columns if hasattr(tom_col, "SourceColumn")}
        # Build the source mapping for each column that has a SourceColumn attribute.
        for tom_col in tbl.Columns:
            if hasattr(tom_col, "SourceColumn"):
                source_mapping.append(
                    {
                        "TableName": tbl.Name,
                        "ColumnName": tom_col.Name,
                        "SourceTableName": src_table,
                        "SourceColumnName": tom_col.SourceColumn,
                    }
                )
        # Identify columns in the lakehouse that are not used in the model.
        for unused in delta_cols - used_cols:
            remaining_columns.append(
                {
                    "TableName": tbl.Name,
                    "SourceTableName": src_table,
                    "SourceColumnName": unused,
                }
            )
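    # Convert the lists of unused columns and source mappings to Spark DataFrames.
    # Explicit schemas keep this working when either list is empty, where schema inference would fail.
    unused_df = spark.createDataFrame(
        remaining_columns,
        schema="TableName string, SourceTableName string, SourceColumnName string",
    )
    mapping_df = spark.createDataFrame(
        source_mapping,
        schema="TableName string, ColumnName string, SourceTableName string, SourceColumnName string",
    )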
    # Save the unused columns data to the Delta table for unused columns.
    save_dataframe_to_delta_table(
        data=unused_df,
        table_name=historical_table_names["unused_columns"],
        context=context,
    )
    # Save the source mapping data to the Delta table for source mappings.
    save_dataframe_to_delta_table(
        data=mapping_df,
        table_name=historical_table_names["source_mapping"],
        context=context,
    )
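
The core of `capture_unused_delta_columns` is a per-table set difference between the physical Delta columns and the source columns the model references. A Spark-free sketch of that comparison is shown here; `find_unused_columns` and the sample table and column names are hypothetical and exist only for illustration.

```python
def find_unused_columns(lake_columns, model_sources):
    """
    lake_columns:  {source_table: set of physical column names}
    model_sources: {model_table: (source_table, set of source columns used by the model)}
    Returns rows shaped like the unused_columns table, sorted by column name per table.
    """
    rows = []
    for model_table, (source_table, used) in model_sources.items():
        available = lake_columns.get(source_table, set())
        # Columns present in the lakehouse table but never referenced by the model.
        for name in sorted(available - used):
            rows.append(
                {
                    "TableName": model_table,
                    "SourceTableName": source_table,
                    "SourceColumnName": name,
                }
            )
    return rows


lake_columns = {"sales": {"id", "amount", "legacy_code"}}
model_sources = {"Sales": ("sales", {"id", "amount"})}
unused = find_unused_columns(lake_columns, model_sources)
```

The comparison is an exact string match, so a difference in letter case between a Delta column name and the model's `SourceColumn` would be reported as an unused column.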

### Capturing and Processing Reports

In [None]:
@log_function_calls
def get_reports(context: dict, all_workspaces: pd.DataFrame, report_ids_to_keep: list) -> None:
    """
    Retrieves report information from each workspace and filters to retain only the specified ReportIds.
    
    For each workspace:
      - Calls the Fabric API to list reports.
      - Selects and renames columns for consistency.
      - Adds workspace identifiers.
      - Filters out reports not in the report_ids_to_keep list.
      - For any missing ReportIds, creates placeholder rows with default "Unknown" values.
    The final combined DataFrame is saved to the Delta table for reports.
    """
    reports_list = []  # Initialize a list to store DataFrames of reports from each workspace.
    try:
        # Iterate over each workspace provided in the all_workspaces DataFrame.
        for _, workspace in all_workspaces.iterrows():
            ws_id = workspace["Id"]
            ws_name = workspace["Name"]
            try:
                # Retrieve the list of reports for the current workspace using the Fabric API.
                reports_df = fabric.list_reports(workspace=ws_id)
            except Exception as e:
                # If report listing fails for a workspace, log the error and skip to the next workspace.
                print(f'❌ Failed to list reports for workspace "{ws_name}" (ID: {ws_id}). Error: {e}')
                continue
            
            # Select only the relevant columns and rename them for consistency.
            reports_df = reports_df[["Id", "Name", "Web Url"]].rename(
                columns={"Id": "ReportId", "Name": "ReportName", "Web Url": "WebUrl"}
            )
            # Add workspace-specific identifiers to the report DataFrame.
            reports_df["WorkspaceId"] = ws_id
            reports_df["WorkspaceName"] = ws_name
            # Filter the reports to keep only those whose ReportId is in the report_ids_to_keep list.
            reports_df = reports_df[reports_df["ReportId"].isin(report_ids_to_keep)]
            # Append the filtered DataFrame to the list.
            reports_list.append(reports_df)
        
        # Combine all report DataFrames into one; if none were found, create an empty DataFrame with the required columns.
        combined_reports = (
            pd.concat(reports_list, ignore_index=True)
            if reports_list
            else pd.DataFrame(
                columns=[
                    "ReportId",
                    "ReportName",
                    "WebUrl",
                    "WorkspaceId",
                    "WorkspaceName",
                ]
            )
        )
        # Determine which ReportIds from the desired list are missing in the combined reports.
        missing_ids = set(report_ids_to_keep) - set(combined_reports["ReportId"])
        if missing_ids:
            # For any missing ReportIds, create placeholder rows with default "Unknown" values.
            missing_df = pd.DataFrame(
                [
                    {
                        "ReportId": rid,
                        "ReportName": "Unknown",
                        "WebUrl": "Unknown",
                        "WorkspaceId": "Unknown",
                        "WorkspaceName": "Unknown",
                    }
                    for rid in missing_ids
                ]
            )
            # Append the placeholder rows to the combined DataFrame.
            combined_reports = pd.concat(
                [combined_reports, missing_df], ignore_index=True
            )
        # Save the final combined reports DataFrame to the Delta table for source reports.
        save_dataframe_to_delta_table(
            data=combined_reports,
            table_name=historical_table_names["source_reports"],
            context=context,
        )
        print(f"✅ Retrieved and saved {len(combined_reports)} reports.")
    except Exception as e:
        print(f"❌ get_reports encountered an error: {e}")
        raise
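
The placeholder step in `get_reports` guarantees that every requested ReportId ends up with a row, even when no workspace lists that report. A small sketch of that logic using plain dictionaries instead of pandas follows; `add_placeholder_reports` and the sample values are hypothetical.

```python
PLACEHOLDER = "Unknown"


def add_placeholder_reports(found_reports, report_ids_to_keep):
    """Return found_reports plus one 'Unknown' row per requested ReportId that was not found."""
    known_ids = {report["ReportId"] for report in found_reports}
    # dict.fromkeys de-duplicates the requested IDs while preserving their order.
    missing_ids = [rid for rid in dict.fromkeys(report_ids_to_keep) if rid not in known_ids]
    placeholders = [
        {
            "ReportId": rid,
            "ReportName": PLACEHOLDER,
            "WebUrl": PLACEHOLDER,
            "WorkspaceId": PLACEHOLDER,
            "WorkspaceName": PLACEHOLDER,
        }
        for rid in missing_ids
    ]
    return list(found_reports) + placeholders


found = [
    {
        "ReportId": "r1",
        "ReportName": "Sales",
        "WebUrl": "https://example.invalid/r1",
        "WorkspaceId": "w1",
        "WorkspaceName": "Finance",
    }
]
reports = add_placeholder_reports(found, ["r1", "r2", "r2"])
```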

### Cold Cache Helpers: Log Table, Model Refresh, Cache Clear, and Timing Capture

In [None]:
@log_function_calls
def fetch_log_table(context: dict, table_name: str) -> pyspark.sql.DataFrame:
    """
    Attempts to fetch a log table for today's QueryEnd events for the current model.

    Returns:
      A Spark DataFrame filtered by ModelUuid, AsOfDate, and EventClass,
      or None if the table does not exist.
    """
    try:
        # Read the table from Spark using its name.
        raw_tbl = spark.read.table(table_name)
        # Apply filters: match the current model, today's date, and ensure the event class is "QueryEnd".
        filters = (
            (col("ModelUuid") == context["source_model_uuid"]) &
            (col("AsOfDate") == context["as_of_date"]) &
            (col("EventClass") == "QueryEnd")
        )
        return raw_tbl.filter(filters)
    except Exception:
        # Log informational message if the table is not found; it may be created later.
        print(f"ℹ️ Log table `{table_name}` does not exist. It will be created if needed.")
        return None


@log_function_calls
def wait_for_model_creation() -> None:
    """
    Polls the target workspace until the cloned model is created.

    Continues checking every 5 seconds until the cloned model appears in the dataset list.
    """
    # Continuously check if the cloned model is present in the dataset list from the target workspace.
    while (
        cloned_model_name
        not in fabric.list_datasets(workspace=cold_cache_target_workspace_name, mode="rest")["Dataset Name"].to_list()
    ):
        print("⌛ Waiting for cloned model creation...")
        time.sleep(5)


@log_function_calls
def refresh_dataset(model_name: str, refresh_type: str) -> None:
    """
    Initiates a dataset refresh using the specified refresh type (e.g., "clearValues" or "full").

    Polls until the refresh reaches a status listed in valid_refresh_statuses. A "Failed" status
    also ends the wait, so this function returns without raising when the refresh itself fails.
    """
    attempts = 0
    # Start the refresh and keep the identifier used to look up its status.
    refresh_status = fabric.refresh_dataset(model_name, refresh_type=refresh_type)
    # Poll until the refresh reaches a terminal status.
    while (
        fabric.get_refresh_execution_details(model_name, refresh_status).status not in valid_refresh_statuses
    ):
        attempts += 1
        if attempts >= max_attempts:
            raise Exception(f"❌ Refresh did not reach a terminal status after {attempts} attempts.")
        # Wait briefly before the next check.
        time.sleep(3)


@log_function_calls
def clear_cache(model_name: str) -> None:
    """
    Clears the VertiPaq cache for the specified model.

    It calls a helper function to clear the cache and then verifies the operation
    by executing a trivial DAX query. The process is retried until successful or
    until the maximum number of attempts is reached.
    """
    attempts = 0
    while True:
        try:
            # Attempt to clear the cache via a helper function.
            labs.clear_cache(model_name)
            # Verify the cache clear by executing a simple DAX query.
            fabric.evaluate_dax(model_name, "EVALUATE {1}")
            print(f"✅ Cache cleared for model `{model_name}`.")
            break
        except Exception as e:
            attempts += 1
            print(f"⚠️ Cache clear attempt failed: {e}")
            if attempts >= max_attempts:
                raise Exception("❌ Failed to clear VertiPaq cache.")
            # Refresh the TOM cache for the target workspace to ensure consistency.
            fabric.refresh_tom_cache(cold_cache_target_workspace_name)
            time.sleep(5)


def capture_cold_cache_timings(column_name: str, trace) -> None:
    """
    Executes a DAX query for a specific column to measure cold cache performance.

    After running the query, it checks the trace logs for a QueryEnd event that
    references the given column name. Raises an exception if the expected event
    is not found after a specified number of attempts.
    """
    # Construct a DAX expression to query a sample of the column's values.
    dax_expr = f"EVALUATE TOPN(1, VALUES({column_name}))"
    try:
        # Execute the DAX query on the cloned model.
        fabric.evaluate_dax(cloned_model_name, dax_expr)
    except Exception as e:
        print(f"❌ DAX evaluation error for {column_name}: {e}")
        # Chain the original exception so the full traceback is preserved.
        raise Exception(f"Failed to evaluate DAX for {column_name}: {e}") from e
    attempts = 0
    while attempts < max_attempts:
        try:
            # Retrieve trace logs from the trace object.
            trace_logs = trace.get_trace_logs()
            if trace_logs is None:
                raise Exception(f"Trace logs are None for column {column_name}")
            # Look for a QueryEnd event that includes the column name in its Text Data.
            matching_logs = trace_logs[
                (trace_logs["Event Class"] == "QueryEnd") &
                (trace_logs["Text Data"].str.contains(re.escape(column_name), na=False))
            ]
            if not matching_logs.empty:
                break  # Expected trace log found; exit loop.
        except Exception as e:
            print(f"❌ Error reading trace logs for {column_name}: {e}")
            raise Exception(f"Failed to access trace logs for {column_name}: {e}")
        attempts += 1
        if attempts >= max_attempts:
            raise Exception(f"❌ Failed after {attempts} attempts for column {column_name}")
        time.sleep(3)
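
The helpers above share one pattern: check a condition, sleep, and give up after a fixed number of attempts. That pattern can be factored into a single function, sketched here under the assumption that a timeout should raise. `poll_until` is a hypothetical name and is not part of the notebook, `sempy`, or `sempy_labs`.

```python
import time


def poll_until(check, max_attempts, delay_seconds, sleep=time.sleep):
    """Call check() until it returns True; raise TimeoutError after max_attempts checks."""
    for attempt in range(1, max_attempts + 1):
        if check():
            return attempt
        # Only sleep when another attempt will follow.
        if attempt < max_attempts:
            sleep(delay_seconds)
    raise TimeoutError(f"Condition not met after {max_attempts} attempts.")


# Example: the condition becomes true on the third check; sleeping is stubbed out.
states = iter([False, False, True])
attempts_used = poll_until(lambda: next(states), max_attempts=5, delay_seconds=0, sleep=lambda s: None)
```

Passing the sleep function as a parameter keeps the helper easy to test. A bounded variant like this could also replace the open-ended `while` loop in `wait_for_model_creation`, which currently has no timeout.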

### Capturing Cold Cache Performance Metrics

In [None]:
@log_function_calls
def capture_cold_cache_performance(context: dict, model_columns: pd.DataFrame) -> set:
    """
    Measures cold cache performance for eligible columns by deploying a cloned model and executing parallel DAX queries.

    Steps:
      1. Set up and deploy a cloned version of the model.
      2. Refresh and clear the cache of the cloned model.
      3. Create a trace to capture QueryEnd events and measure performance.
      4. Execute DAX queries in parallel for each eligible column.
      5. Save the trace logs with performance metrics.

    Returns:
      A set of column identifiers (formatted strings) that were successfully processed.
    """
    global cloned_model_name, valid_refresh_statuses, max_attempts

    # Define the cloned model's name based on the source model.
    cloned_model_name = f"{context['source_model_name']} - Semantic Model Audit"
    # Valid statuses indicating the refresh operation has completed.
    valid_refresh_statuses = ["Completed", "Failed"]
    # Maximum number of attempts for refresh and cache clearing.
    max_attempts = 120

    # Try to get existing cold cache log data, if available.
    log_tbl = fetch_log_table(context, historical_table_names["cold_cache_measurements"])

    # Filter out columns not eligible for cold cache measurement (e.g., those starting with "RowNumber-").
    eligible_df = model_columns[~model_columns["ColumnName"].str.startswith("RowNumber-")]
    # Format each eligible column as a string representation: 'TableName'[ColumnName]
    eligible_columns = [
        f"'{row['TableName']}'[{row['ColumnName']}]"
        for _, row in eligible_df.iterrows()
    ]

    if log_tbl is not None:
        # Group the existing log data by column name and count entries.
        counts_df = log_tbl.groupBy("ColumnName").count()
        # Identify columns that have already reached the maximum queries per day.
        columns_to_skip = {
            row["ColumnName"]
            for row in counts_df.filter(col("count") >= max_queries_daily)
            .select("ColumnName")
            .collect()
        }
        # Exclude columns that should be skipped.
        filtered_columns = [c for c in eligible_columns if c not in columns_to_skip]
        # Count only eligible columns that were actually excluded.
        num_skipped = len(eligible_columns) - len(filtered_columns)
    else:
        filtered_columns = eligible_columns
        num_skipped = 0

    num_query = len(filtered_columns)
    print(f"📊 {num_query} columns to query; {num_skipped} columns skipped.")

    # If cold cache measurements should be collected and there are columns to query.
    if collect_cold_cache_measurements and num_query > 0:
        try:
            # Deploy a cloned version of the model for cold cache testing.
            labs.deploy_semantic_model(
                source_dataset=context["source_model_name"],
                source_workspace=context["source_model_workspace_name"],
                target_dataset=cloned_model_name,
                target_workspace=cold_cache_target_workspace_name,
                refresh_target_dataset=False,
                overwrite=True,
            )
            # Refresh the TOM cache for the target workspace.
            fabric.refresh_tom_cache(cold_cache_target_workspace_name)
            time.sleep(30)  # Wait for the cache refresh to settle.
            wait_for_model_creation()  # Poll until the cloned model is created.
            # Refresh the cloned model's dataset with a "clearValues" and then "full" refresh.
            refresh_dataset(cloned_model_name, "clearValues")
            refresh_dataset(cloned_model_name, "full")
            # Clear the VertiPaq cache for the cloned model.
            clear_cache(cloned_model_name)
            time.sleep(5)  # Short delay after clearing the cache.

            # Set up a trace to capture QueryEnd events for performance metrics.
            trace_name = f"Simple DAX Trace {uuid4()}"
            event_schema = {
                "QueryEnd": ["EventClass", "TextData", "Duration", "CpuTime", "Success"]
            }
            # Open a single trace connection inside a context manager so it is always closed.
            with fabric.create_trace_connection(
                dataset=cloned_model_name, workspace=cold_cache_target_workspace_name
            ) as trace_conn:
                trace_conn.drop_traces()  # Clear any existing traces before creating a new one.
                with trace_conn.create_trace(event_schema=event_schema, name=trace_name) as trace:
                    trace.start()
                    # Wait until the trace has started.
                    while not trace.is_started:
                        time.sleep(2)
                    print("🔄 Querying columns in parallel...")
                    total_cols = len(filtered_columns)
                    if total_cols == 0:
                        print("ℹ️ No columns to query after filtering.")
                        return set()
                    # Set up progress tracking.
                    progress_interval = math.ceil(total_cols / 10)
                    next_progress = progress_interval
                    completed = 0
                    successful = set()
                    failed = set()
                    # Execute queries in parallel using a thread pool.
                    with ThreadPoolExecutor(max_workers=max_workers) as executor:
                        future_to_col = {
                            executor.submit(capture_cold_cache_timings, col, trace): col
                            for col in filtered_columns
                        }
                        for future in as_completed(future_to_col):
                            col_name = future_to_col[future]
                            try:
                                # This will raise an exception if the query for this column fails.
                                future.result()
                                successful.add(col_name)
                            except Exception as e:
                                print(f"❌ Error processing {col_name}: {e}")
                                failed.add(col_name)
                            # Count failures as well, so progress reflects every finished column.
                            completed += 1
                            # Print progress updates at regular intervals.
                            if completed >= next_progress:
                                print(f"✅ {completed / total_cols * 100:.0f}% of columns processed.")
                                next_progress += progress_interval
                    try:
                        # Stop the trace and retrieve the captured trace logs.
                        trace_logs = trace.stop()
                        if trace_logs is not None and not trace_logs.empty:
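                            # Extract the column name from the "Text Data" field using a regex.
                            # expand=False returns a Series, which is the expected type for a single-column assignment.
                            trace_logs["ColumnName"] = trace_logs["Text Data"].str.extract(
                                r"VALUES\s*\(\s*(.+?)\s*\)\s*\)", expand=False
                            )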
                            # Save the trace logs to the Delta table for cold cache measurements.
                            save_dataframe_to_delta_table(
                                data=trace_logs,
                                table_name=historical_table_names["cold_cache_measurements"],
                                context=context,
                                QueryUuid=str(uuid4()),
                            )
                    except Exception as e:
                        print(f"❌ Failed to process trace logs: {e}")
                    if failed:
                        print(f"⚠️ {len(failed)} columns failed: {', '.join(failed)}")
                    else:
                        print("✅ All columns processed successfully.")
                    print(f"✅ Cold cache performance capture complete. {len(successful)} columns queried successfully.")
                    return successful
        except Exception as e:
            print(f"❌ Error during cold cache performance capture: {e}")
            return set()
    else:
        print("ℹ️ Cold cache collection is disabled or no columns remain to query; inserting placeholder.")
        try:
            # Insert a placeholder record so the table structure is maintained.
            save_dataframe_to_delta_table(
                data=pd.DataFrame({
                    "EventClass": ["N/A"],
                    "TextData": ["N/A"],
                    "Duration": [0],
                    "CpuTime": [0],
                    "Success": ["N/A"],
                    "ColumnName": ["N/A"],
                }),
                table_name=historical_table_names["cold_cache_measurements"],
                context=context,
                QueryUuid=str(uuid4()),
            )
        except Exception as e:
            print(f"❌ Failed to insert placeholder for cold cache measurements: {e}")
        return set()
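
The column-name extraction applied to the trace logs can be tried in isolation. The sketch below applies the same regular expression with Python's `re` module instead of pandas; the helper name `extract_column_name` and the sample query text are hypothetical, and the DAX the notebook actually issues may be shaped differently.

```python
import re

# Same pattern the notebook passes to `str.extract` on the "Text Data" column.
COLUMN_PATTERN = re.compile(r"VALUES\s*\(\s*(.+?)\s*\)\s*\)")


def extract_column_name(text_data: str):
    """Return the column reference found inside VALUES(...), or None if absent."""
    match = COLUMN_PATTERN.search(text_data)
    return match.group(1) if match else None


# Hypothetical trace text, used only to exercise the pattern.
sample = "EVALUATE ROW(\"n\", COUNTROWS(VALUES('Sales'[Order Date])))"
print(extract_column_name(sample))
print(extract_column_name("EVALUATE 'Sales'"))
```

Trace events that do not contain a `VALUES(...)` call followed by a second closing parenthesis yield no match, so their `ColumnName` stays empty.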

### Capturing Resident Column Statistics

In [None]:
@log_function_calls
def capture_resident_statistics(context: dict, queried_columns: set) -> None:
    """
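    Captures resident statistics (e.g., whether columns are loaded in memory, and their sizes) for model columns.
    
    It compares the current model columns (retrieved via the Fabric API) with the resident statistics already
    recorded for the current date, and saves records only for columns not yet recorded on that date.
    When cold cache measurements are enabled, the cloned model is queried instead of the source model and
    only the columns in `queried_columns` are captured.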
    """
    
    def format_column(row: dict) -> str:
        # Standardize the column identifier by combining the table and column names.
        return f"'{row['TableName']}'[{row['ColumnName']}]"
    
    try:
        # Read previously captured resident statistics for the current date from the Delta table.
        existing_stats = (
            spark.read.table(historical_table_names["resident_statistics"])
            .filter(
                (col("ModelUuid") == context["source_model_uuid"]) &
                (col("AsOfDate") == context["as_of_date"])
            )
            .select("TableName", "ColumnName")
            .collect()
        )
        # Create a set of standardized identifiers for the existing resident statistics.
        existing = {format_column(row.asDict()) for row in existing_stats}
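    except Exception as e:
        # Report the cause so a missing table can be told apart from a genuine read failure.
        print(f"⚠️ Could not read existing resident statistics; proceeding without. Error: {e}")
        existing = set()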
    
    # Determine which model to query: if cold cache was measured, use the cloned model; otherwise, use the source model.
    resident_model = (
        cloned_model_name
        if collect_cold_cache_measurements
        else context["source_model_name"]
    )
    resident_workspace = (
        cold_cache_target_workspace_name
        if collect_cold_cache_measurements
        else context["source_model_workspace_name"]
    )
    
    # Retrieve the current list of model columns using the Fabric API.
    model_columns_resident = fabric.list_columns(
        dataset=resident_model,
        extended=True,
        workspace=resident_workspace,
    )
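    # Strip spaces from the DataFrame's column headers (e.g. "Table Name" becomes "TableName")
    # so they match the keys that format_column reads. A literal replacement is enough here.
    model_columns_resident.columns = model_columns_resident.columns.str.replace(" ", "", regex=False)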
    # Build a set of standardized identifiers for the current model columns.
    model_set = {format_column(row) for _, row in model_columns_resident.iterrows()}
    
    # Identify columns that are new (i.e., not present in the historical resident statistics).
    to_capture = model_set - existing
    if collect_cold_cache_measurements:
        # Optionally restrict to only columns that were previously queried for cold cache metrics.
        to_capture = to_capture.intersection(queried_columns)
    to_capture = list(to_capture)
    
    if to_capture:
        # Filter the current model columns DataFrame to include only those columns that are new.
        filtered = [
            row
            for _, row in model_columns_resident.iterrows()
            if format_column(row) in to_capture
        ]
        filtered_df = pd.DataFrame(filtered)
        print(f"📈 Capturing resident statistics for {len(filtered_df)} new columns.")
        # Save the new resident statistics to the designated Delta table.
        save_dataframe_to_delta_table(
            data=filtered_df,
            table_name=historical_table_names["resident_statistics"],
            context=context,
        )
    else:
        print("ℹ️ No new resident statistics to capture for this run.")
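
The selection logic in `capture_resident_statistics` comes down to a set difference followed by an optional intersection. Here is a minimal sketch with plain Python sets; all table and column names are hypothetical, and it assumes that `queried_columns` uses the same `'Table'[Column]` format that `format_column` produces.

```python
def format_column(row: dict) -> str:
    # Same identifier format as in the notebook: 'Table'[Column].
    return f"'{row['TableName']}'[{row['ColumnName']}]"


# Hypothetical model columns, as they might come back from the Fabric API.
model_rows = [
    {"TableName": "Sales", "ColumnName": "Amount"},
    {"TableName": "Sales", "ColumnName": "OrderDate"},
    {"TableName": "Customer", "ColumnName": "Name"},
]
# Hypothetical identifiers already recorded for the current date.
already_recorded = {"'Sales'[Amount]"}
# Hypothetical identifiers returned by the cold cache step.
queried_columns = {"'Sales'[Amount]", "'Sales'[OrderDate]"}

model_set = {format_column(row) for row in model_rows}
to_capture = model_set - already_recorded        # not yet recorded today
to_capture_cold = to_capture & queried_columns   # additionally limited to queried columns

print(sorted(to_capture))
print(sorted(to_capture_cold))
```

With cold cache measurements disabled only the first set applies; with them enabled, a column that was not queried is skipped even if it has no record for the date.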

### Workspace Monitoring Information

In [None]:
@log_function_calls
def get_workspace_monitoring_info(workspace: str) -> tuple[str, str]:
    """
    Retrieves the Query Service URI and KQL Database Id for the Monitoring KQL database in the given workspace.
    
    This function queries the list of KQL databases in the workspace using the Fabric API (via labs.list_kql_databases).
    It then filters the result to find the database named "Monitoring KQL database". If such a database is not found,
    it raises a ValueError. Otherwise, it extracts and returns the Query Service URI and the KQL Database Id as a tuple.
    
    Returns:
      A tuple (kusto_uri, kusto_db_uuid) where:
        - kusto_uri: The URI for querying the KQL database.
        - kusto_db_uuid: The unique identifier (GUID) for the KQL database.
    
    Raises:
      ValueError: If the workspace has no KQL databases, or if none of them is named "Monitoring KQL database".
    """
    # Retrieve a DataFrame containing all KQL databases for the given workspace.
    df = labs.list_kql_databases(workspace=workspace)
    # Check if the DataFrame is empty; if so, no KQL databases exist in the workspace.
    if df.empty:
        raise ValueError(f"❌ No KQL databases found in workspace `{workspace}`.")
    # Filter the DataFrame to find the row where the KQL Database Name matches "Monitoring KQL database".
    df_monitor = df[df["KQL Database Name"] == "Monitoring KQL database"]
    # If no matching database is found, raise an error.
    if df_monitor.empty:
        raise ValueError(
            f"❌ Monitoring KQL database not found in workspace `{workspace}`."
        )
    # Extract the Query Service URI from the first (and expected only) matching row.
    kusto_uri = df_monitor.iloc[0]["Query Service URI"]
    # Extract the KQL Database Id (GUID) from the same row.
    kusto_db_uuid = df_monitor.iloc[0]["KQL Database Id"]
    # Return the extracted URI and Database Id as a tuple.
    return kusto_uri, kusto_db_uuid


### Main Orchestration: Collecting Statistics for Each Semantic Model

In [None]:
@log_function_calls
def collect_model_statistics(models: list) -> None:
    """
    Main orchestration function that processes each semantic model to capture various statistics.
    
    For each model in the provided list, the function performs the following steps:
      1. Clean up incomplete historical data if configured (by dropping tables or removing incomplete runs).
      2. Build a context dictionary containing all necessary model and workspace details.
      3. Record the start of the run in the run_history table.
      4. Resolve the Kusto URI and database for the query logs (from the model configuration if provided, otherwise from the workspace's Monitoring KQL database).
      5. Capture model objects (columns and measures) and save them for historical tracking.
      6. Capture measure dependencies via a DAX query.
      7. Capture query counts for various model objects, update mappings accordingly, and collect the ReportIds found.
      8. Retrieve and save report details for the collected ReportIds.
      9. Capture unused Delta table columns and cold cache performance.
      10. Capture resident column statistics (e.g., column residency in memory).
      11. Record the run completion status (marking the run as completed or failed).
    
    If a critical step (5 to 8) fails for a model, the function marks the run as failed and proceeds with the next model.
    Failures in steps 9 and 10 are reported as warnings and do not fail the run.
    """
    # Step 1: Clean up historical data if configured.
    if force_delete_historical_tables:
        print("⚠️ Force-deleting historical tables. All data will be lost.")
        drop_historical_tables()
    elif force_delete_incomplete_runs:
        print("ℹ️ Removing records for incomplete runs.")
        cleanup_incomplete_runs()

    # Retrieve all workspaces once up front; the list is reused for every model when resolving reports.
    all_workspaces = fabric.list_workspaces()

    # Process each model from the provided models list.
    for model in models:
        now = datetime.now()  # Capture current timestamp for this run.
        # Step 2: Build the context dictionary with required metadata.
        context = {
            "run_uuid": str(uuid4()),
            "as_of_datetime": now,
            "as_of_date": now.date(),
            "source_model_workspace_name": model["model_workspace_name"],
            "source_model_workspace_uuid": fabric.resolve_workspace_id(model["model_workspace_name"]),
            "source_model_name": model["model_name"],
            "source_model_uuid": fabric.resolve_dataset_id(model["model_name"], model["model_workspace_name"]),
        }
        # Include lakehouse details in the context if they are provided.
        if "lakehouse_name" in model and "lakehouse_workspace_name" in model:
            context.update({
                "source_lakehouse_name": model["lakehouse_name"],
                "source_lakehouse_workspace_name": model["lakehouse_workspace_name"],
                "source_lakehouse_uuid": labs.resolve_lakehouse_id(model["lakehouse_name"], model["lakehouse_workspace_name"]),
                "source_lakehouse_workspace_uuid": fabric.resolve_workspace_id(model["lakehouse_workspace_name"]),
            })
        else:
            # Use empty strings if lakehouse details are not provided.
            context.update({
                "source_lakehouse_name": "",
                "source_lakehouse_workspace_name": "",
                "source_lakehouse_uuid": "",
                "source_lakehouse_workspace_uuid": "",
            })
        try:
            print(f"📁 Processing model `{model['model_name']}`")
            # Step 3: Record the start of the run.
            record_run_start(context)

            # Step 4: Determine workspace monitoring info.
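            # Read the optional overrides with .get() so a model entry without these keys falls back to the lookup.
            kusto_uri = model.get("log_analytics_kusto_uri", "")
            kusto_database_uuid = model.get("log_analytics_kusto_database_uuid", "")
            if kusto_uri or kusto_database_uuid:
                context["log_analytics_kusto_uri"] = kusto_uri
                context["log_analytics_kusto_database"] = kusto_database_uuid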
            elif context["source_model_workspace_uuid"]:
                (context["log_analytics_kusto_uri"],
                 context["log_analytics_kusto_database"]) = get_workspace_monitoring_info(context["source_model_workspace_uuid"])
            else:
                context["log_analytics_kusto_uri"] = ""
                context["log_analytics_kusto_database"] = ""

            report_ids = set()  # To accumulate ReportIds from detailed logs.

            try:
                # Step 5: Capture model objects (columns and measures).
                model_columns, model_measures = capture_semantic_model_objects(context)
                # Step 6: Capture measure dependencies.
                capture_semantic_model_dependencies(context, model_measures)
                # Step 7: Capture query counts and update object mappings.
                report_ids = set(capture_all_query_counts_and_mappings(context, model_columns, model_measures))
                if report_ids:
                    # Step 8: Retrieve and save reports using the collected ReportIds.
                    get_reports(context, all_workspaces, list(report_ids))
                else:
                    print("ℹ️ No ReportIds found to process.")
            except Exception as critical_err:
                print(f"❌ Critical step failed for model `{model['model_name']}`: {critical_err}")
                try:
                    record_run_completion(context, "failed")
                    print(f"🔴 Run UUID: {context['run_uuid']} marked as failed.")
                except Exception as update_err:
                    print(f"❌ Failed to mark run UUID: {context['run_uuid']} as failed. Error: {update_err}")
                continue  # Skip to next model if a critical step fails.

            try:
                # Step 9: Capture unused Delta table columns.
                capture_unused_delta_columns(context)
            except Exception as e:
                print(f"⚠️ Failed to capture unused Delta columns for model `{model['model_name']}`: {e}")

            try:
                # Step 9 (continued): Capture cold cache performance metrics.
                queried_cols = capture_cold_cache_performance(context, model_columns)
            except Exception as e:
                print(f"⚠️ Cold cache performance capture failed for model `{model['model_name']}`: {e}")
                queried_cols = set()

            try:
                # Step 10: Capture resident statistics for columns.
                capture_resident_statistics(context, queried_cols)
            except Exception as e:
                print(f"⚠️ Resident statistics capture failed for model `{model['model_name']}`: {e}")

            try:
                # Step 11: Mark the run as completed.
                record_run_completion(context, "completed")
                print(f"✅ Run UUID: {context['run_uuid']} completed successfully.")
            except Exception as e:
                print(f"❌ Failed to mark run UUID: {context['run_uuid']} as completed. Error: {e}")
        except Exception as e:
            # Log any unexpected error during processing and move to the next model.
            print(f"❌ Unexpected error processing model `{model['model_name']}`: {e}")
            continue  # Continue with next model on error
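
The orchestration separates steps whose failure ends the run from steps whose failure is only reported. A minimal sketch of that pattern follows, with hypothetical names (`run_steps`, `ok`, `boom`) standing in for the notebook's capture functions; it illustrates the control flow only, not the actual implementation.

```python
def run_steps(critical_steps, optional_steps) -> str:
    """Run critical steps first and stop on the first failure; optional steps only log."""
    try:
        for step in critical_steps:
            step()
    except Exception as err:
        print(f"critical step failed: {err}")
        return "failed"  # optional steps are skipped entirely
    for step in optional_steps:
        try:
            step()
        except Exception as err:
            print(f"optional step failed: {err}")  # reported, run continues
    return "completed"


calls = []  # records which stand-in steps actually ran


def ok():
    calls.append("ok")


def boom():
    raise RuntimeError("boom")


print(run_steps([ok], [boom, ok]))  # optional failure does not fail the run
print(run_steps([boom], [ok]))      # critical failure ends the run early
```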

### Execute the Statistics Collection for All Models

In [None]:
# Execute the main function to process all models defined in the models list.
collect_model_statistics(models)
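
For reference, the shape of one entry in `models` can be inferred from the keys that `collect_model_statistics` reads. The sketch below is an assumption based on that code rather than the notebook's documented configuration; every value is a hypothetical placeholder, and the variable is named `example_models` to avoid overriding the real run parameters.

```python
# Hypothetical configuration entry; keys are inferred from collect_model_statistics.
example_models = [
    {
        "model_name": "Sales Model",            # placeholder semantic model name
        "model_workspace_name": "Analytics",    # placeholder workspace name
        # Lakehouse details are only used when both keys are present.
        "lakehouse_name": "SalesLakehouse",
        "lakehouse_workspace_name": "Analytics",
        # Leave both empty to look up the workspace's "Monitoring KQL database".
        "log_analytics_kusto_uri": "",
        "log_analytics_kusto_database_uuid": "",
    },
]

for entry in example_models:
    print(entry["model_workspace_name"], "/", entry["model_name"])
```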

### Generate Star Schema

### Helper Function for Star Schema: Generate Table Key
Used throughout the star schema creation SQL to produce unique keys.

In [None]:
def generate_table_key(*columns) -> str:
    """
    Generates a SQL expression that derives a deterministic BIGINT key from the concatenated values of the given columns.
    
    It uses IFNULL to replace NULLs with empty strings, CONCAT to join the values, MD5 for hashing,
    RIGHT to keep the last 16 hexadecimal digits (64 bits), and CONV plus CAST to turn them into a signed BIGINT.
    
    Returns:
      A string containing the SQL expression for the key.
    """
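    # Wrap each column in IFNULL so that a single NULL value does not turn the whole CONCAT into NULL.
    # The loop variable is named `column` to avoid confusion with PySpark's `col` function.
    ifnull_parts = [f"IFNULL({column}, '')" for column in columns]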
    # Build the SQL expression.
    sql_expr = f"""
        CAST(CONV(
            RIGHT(MD5(CONCAT({", ".join(ifnull_parts)})), 16),
            16,
            -10
        ) AS BIGINT)
    """
    return sql_expr.strip()
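
To see what the generated SQL computes, here is a rough Python rendering of the same key. It is a sketch only: the function name `table_key` is hypothetical, and it assumes that `CONV` with a negative target base interprets the 16 hexadecimal digits as a signed 64-bit value.

```python
import hashlib


def table_key(*values) -> int:
    """Approximate the SQL key: MD5 of the concatenated values, last 16 hex digits, signed."""
    # IFNULL(col, '') followed by CONCAT: NULL becomes an empty string, no separator is added.
    joined = "".join("" if value is None else str(value) for value in values)
    tail = hashlib.md5(joined.encode("utf-8")).hexdigest()[-16:]
    unsigned = int(tail, 16)
    # Wrap into the signed BIGINT range (assumed to match CONV(..., 16, -10)).
    return unsigned - 2**64 if unsigned >= 2**63 else unsigned


print(table_key("model-1", "Sales", "Amount"))
print(table_key("ab", "c") == table_key("a", "bc"))
```

Two properties follow from this construction. Because the values are concatenated without a separator, different column combinations that join to the same string (such as `"ab" + "c"` and `"a" + "bc"`) produce the same key. And because only 64 bits of the hash are kept, keys are very unlikely to collide but are not guaranteed to be unique.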

### Create ```DIM_ModelObject```
Collects the most recent column, measure, and unused column definitions.

In [None]:
query_result = spark.sql(f"""
    -- Get the latest report measures
    WITH latest_report_measures AS (
        SELECT
            mapping.TableName,
            mapping.ObjectName,
            mapping.ObjectType,
            mapping.ModelObject AS Expression,
            '' AS Description,
            NULL AS ModifiedDate,
            'True' AS DeletedFromModelFlag,
            query_count.RunUuid,
            query_count.ModelUuid,
            query_count.AsOfDate,
            query_count.AsOfDateTime
        FROM {historical_table_names["object_query_count"]} AS query_count
        JOIN (
            SELECT
                ModelUuid,
                ModelObject,
                MAX(AsOfDateTime) AS MaxAsOfDateTime
            FROM {historical_table_names["object_query_count"]}
            GROUP BY ALL
        ) AS latest ON
            latest.ModelObject = query_count.ModelObject
            AND latest.ModelUuid = query_count.ModelUuid
            AND latest.MaxAsOfDateTime = query_count.AsOfDateTime
        LEFT JOIN {historical_table_names["object_mapping"]} AS mapping ON
            mapping.RunUuid = query_count.RunUuid
            AND mapping.ModelUuid = query_count.ModelUuid
            AND mapping.ObjectType = query_count.ObjectType
            AND mapping.ModelObject = query_count.ModelObject
        WHERE query_count.ObjectType = 'REPORT MEASURE'
    ),

    -- Get the latest model columns
    latest_model_columns AS (
        SELECT
            mapping.TableName,
            mapping.ObjectName,
            mapping.ObjectType,
            '' AS Expression,
            model_column.Description,
            CAST(ModifiedTime AS DATE) AS ModifiedDate,
            CASE 
                WHEN MAX(model_column.AsOfDate) OVER() = model_column.AsOfDate THEN 'False'
                ELSE 'True'
            END AS DeletedFromModelFlag,
            model_column.RunUuid,
            model_column.ModelUuid,
            model_column.AsOfDate,
            model_column.AsOfDateTime
        FROM {historical_table_names["model_columns"]} AS model_column
        JOIN (
            SELECT
                ModelUuid,
                TableName,
                ColumnName,
                MAX(AsOfDateTime) AS MaxAsOfDateTime
            FROM {historical_table_names["model_columns"]}
            GROUP BY ALL
        ) AS latest ON
            latest.ModelUuid = model_column.ModelUuid
            AND latest.TableName = model_column.TableName
            AND latest.ColumnName = model_column.ColumnName
            AND latest.MaxAsOfDateTime = model_column.AsOfDateTime
        LEFT JOIN {historical_table_names["object_mapping"]} AS mapping ON
            mapping.RunUuid = model_column.RunUuid
            AND mapping.ModelUuid = model_column.ModelUuid
            AND mapping.ObjectType = 'COLUMN'
            AND mapping.TableName = model_column.TableName
            AND mapping.ObjectName = model_column.ColumnName
    ),

    -- Get the latest model measures
    latest_model_measures AS (
        SELECT
            mapping.TableName,
            mapping.ObjectName,
            mapping.ObjectType,
            model_measure.MeasureExpression AS Expression,
            model_measure.MeasureDescription AS Description,
            NULL AS ModifiedDate,
            CASE 
                WHEN MAX(model_measure.AsOfDate) OVER() = model_measure.AsOfDate THEN 'False'
                ELSE 'True'
            END AS DeletedFromModelFlag,
            model_measure.RunUuid,
            model_measure.ModelUuid,
            model_measure.AsOfDate,
            model_measure.AsOfDateTime
        FROM {historical_table_names["model_measures"]} AS model_measure
        JOIN (
            SELECT
                ModelUuid,
                TableName,
                MeasureName,
                MAX(AsOfDateTime) AS MaxAsOfDateTime
            FROM {historical_table_names["model_measures"]}
            GROUP BY ALL
        ) AS latest ON
            latest.ModelUuid = model_measure.ModelUuid
            AND latest.TableName = model_measure.TableName
            AND latest.MeasureName = model_measure.MeasureName
            AND latest.MaxAsOfDateTime = model_measure.AsOfDateTime
        LEFT JOIN {historical_table_names["object_mapping"]} AS mapping ON
            mapping.ModelUuid = model_measure.ModelUuid
            AND mapping.RunUuid = model_measure.RunUuid
            AND mapping.ObjectType = 'MEASURE'
            AND mapping.TableName = model_measure.TableName
            AND mapping.ObjectName = model_measure.MeasureName
    ),

    -- Get unused columns
    latest_unused_columns AS (
        SELECT
            unused_column.TableName,
            IFNULL(model_column_with_mapping.ObjectName, unused_column.SourceColumnName) AS ObjectName,
            'COLUMN' AS ObjectType,
            '' AS Expression,
            model_column_with_mapping.Description,
            model_column_with_mapping.ModifiedDate,
            'True' AS DeletedFromModelFlag,
            unused_column.RunUuid,
            unused_column.ModelUuid,
            unused_column.AsOfDate,
            unused_column.AsOfDateTime
        FROM {historical_table_names["unused_columns"]} AS unused_column
        JOIN (
            SELECT
                ModelUuid,
                SourceTableName,
                SourceColumnName,
                MAX(AsOfDateTime) AS MaxAsOfDateTime
            FROM {historical_table_names["unused_columns"]}
            GROUP BY ALL
        ) AS latest ON
            latest.ModelUuid = unused_column.ModelUuid
            AND latest.SourceTableName = unused_column.SourceTableName
            AND latest.SourceColumnName = unused_column.SourceColumnName
            AND latest.MaxAsOfDateTime = unused_column.AsOfDateTime
        LEFT JOIN (
            SELECT
                latest_model_columns.ModelUuid,
                latest_model_columns.TableName,
                latest_model_columns.ObjectName,
                latest_model_columns.Description,
                latest_model_columns.ModifiedDate,
                source_mapping.SourceTableName,
                source_mapping.SourceColumnName,
                source_mapping.RunUuid
            FROM latest_model_columns
            LEFT JOIN {historical_table_names["source_mapping"]} AS source_mapping ON
                source_mapping.ModelUuid = latest_model_columns.ModelUuid
                AND source_mapping.RunUuid = latest_model_columns.RunUuid
                AND source_mapping.TableName = latest_model_columns.TableName
                AND source_mapping.ColumnName = latest_model_columns.ObjectName
        ) AS model_column_with_mapping ON
            model_column_with_mapping.ModelUuid = unused_column.ModelUuid
            AND model_column_with_mapping.RunUuid = unused_column.RunUuid
            AND model_column_with_mapping.SourceTableName = unused_column.SourceTableName
            AND model_column_with_mapping.SourceColumnName = unused_column.SourceColumnName
    ),

    -- Union all objects
    union_all_objects AS (
        SELECT * FROM latest_report_measures
        UNION
        SELECT * FROM latest_model_columns
        UNION
        SELECT * FROM latest_model_measures
        UNION
        SELECT * FROM latest_unused_columns
    ),

    -- Enrich and keep the latest records
    keep_latest_record_and_enrich AS (
        SELECT
            union_all_objects.TableName,
            union_all_objects.ObjectName,
            union_all_objects.ObjectType,
            union_all_objects.Expression,
            union_all_objects.Description,
            union_all_objects.ModifiedDate,
            union_all_objects.DeletedFromModelFlag,
            union_all_objects.ModelUuid,
            CASE 
                WHEN MAX(union_all_objects.AsOfDate) OVER() = union_all_objects.AsOfDate THEN 'False'
                ELSE 'True'
            END AS DeletedFromLakehouseFlag,
            {
                generate_table_key(
                    "union_all_objects.ModelUuid",
                    "union_all_objects.TableName",
                    '''
                        CASE 
                            WHEN union_all_objects.ObjectType = 'REPORT MEASURE'
                            THEN union_all_objects.Expression
                            ELSE union_all_objects.ObjectName
                        END
                    ''',
                )
            } AS ModelObjectId
        FROM union_all_objects
        JOIN (
            SELECT
                ModelUuid,
                TableName,
                ObjectName,
                Expression,
                MAX(AsOfDateTime) AS MaxAsOfDateTime
            FROM union_all_objects
            GROUP BY ALL
        ) AS latest ON
            latest.ModelUuid = union_all_objects.ModelUuid
            AND latest.TableName = union_all_objects.TableName
            AND latest.ObjectName = union_all_objects.ObjectName
            AND latest.Expression = union_all_objects.Expression
            AND latest.MaxAsOfDateTime = union_all_objects.AsOfDateTime
    )
    SELECT * FROM keep_latest_record_and_enrich
""")

query_result.write.mode("overwrite").format("delta").option(
    "overwriteSchema", "true"
).saveAsTable(star_schema_table_names["dim_model_object"])
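
Each CTE in the query above keeps the most recent record per object by joining back to a `MAX(AsOfDateTime)` subquery, then flags objects whose latest record is older than the newest date. The following sketch shows the same idea on plain Python dictionaries; the helper `keep_latest` and the sample rows are hypothetical.

```python
from datetime import datetime


def keep_latest(rows, key_fields, ts_field="AsOfDateTime"):
    """Keep the rows whose timestamp is the maximum for their key."""
    latest = {}
    for row in rows:
        key = tuple(row[field] for field in key_fields)
        if key not in latest or row[ts_field] > latest[key]:
            latest[key] = row[ts_field]
    return [
        row for row in rows
        if row[ts_field] == latest[tuple(row[field] for field in key_fields)]
    ]


# Hypothetical history: "Amount" was seen in both runs, "Region" only in the first.
rows = [
    {"ModelUuid": "m1", "TableName": "Sales", "ColumnName": "Amount", "AsOfDateTime": datetime(2025, 1, 1, 6)},
    {"ModelUuid": "m1", "TableName": "Sales", "ColumnName": "Amount", "AsOfDateTime": datetime(2025, 1, 2, 6)},
    {"ModelUuid": "m1", "TableName": "Sales", "ColumnName": "Region", "AsOfDateTime": datetime(2025, 1, 1, 6)},
]

latest_rows = keep_latest(rows, ["ModelUuid", "TableName", "ColumnName"])
max_date = max(row["AsOfDateTime"].date() for row in latest_rows)
# An object whose latest record predates the newest date is flagged as deleted.
flags = {
    row["ColumnName"]: "False" if row["AsOfDateTime"].date() == max_date else "True"
    for row in latest_rows
}
print(flags)
```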

### Create ```DIM_Model```

In [None]:
query_result = spark.sql(f"""
    SELECT
        models.source_model_workspace_name AS WorkspaceName,
        models.source_model_name AS ModelName,
        models.ModelUuid AS ModelUuid,
        models.source_lakehouse_name AS LakehouseName,
        models.source_lakehouse_uuid AS LakehouseUuid,
        models.source_lakehouse_workspace_name AS LakehouseWorkspaceName,
        models.source_lakehouse_workspace_uuid AS LakehouseWorkspaceUuid
    FROM {historical_table_names["run_history"]} AS models
    JOIN (
        SELECT
            ModelUuid,
            MAX(AsOfDateTime) AS AsOfDateTime
        FROM {historical_table_names["run_history"]}
        WHERE Status = 'completed'
        GROUP BY ModelUuid
    ) AS latest ON
        latest.ModelUuid = models.ModelUuid
        AND latest.AsOfDateTime = models.AsOfDateTime
""")

query_result.write.mode("overwrite").format("delta").option(
    "overwriteSchema", "true"
).saveAsTable(star_schema_table_names["dim_model"])

### Create ```DIM_Report```

In [None]:
if len(report_uuid_mapping) == 1 and report_uuid_mapping[0]["ReportUuid"] == "" and report_uuid_mapping[0]["MapToReportUuid"] == "":
    query = f"""
        SELECT
            CASE
                WHEN src_reports.ReportId = '' THEN 'Non-Report'
                ELSE src_reports.ReportName
            END AS ReportName,
            src_reports.WebUrl AS WebUrl,
            src_reports.WorkspaceId AS WorkspaceUuid,
            src_reports.WorkspaceName AS WorkspaceName,
            src_reports.ReportId AS ReportUuid
        FROM {historical_table_names["source_reports"]} AS src_reports
        JOIN (
            SELECT
                ReportId,
                MAX(AsOfDateTime) AS MaxAsOfDateTime
            FROM {historical_table_names["source_reports"]}
            GROUP BY ReportId
        ) AS latest_reports
            ON latest_reports.ReportId = src_reports.ReportId
            AND latest_reports.MaxAsOfDateTime = src_reports.AsOfDateTime
        UNION ALL
        SELECT
            'N/A' AS ReportName,
            'N/A' AS WebUrl,
            'N/A' AS WorkspaceUuid,
            'N/A' AS WorkspaceName,
            'N/A' AS ReportUuid
    """
else:
    spark.createDataFrame(report_uuid_mapping).createOrReplaceTempView("report_uuid_mapping")
    query = f"""
        WITH src_reports AS (
            SELECT
                sr.ReportId,
                sr.ReportName,
                sr.WebUrl,
                sr.WorkspaceId AS WorkspaceUuid,
                sr.WorkspaceName,
                sr.AsOfDateTime
            FROM {historical_table_names["source_reports"]} AS sr
            JOIN (
                SELECT
                    ReportId,
                    MAX(AsOfDateTime) AS MaxAsOfDateTime
                FROM {historical_table_names["source_reports"]}
                GROUP BY ReportId
            ) AS latest_reports
                ON sr.ReportId = latest_reports.ReportId
                AND sr.AsOfDateTime = latest_reports.MaxAsOfDateTime
        ),
        final_reports AS (
            SELECT
                src.ReportId AS ReportUuid,
                COALESCE(mapping_data.MappedReportName, src.ReportName) AS ReportName,
                COALESCE(mapping_data.MappedWebUrl, src.WebUrl) AS WebUrl,
                COALESCE(mapping_data.MappedWorkspaceUuid, src.WorkspaceUuid) AS WorkspaceUuid,
                COALESCE(mapping_data.MappedWorkspaceName, src.WorkspaceName) AS WorkspaceName
            FROM src_reports AS src
            LEFT JOIN (
                SELECT
                    m.ReportUuid AS OldId,
                    mapped.ReportName AS MappedReportName,
                    mapped.WebUrl AS MappedWebUrl,
                    mapped.WorkspaceUuid AS MappedWorkspaceUuid,
                    mapped.WorkspaceName AS MappedWorkspaceName
                FROM report_uuid_mapping AS m
                LEFT JOIN src_reports AS mapped
                    ON m.MapToReportUuid = mapped.ReportId
            ) AS mapping_data
                ON src.ReportId = mapping_data.OldId
        )
        SELECT
            ReportUuid,
            CASE WHEN ReportUuid = '' THEN 'Non-Report' ELSE ReportName END AS ReportName,
            WebUrl,
            WorkspaceUuid,
            WorkspaceName
        FROM final_reports
        UNION ALL
        SELECT
            'N/A' AS ReportUuid,
            'N/A' AS ReportName,
            'N/A' AS WebUrl,
            'N/A' AS WorkspaceUuid,
            'N/A' AS WorkspaceName
    """
    
query_result = spark.sql(query)
query_result.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable(star_schema_table_names["dim_report"])
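
The branch taken in the cell above depends on whether `report_uuid_mapping` still holds its single empty placeholder entry. A small sketch of that check follows; the helper name `has_report_mapping` and the GUID placeholders are hypothetical.

```python
def has_report_mapping(report_uuid_mapping: list) -> bool:
    """Return False for the single empty placeholder entry, True otherwise."""
    is_placeholder = (
        len(report_uuid_mapping) == 1
        and report_uuid_mapping[0]["ReportUuid"] == ""
        and report_uuid_mapping[0]["MapToReportUuid"] == ""
    )
    return not is_placeholder


# Placeholder configuration: no mapping is applied.
no_mapping = [{"ReportUuid": "", "MapToReportUuid": ""}]
# Hypothetical mapping from an old report to its replacement.
with_mapping = [{"ReportUuid": "<old-report-guid>", "MapToReportUuid": "<new-report-guid>"}]

print(has_report_mapping(no_mapping))
print(has_report_mapping(with_mapping))
```

When a mapping is present, a report whose id matches `ReportUuid` keeps its own `ReportUuid` in `DIM_Report` but takes its name, URL, and workspace from the report identified by `MapToReportUuid`, as long as that report exists in the source reports table.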

### Create ```DIM_User```

In [None]:
query_result = spark.sql(f"""
    WITH union_data AS (
        SELECT
            'Masked' AS ExecutingUser,
            ExecutingUserGroup
        FROM {historical_table_names["object_query_count"]}
        UNION ALL
        SELECT
            ExecutingUser,
            ExecutingUserGroup
        FROM {historical_table_names["detailed_logs"]}
    ),
    remove_duplicates_and_add_key AS (
        SELECT DISTINCT
            ExecutingUser,
            ExecutingUserGroup,
            {generate_table_key("ExecutingUser", "ExecutingUserGroup")} AS UserId
        FROM union_data
    )
    SELECT * FROM remove_duplicates_and_add_key
""")

query_result.write.mode("overwrite").format("delta").option(
    "overwriteSchema", "true"
).saveAsTable(star_schema_table_names["dim_user"])

### Create ```FACT_ModelObjectQueryCount```
Maps queries back to model objects, including dependencies.

In [None]:
query_result = spark.sql(f"""
    -- Map query counts with model objects
    WITH query_counts_with_mapping AS (
        SELECT
            query_count.AsOfDate,
            query_count.AsOfHour,
            query_count.ExecutingUserGroup,
            IFNULL(query_count.ReportId, 'N/A') AS ReportUuid,
            query_count.QueryCount,
            query_count.RunUuid,
            query_count.ObjectType,
            query_count.ModelUuid,
            query_count.ModelObject,
            mapping.TableName,
            mapping.ObjectName,
            {
                generate_table_key(
                    "query_count.ModelUuid",
                    "mapping.TableName",
                    '''
                        CASE 
                            WHEN query_count.ObjectType = 'REPORT MEASURE'
                            THEN query_count.ModelObject
                            ELSE mapping.ObjectName
                        END
                    ''',
                )
            } AS ModelObjectId
        FROM
            {historical_table_names["object_query_count"]} AS query_count
        LEFT JOIN
            {historical_table_names["object_mapping"]} AS mapping ON
                mapping.ModelUuid = query_count.ModelUuid
                AND mapping.RunUuid = query_count.RunUuid
                AND mapping.ModelObject = query_count.ModelObject
    ),

    -- Identify dependencies
    dependencies AS (
        SELECT DISTINCT
            ModelUuid,
            TableName,
            ObjectName,
            ReferencedTableName,
            ReferencedObjectName,
            RunUuid,
            ObjectType
        FROM {historical_table_names["dependencies"]}
    ),

    -- Join dependencies with query counts
    dependencies_join_query_count AS (
        SELECT
            dependencies.ModelUuid,
            query_count.AsOfDate,
            query_count.AsOfHour,
            {
                generate_table_key(
                    "dependencies.ModelUuid",
                    "dependencies.ReferencedTableName",
                    "dependencies.ReferencedObjectName",
                )
            } AS ModelObjectId,
            IFNULL(query_count.ReportUuid, 'N/A') AS ReportUuid,
            query_count.ExecutingUserGroup,
            query_count.QueryCount
        FROM 
            dependencies
        LEFT JOIN
            query_counts_with_mapping AS query_count ON
                query_count.ModelUuid = dependencies.ModelUuid
                AND query_count.RunUuid = dependencies.RunUuid
                AND query_count.TableName = dependencies.TableName
                AND query_count.ObjectName = dependencies.ObjectName
        WHERE
            query_count.ObjectName IS NOT NULL
    ),

    -- Union data to create the final fact table
    union_model_and_report_objects AS (
        SELECT
            ModelUuid,
            AsOfDate,
            AsOfHour,
            ModelObjectId,
            ReportUuid,
            {generate_table_key("'Masked'", "ExecutingUserGroup")} AS UserId,
            'True' AS DirectReferenceFlag,
            QueryCount
        FROM
            query_counts_with_mapping
        UNION ALL
        SELECT
            ModelUuid,
            AsOfDate,
            AsOfHour,
            ModelObjectId,
            ReportUuid,
            {generate_table_key("'Masked'", "ExecutingUserGroup")} AS UserId,
            'False' AS DirectReferenceFlag,
            QueryCount
        FROM
            dependencies_join_query_count
    )

    -- Select all records from the final union
    SELECT * FROM union_model_and_report_objects
""")

query_result.write.mode("overwrite").format("delta").option(
    "overwriteSchema", "true"
).saveAsTable(star_schema_table_names["fact_model_object_query_count"])
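
The query above attributes usage in two ways. Rows with `DirectReferenceFlag = 'True'` come straight from the query counts, while rows with `'False'` credit the query count of a dependent object (for example, a measure) to the object it references. The plain-Python sketch below illustrates that idea only. The sample objects and the `build_fact_rows` helper are hypothetical, and keys such as `ModelUuid`, `RunUuid`, and the date columns are left out for brevity.

```python
from collections import defaultdict

# Hypothetical sample data; the object names are illustrative only.
# Query counts for objects that were referenced directly in queries.
direct_counts = [
    {"TableName": "Sales", "ObjectName": "Total Sales", "QueryCount": 5},
    {"TableName": "Sales", "ObjectName": "Amount", "QueryCount": 2},
]

# Each row says: (TableName, ObjectName) depends on the referenced object.
dependencies = [
    {"TableName": "Sales", "ObjectName": "Total Sales",
     "ReferencedTableName": "Sales", "ReferencedObjectName": "Amount"},
    {"TableName": "Sales", "ObjectName": "Unused Measure",
     "ReferencedTableName": "Sales", "ReferencedObjectName": "Discount"},
]


def build_fact_rows(direct_counts, dependencies):
    """Union direct usage with usage inherited through dependencies."""
    rows = []

    # Direct references: one fact row per query-count row.
    for count in direct_counts:
        rows.append({
            "TableName": count["TableName"],
            "ObjectName": count["ObjectName"],
            "DirectReferenceFlag": "True",
            "QueryCount": count["QueryCount"],
        })

    # Index query counts by the object that was queried.
    counts_by_object = defaultdict(list)
    for count in direct_counts:
        counts_by_object[(count["TableName"], count["ObjectName"])].append(count)

    # Indirect references: credit the dependent object's query count to the
    # referenced object. Dependencies that were never queried add no rows.
    for dep in dependencies:
        key = (dep["TableName"], dep["ObjectName"])
        for count in counts_by_object.get(key, []):
            rows.append({
                "TableName": dep["ReferencedTableName"],
                "ObjectName": dep["ReferencedObjectName"],
                "DirectReferenceFlag": "False",
                "QueryCount": count["QueryCount"],
            })
    return rows


fact_rows = build_fact_rows(direct_counts, dependencies)
for row in fact_rows:
    print(row)
```

In this sample, `Amount` ends up with two rows: its own direct count and the count inherited from `Total Sales`. `Discount` gets no row because the measure that references it was never queried.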

### Create ```FACT_ModelLogs```
Stores detailed DAX query logs for performance analysis.

In [None]:
query_result = spark.sql(f"""
    SELECT
        AsOfDate,
        OperationName,
        OperationDetailName,
        ReportId AS ReportUuid,
        ModelUuid,
        Timestamp AS DateTime,
        {generate_table_key("ExecutingUser", "ExecutingUserGroup")} AS UserId,
        DurationMs,
        CpuTimeMs,
        EventText,
        OperationId,
        XmlaSessionId,
        ActivityId,
        RequestId,
        CurrentActivityId,
        StatusCode,
        Status
    FROM
        {historical_table_names["detailed_logs"]} AS detailed_logs
""")

query_result.write.mode("overwrite").format("delta").option(
    "overwriteSchema", "true"
).saveAsTable(star_schema_table_names["fact_detailed_logs"])
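
As a rough illustration of the kind of performance analysis this table supports, the sketch below summarizes query durations per report. It is plain Python over hypothetical sample rows that reuse the `ReportUuid`, `DurationMs`, and `CpuTimeMs` column names. The `summarize_durations` helper is an assumption for illustration, and in the notebook itself the same aggregation would more likely be done with Spark over the saved table.

```python
from collections import defaultdict
from statistics import median

# Hypothetical rows shaped like FACT_ModelLogs (only the columns used here).
log_rows = [
    {"ReportUuid": "report-a", "DurationMs": 120, "CpuTimeMs": 90},
    {"ReportUuid": "report-a", "DurationMs": 480, "CpuTimeMs": 300},
    {"ReportUuid": "report-a", "DurationMs": 150, "CpuTimeMs": 110},
    {"ReportUuid": "report-b", "DurationMs": 2000, "CpuTimeMs": 1500},
]


def summarize_durations(rows):
    """Return query count, median, and maximum DurationMs per report."""
    durations = defaultdict(list)
    for row in rows:
        durations[row["ReportUuid"]].append(row["DurationMs"])
    return {
        report: {
            "queries": len(values),
            "median_ms": median(values),
            "max_ms": max(values),
        }
        for report, values in durations.items()
    }


summary = summarize_durations(log_rows)
for report, stats in summary.items():
    print(report, stats)
```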

### Create ```FACT_ModelObjectStatistics```
Combines cold-cache timings, column residency, and column size statistics for each model column and date.

In [None]:
query_result = spark.sql(f"""
    -- Create distinct combinations of object and date
    WITH distinct_object_date_combo AS (
        SELECT DISTINCT
            AsOfDate,
            ModelUuid,
            TableName,
            ColumnName AS ObjectName,
            {generate_table_key("ModelUuid", "TableName", "ColumnName")} AS ModelObjectId
        FROM
            {historical_table_names["model_columns"]}
    ),

    -- Map cold cache with object mapping
    cold_cache_with_mapping AS (
        SELECT
            mapping.TableName,
            mapping.ObjectName,
            cold_cache.ModelUuid,
            cold_cache.AsOfDate,
            cold_cache.Duration,
            cold_cache.CpuTime
        FROM
            {historical_table_names["cold_cache_measurements"]} AS cold_cache
        LEFT JOIN
            {historical_table_names["object_mapping"]} AS mapping ON
                mapping.ModelUuid = cold_cache.ModelUuid
                AND mapping.RunUuid = cold_cache.RunUuid
                AND mapping.ModelObject = cold_cache.ColumnName
        WHERE
            cold_cache.Success = 'Success'
            AND cold_cache.ColumnName IS NOT NULL
    ),

    -- Join facts and aggregate metrics
    join_facts AS (
        SELECT
            combos.ModelUuid,
            combos.AsOfDate,
            combos.ModelObjectId,
            COUNT(residency.IsResident) AS ColumnResidencyMeasuredCount,
            SUM(CASE WHEN residency.IsResident = True THEN 1 ELSE 0 END) AS ColumnResidencyTrueCount,
            AVG(data_size.TotalSize) AS TotalSize,
            AVG(data_size.DataSize) AS DataSize,
            AVG(data_size.DictionarySize) AS DictionarySize,
            AVG(data_size.HierarchySize) AS HierarchySize,
            AVG(cold_cache.Duration) AS DurationTime,
            AVG(cold_cache.CpuTime) AS CpuTime
        FROM
            distinct_object_date_combo AS combos
        LEFT JOIN
            {historical_table_names["model_columns"]} AS residency ON
                residency.ModelUuid = combos.ModelUuid
                AND residency.TableName = combos.TableName
                AND residency.ColumnName = combos.ObjectName
                AND residency.AsOfDate = combos.AsOfDate
        LEFT JOIN
            {historical_table_names["resident_statistics"]} AS data_size ON
                data_size.ModelUuid = combos.ModelUuid
                AND data_size.TableName = combos.TableName
                AND data_size.ColumnName = combos.ObjectName
                AND data_size.AsOfDate = combos.AsOfDate
        LEFT JOIN
            cold_cache_with_mapping AS cold_cache ON
                cold_cache.ModelUuid = combos.ModelUuid
                AND cold_cache.TableName = combos.TableName
                AND cold_cache.ObjectName = combos.ObjectName
                AND cold_cache.AsOfDate = combos.AsOfDate
        GROUP BY ALL
    )

    -- Select all results from the final join
    SELECT * FROM join_facts
""")

query_result.write.mode("overwrite").format("delta").option(
    "overwriteSchema", "true"
).saveAsTable(star_schema_table_names["fact_model_statistics"])
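
One thing to keep in mind when reading `ColumnResidencyMeasuredCount` and `ColumnResidencyTrueCount`: the query joins several detail tables on the same column and date keys before aggregating. If more than one of those tables holds multiple rows per column and date (which seems likely when the notebook runs several times per day, though this section does not show the table contents), `COUNT` and `SUM` are multiplied by the number of matching rows in the other tables, while `AVG` is unaffected. The sketch below reproduces the effect with Python's built-in `sqlite3` module and hypothetical two-table data. It is an illustration of the general join behavior, not a copy of the notebook's schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Hypothetical stand-ins for the residency and size tables.
    CREATE TABLE residency (col TEXT, is_resident INTEGER);
    CREATE TABLE data_size (col TEXT, total_size INTEGER);
    INSERT INTO residency VALUES ('Amount', 1), ('Amount', 0);
    INSERT INTO data_size VALUES ('Amount', 100), ('Amount', 300), ('Amount', 200);
""")

# Join first, aggregate second: 2 residency rows x 3 size rows = 6 joined rows.
joined = conn.execute("""
    SELECT COUNT(r.is_resident), SUM(r.is_resident), AVG(s.total_size)
    FROM residency AS r
    LEFT JOIN data_size AS s ON s.col = r.col
    GROUP BY r.col
""").fetchone()

# Aggregate each source first, then join one row per column.
pre_aggregated = conn.execute("""
    WITH r AS (
        SELECT col, COUNT(is_resident) AS measured, SUM(is_resident) AS resident
        FROM residency
        GROUP BY col
    ),
    s AS (
        SELECT col, AVG(total_size) AS avg_size
        FROM data_size
        GROUP BY col
    )
    SELECT r.measured, r.resident, s.avg_size
    FROM r
    LEFT JOIN s ON s.col = r.col
""").fetchone()
conn.close()

print("join then aggregate:", joined)          # counts are inflated
print("aggregate then join:", pre_aggregated)  # counts match the source rows
```

The ratio of the two counts stays the same in both approaches, so a "percentage of runs resident" measure built on these columns is not distorted. Only the absolute counts differ.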

In [None]:
# Stop the Spark session so the notebook releases its compute once all tables are written.
mssparkutils.session.stop()