In [1]:
##### Large Model Discovery




# CMF - Large Semantic Models Discovery

### Key Changes and Improvements

This version of the notebook incorporates several enhancements over the original:

* **Dynamic Bin Calculation:**
    * Introduced `p_target_models_per_bin` and `p_max_bins` parameters.
    * If `p_num_bins` is not set to a positive value, the notebook calculates the number of parallel bins as `ceil(total models / p_target_models_per_bin)`, capped at both `p_max_bins` and the total model count.
    * Defaults to sequential processing (1 bin) if neither `p_num_bins` nor `p_target_models_per_bin` is positive.

* **Improved Robustness in Bin Handling for Parallelism:**
    * Models are now assigned to bins with `numpy.floor` division on a sequential index (`floor(index * num_bins / num_models) + 1`). This splits the model list into contiguous blocks whose sizes differ by at most one, a more even distribution than the previous modulo approach and therefore better load balancing across parallel runs.
    * When preparing for parallel execution with `notebookutils.notebook.runMultiple`, the orchestrator reads the distinct `Bin` values back from the `models_discovery_status` table and launches one activity per bin. If only one bin is found it runs sequentially, and if none are found it skips processing with a warning instead of failing.


* **Robust Delta Table Handling:**
    * The `models_discovery_status` table is rewritten on every run with `mode("overwrite")` and `overwriteSchema=true` (partitioned by `Bin`), which prevents schema mismatch errors on subsequent runs.
    * An `_initialize_permission_audit_table` function creates the `permission_audit_log` Delta table from an explicit schema (`StructType`) if it does not already exist.
    * Initializes model `Size` to `-1` (LongType) to clearly differentiate pending/unknown status from an actual size of 0 bytes.
    * Added a `Last_Updated` timestamp column for better tracking.

* **Optional Size Filtering:**
    * Added the `p_min_model_size_gb` parameter.
    * If set to a value greater than 0, models successfully scanned but found to be *smaller* than this threshold (in GB) will have their status marked as `Ignored (Size)`. Set to `0` or `None` to disable this filter.

* **Enhanced Error Logging and Status Tracking:**
    * The output Delta table now uses more descriptive statuses: `Pending`, `Success`, `Failure`, `Ignored (Default)`, `Ignored (Size)`, `Ignored (Name Filter)`.
    * Detailed error messages are captured in the `Message` column.
    * A **Final Summary Report** is generated by the main notebook instance *after* all processing completes. This report queries the Delta table to show counts per status, lists models that failed, and lists large models that were successfully processed.
    * Exceptions from Fabric API calls (for example `FabricHTTPException` or `WorkspaceNotFoundException`) are caught per model and per workspace, so a single failure is recorded (in the `Message` column for models) without stopping the whole run.

* **Sequential Fallback:**
    * If the calculated number of bins is only one, the notebook automatically runs the processing logic sequentially within the main instance, avoiding the overhead of `notebookutils.notebook.runMultiple` for a single task.

* **Improved Clarity and Structure:**
    * Added more detailed print statements for better real-time progress monitoring.
    * Code grouped into logical steps with explanatory comments.
    * Necessary parameters are passed correctly to the sub-processes launched by `runMultiple`.

* **Automated Workspace Access Management (Grant & Revoke):**
    * **Granting Access (Step 0):** Before discovery, the script grants the executing user 'Member' access to the workspaces specified in the `p_only_workspaces` parameter, using `labs_admin.add_user_to_workspace`. Workspaces where the user already has access are left unchanged.
        * Workspace names are resolved to IDs by a case-insensitive match against the tenant workspace list returned by `labs_admin.list_workspaces()`; values that are already valid GUIDs are used as-is.
        * Each workspace it modifies is recorded, both in memory and in the `permission_audit_log` Delta table. At the start of the next run, any grant still marked `Granted` for the current user is treated as orphaned and revoked.
    * **Revoking Access (Step 6 - Cleanup):** A `finally` block ensures that the script attempts to revoke the 'Member' access it granted during the current run, using `labs_admin.delete_user_from_workspace` and the list of modified workspaces, and marks the corresponding audit-log entries as `Revoked`. This is critical for maintaining a clean security posture.
* **Specific Model Name Exclusion Filter:**
    * A direct filter has been implemented within the sub-process/sequential processing loop to exclude any semantic model explicitly named 'Report Usage Metrics Model'.
    * These excluded models are assigned a distinct status, `Ignored (Name Filter)`, which is an additional possible value in the `Status` column of the output Delta table.
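
The bin-count rule and the floor-division assignment described above can be checked outside Fabric with plain Python. This is a minimal sketch that mirrors the notebook's STEP 2 logic; `compute_num_bins` and `assign_bins` are hypothetical helper names, not functions in the notebook or in `semantic-link-labs`.

```python
import math


def compute_num_bins(num_models: int, num_bins: int = 0,
                     target_models_per_bin: int = 0, max_bins: int = 25) -> int:
    """Hypothetical helper mirroring the notebook's STEP 2 bin-count rule."""
    if num_bins > 0:
        bins = num_bins  # an explicit p_num_bins always wins
    elif target_models_per_bin > 0:
        # ceil(models / target), capped by p_max_bins and by the model count
        bins = min(math.ceil(num_models / target_models_per_bin), max_bins, num_models)
    else:
        bins = 1  # no strategy configured: sequential processing
    return max(1, bins)


def assign_bins(num_models: int, num_bins: int) -> list:
    """Hypothetical helper: 1-based bin number for each sequential index i."""
    # i * num_bins // num_models equals floor(i * num_bins / num_models) for non-negative ints
    return [i * num_bins // num_models + 1 for i in range(num_models)]


n = 10
bins = compute_num_bins(n, target_models_per_bin=3)
print(bins)                  # 4
print(assign_bins(n, bins))  # [1, 1, 1, 2, 2, 3, 3, 3, 4, 4]
```

With 10 models and 3 models per bin, bins 1 and 3 receive three models and bins 2 and 4 receive two: contiguous blocks whose sizes differ by at most one.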


### How to Use

1.  **Replace Code:** Substitute the code in your existing notebook with this improved version.
2.  **Prerequisites:** Ensure all original prerequisites are met:
    * A default Lakehouse is associated with the notebook's workspace.
    * The `semantic-link-labs` library is installed in the Fabric Environment used by this notebook (install from PyPI).
3.  **Configure Parameters:** Carefully review and adjust the parameters defined in the first cell (`--- Parameters ---`), paying close attention to:
    * **Parallelism:** Choose *either* `p_target_models_per_bin` (recommended for dynamic sizing) *or* `p_num_bins` (for a fixed number of threads). Set the unused option to `0`. When sizing dynamically, `p_max_bins` caps the number of bins.
    * **Scoping:** Use `p_only_workspaces` or `p_exclude_workspaces` to limit which workspaces are scanned. Use workspace *names* or *GUIDs*.
    * **Size Filtering:** Set `p_min_model_size_gb` to your desired minimum size (e.g., `10` for 10GB) or set it to `0` or `None` to process all non-default models regardless of size.
    * **Backoff Strategy:** Adjust `p_backoff_threshold`, `p_backoff_delay`, and `p_backoff_delay_multiple` if you encounter API throttling issues (HTTP 429 errors).
4.  **Run Notebook:** Execute the notebook cells sequentially.
    * The primary instance (`_p_sub_bin == 0`) will:
        * List models.
        * Apply filters.
        * Calculate bins and assign models.
        * Save the initial list to the `models_discovery_status` Delta table in your default Lakehouse.
        * Launch parallel notebook runs (if `num_bins > 1`) or run sequentially (if `num_bins == 1`).
        * Wait for completion.
        * Generate the Final Summary Report.
    * Sub-process instances (`_p_sub_bin > 0`) will:
        * Read their assigned models from the Delta table.
        * Skip models named 'Report Usage Metrics Model' and default semantic models, then call `labs.get_semantic_model_size` for each remaining model.
        * Apply size filtering if enabled.
        * Update the status, size, and message for each model back into the Delta table.
5.  **Review Results:** Check the output of the **Final Summary Report** cell and query the `models_discovery_status` Delta table in your Lakehouse for detailed results.
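
The per-model decision each sub-process makes can be written as a pure function, which makes the order of the filters explicit: name filter first, then default model, then the size threshold. This is a hypothetical sketch: `classify_model` is not a function in the notebook, and the `is_default` and `size_bytes` arguments stand in for the results of `labs.is_default_semantic_model` and `labs.get_semantic_model_size` (with `None` assumed to mean the size lookup failed).

```python
from typing import Optional, Tuple

EXCLUDED_NAMES = {"Report Usage Metrics Model"}
GB = 1024 ** 3


def classify_model(name: str, is_default: bool, size_bytes: Optional[int],
                   min_model_size_gb: Optional[float]) -> Tuple[str, int]:
    """Hypothetical helper: returns (Status, Size) following the notebook's filter order."""
    if name in EXCLUDED_NAMES:
        return "Ignored (Name Filter)", 0
    if is_default:
        return "Ignored (Default)", 0
    if size_bytes is None:
        # Assumption: a failed lookup keeps Size at -1, distinct from a real 0-byte model
        return "Failure", -1
    min_bytes = (min_model_size_gb or 0) * GB  # 0 or None disables the size filter
    if min_bytes > 0 and size_bytes < min_bytes:
        return "Ignored (Size)", size_bytes
    return "Success", size_bytes
```

Keeping the name check first means an excluded model never triggers an API call, which matches the order used in `run_subprocess`.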


In [None]:
# --- Parameters ---
# Processing Strategy
# Option 1: Set p_num_bins directly (fixed number of parallel threads). Set p_target_models_per_bin = 0
# Option 2: Set p_target_models_per_bin (desired models per thread). p_num_bins will be calculated.
p_num_bins = 0                  # Override: Set > 0 to fix the number of bins, ignoring p_target_models_per_bin. Set to 1 for sequential processing.
p_target_models_per_bin = 3    # Desired number of models per parallel bin (thread). Used if p_num_bins is 0 or less.
p_max_bins = 25                # Maximum number of parallel bins allowed when calculating dynamically.

# Filtering
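# Include every workspace hosting a semantic model on the listed capacities.
# 'CapacityIDs' is a placeholder: replace it with one or more quoted capacity IDs, e.g. IN ('<id-1>', '<id-2>').
workspaces_df = spark.sql("""
    SELECT DISTINCT A.Workspace_Id
    FROM SemanticModels A
    INNER JOIN Workspaces B ON B.Id = A.Workspace_Id
    INNER JOIN Capacities C ON B.Capacity_Id = C.Capacity_Id
    WHERE C.Capacity_Id IN ('CapacityIDs')
""")
p_only_workspaces = [row.Workspace_Id for row in workspaces_df.collect()]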
#p_only_workspaces = ['Workspace1','Workspace2','Workspace3']          # List of workspace names or IDs to *include*. Leave empty to process all allowed workspaces.
#p_only_workspaces = []
# Example: ['Migration_Clone_1','Migration_Clone_2','f1a1b1c1-d1e1-f1a1-b1c1-d1e1f1a1b1c1']
p_exclude_workspaces = []                                 # List of workspace names or IDs to *exclude*. Only used if p_only_workspaces is empty.
# Example: ['Default Workspace','My Other Workspace']

p_min_model_size_gb = 10       # Optional: Set minimum model size in GB to process fully. Models smaller than this will be marked 'Ignored (Size)'. Set to 0 or None to disable.

# Backoff Strategy (for API throttling)
p_backoff_threshold = 30        # Seconds: If a single model size check takes longer than this, apply backoff delay.
p_backoff_delay = 90            # Seconds: Initial delay to apply when backoff is triggered.
p_backoff_delay_multiple = 2    # Multiplier: Increase delay by this factor on subsequent backoffs.

# Internal Parameter (Do Not Change Manually)
_p_sub_bin = 0                  # Used internally by runMultiple to identify the sub-process bin.

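
The three `p_backoff_*` parameters are passed to every sub-process, but the processing loop in `run_subprocess` does not read them. One way the documented behaviour (wait `p_backoff_delay` seconds after a model check slower than `p_backoff_threshold`, multiplying the delay by `p_backoff_delay_multiple` on each later backoff) could be implemented is sketched below. `BackoffTracker` is a hypothetical helper, the `sleep` callable is injected so the logic can be tested without waiting, and never resetting the delay is an assumption.

```python
import time
from typing import Callable


class BackoffTracker:
    """Hypothetical helper: sleeps after slow calls, growing the delay each time."""

    def __init__(self, threshold_s: float, initial_delay_s: float, multiplier: float,
                 sleep: Callable[[float], None] = time.sleep):
        self.threshold_s = threshold_s
        self.next_delay_s = initial_delay_s
        self.multiplier = multiplier
        self.sleep = sleep

    def after_call(self, elapsed_s: float) -> float:
        """Call after each model size check; returns the delay applied (0 if none)."""
        if elapsed_s <= self.threshold_s:
            return 0.0
        delay = self.next_delay_s
        self.sleep(delay)
        # Assumption: the delay keeps growing for the rest of the bin and is never reset
        self.next_delay_s *= self.multiplier
        return delay
```

In `run_subprocess`, a tracker built from `p_backoff_threshold`, `p_backoff_delay` and `p_backoff_delay_multiple` would wrap the `labs.get_semantic_model_size` call: record `time.monotonic()` before the call and pass the elapsed time to `after_call` afterwards.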

In [None]:
import notebookutils
import pandas as pd
import datetime, time, math, json, requests
import numpy as np
from uuid import UUID

import sempy.fabric as fabric
from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException
import sempy_labs as labs
import sempy_labs.admin as labs_admin

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, current_timestamp, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType

# --- Icon Definitions ---
icons = {
    "start": "🚀", "finish": "🏁", "main": " orchestrator ", "sub": " sub-process ",
    "step": "⚙️", "info": "ℹ️", "success": "✅", "warning": "⚠️", "error": "❌",
    "data": "💾", "spark": "✨", "parallel": "🔀", "report": "📊", "key": "🔑"
}

# --- Initialization ---
spark = SparkSession.builder.appName("LargeModelDiscovery").getOrCreate()
print(f"{icons['start']} [{datetime.datetime.now()}] CMF - Large Semantic Model Discovery Notebook Initialized {icons['start']}")
print("-" * 80)

#------------------------------------------------------------------------------------#
#                                HELPER FUNCTIONS                                    #
#------------------------------------------------------------------------------------#

def _is_valid_uuid(guid_string: str) -> bool:
    """Returns True if the value parses as a GUID (e.g. a workspace ID), False otherwise."""
    try:
        UUID(str(guid_string))
        return True
    except ValueError:
        return False

def _resolve_workspace_ids(p_list: list, all_ws_df: pd.DataFrame, list_name: str) -> list:
    """Converts a list of workspace names or IDs into a list of IDs only."""
    if not p_list: return []
    print(f"{icons['info']} Resolving workspace names to IDs for '{list_name}'...")
    resolved_ids = []
    
    # Prepare a lookup series for faster, case-insensitive matching
    name_to_id_map = all_ws_df.set_index(all_ws_df['Name'].str.lower())['Id']

    for item in p_list:
        if _is_valid_uuid(item):
            resolved_ids.append(item)
            print(f"  {icons['success']} Found valid ID: '{item}'")
        else:
            item_lower = item.lower()
            if item_lower in name_to_id_map:
                resolved_id = name_to_id_map[item_lower]
                resolved_ids.append(resolved_id)
                print(f"  {icons['success']} Resolved '{item}' to ID: {resolved_id}")
            else:
                print(f"  {icons['warning']} Workspace name '{item}' not found. It will be skipped.")
    return resolved_ids

def _grant_workspace_access(workspaces: list, all_ws_df: pd.DataFrame, user_principal: str) -> list:
    """Grants 'Member' access, logs it, and returns a list of modified workspaces."""
    if not workspaces:
        print(f"{icons['info']} 'p_only_workspaces' is empty. Skipping permission grants.")
        return []

    print(f"\n{icons['key']} STEP: Granting current user '{user_principal}' access to specified workspaces...")
    modified_ws = []
    
    resolved_ids = _resolve_workspace_ids(workspaces, all_ws_df, "p_only_workspaces")

    for ws_id in resolved_ids:
        print(f"  Processing workspace: {ws_id}")
        try:
            existing_users_df = labs_admin.list_workspace_users(workspace=ws_id)
            user_exists = not existing_users_df[existing_users_df['Identifier'].str.lower() == user_principal.lower()].empty
            
            if user_exists:
                print(f"    {icons['success']} User already has access. No action needed.")
            else:
                print(f"    {icons['info']} User not found. Granting 'Member' access...")
                labs_admin.add_user_to_workspace(workspace=ws_id, user=user_principal, role="Member")
                modified_ws.append(ws_id)
                print(f"    {icons['success']} Successfully granted 'Member' role.")
                
                # --- NEW: Log the permission grant to the Delta table ---
                try:
                    safe_user = user_principal.replace("'", "''")  # escape single quotes for the SQL string literal
                    log_sql = f"""
                        INSERT INTO permission_audit_log (Workspace_Id, User_Principal, Role_Granted, Grant_Timestamp, Status)
                        VALUES ('{ws_id}', '{safe_user}', 'Member', current_timestamp(), 'Granted')
                    """
                    spark.sql(log_sql)
                    print(f"    {icons['data']} Successfully logged permission grant.")
                except Exception as log_e:
                    print(f"    {icons['error']} CRITICAL: Failed to log permission grant for workspace '{ws_id}'. Manual cleanup may be required. Error: {log_e}")

        except Exception as e:
            print(f"    {icons['error']} Could not check or grant permissions for workspace '{ws_id}'. Error: {e}")
            
    return modified_ws

def _revoke_workspace_access(modified_workspaces: list, user_principal: str):
    """Revokes 'Member' access and updates the audit log."""
    if not modified_workspaces:
        print(f"\n{icons['key']} CLEANUP: No workspace permissions to revoke for this run.")
        return

    print(f"\n{icons['key']} CLEANUP: Revoking 'Member' access for '{user_principal}' from {len(modified_workspaces)} workspace(s)...")
    for ws_id in modified_workspaces:
        try:
            print(f"  Removing user from workspace: '{ws_id}'")
            labs_admin.delete_user_from_workspace(workspace=ws_id, user=user_principal)
            print(f"    {icons['success']} Successfully revoked permissions via API.")
            
            # --- NEW: Update the audit log ---
            try:
                safe_user = user_principal.replace("'", "''")  # escape single quotes for the SQL string literal
                update_sql = f"""
                    UPDATE permission_audit_log
                    SET Status = 'Revoked', Revoke_Timestamp = current_timestamp()
                    WHERE Workspace_Id = '{ws_id}' AND User_Principal = '{safe_user}' AND Status = 'Granted'
                """
                spark.sql(update_sql)
                print(f"    {icons['data']} Successfully updated audit log to 'Revoked'.")
            except Exception as log_e:
                print(f"    {icons['warning']} Could not update audit log for workspace '{ws_id}'. The permission was revoked, but the log may be out of sync. Error: {log_e}")

        except requests.exceptions.HTTPError as http_err:
            if http_err.response.status_code == 404:
                print(f"    {icons['success']} User was not found (already removed). Cleanup successful.")
            else:
                print(f"    {icons['warning']} Could not remove user. Status: {http_err.response.status_code}")
        except Exception as e:
            print(f"    {icons['error']} An unexpected error occurred during revocation. Manual check may be needed. Error: {e}")

def _initialize_permission_audit_table():
    """Creates the permission_audit_log Delta table if it does not exist."""
    table_name = "permission_audit_log"
    print(f"{icons['step']} Initializing or validating schema for Delta table: '{table_name}'")
    
    # Define the schema for tracking permission changes
    schema = StructType([
        StructField("Workspace_Id", StringType(), False),
        StructField("User_Principal", StringType(), False),
        StructField("Role_Granted", StringType(), True),
        StructField("Grant_Timestamp", TimestampType(), True),
        StructField("Revoke_Timestamp", TimestampType(), True),
        StructField("Status", StringType(), True)  # e.g., 'Granted', 'Revoked'
    ])
    
    # Create an empty DataFrame with the schema
    empty_df = spark.createDataFrame([], schema)
    
    # Create the table if it doesn't exist, preserving the schema
    empty_df.write.format("delta").mode("ignore").saveAsTable(table_name)
    print(f"  {icons['success']} Table '{table_name}' is ready.")

#------------------------------------------------------------------------------------#
#                            ORCHESTRATOR (MAIN) LOGIC                               #
#------------------------------------------------------------------------------------#
def run_orchestrator():
    """Main function for the orchestrator notebook instance."""
    start_time = datetime.datetime.now()
    print(f"{icons['main']} Running in Orchestrator Mode (_p_sub_bin = 0) at {start_time} {icons['main']}")
    
    workspaces_modified = []
    current_user = ""
    
    try:
        # --- NEW: Step -1: Initialize Audit Table ---
        _initialize_permission_audit_table()

        current_user = mssparkutils.env.getUserName()
        print(f"Current User Principal Name (UPN): {current_user}")

        # --- NEW: Step -0.5: Pre-run Cleanup of Orphaned Permissions ---
        print(f"\n{icons['key']} STEP: Checking for orphaned permissions from previous runs...")
        try:
            safe_user = current_user.replace("'", "''")  # escape single quotes for the SQL string literal
            orphaned_df = spark.sql(f"SELECT Workspace_Id FROM permission_audit_log WHERE Status = 'Granted' AND User_Principal = '{safe_user}'")
            orphaned_workspaces = [row.Workspace_Id for row in orphaned_df.collect()]
            
            if orphaned_workspaces:
                print(f"  {icons['warning']} Found {len(orphaned_workspaces)} orphaned permission(s) for '{current_user}'. Attempting to revoke now.")
                _revoke_workspace_access(orphaned_workspaces, current_user)
            else:
                print(f"  {icons['success']} No orphaned permissions found.")
        except Exception as e:
            print(f"  {icons['error']} Could not check for or clean up orphaned permissions. Error: {e}")
            
        # --- Step 0: Permissions ---
        
        print(f"\n{icons['step']} STEP 0: Permissions & Workspace Resolution")
        all_ws_df = labs_admin.list_workspaces()
        print(f"  {icons['info']} Fetched {len(all_ws_df)} workspaces from the tenant.")
        workspaces_modified = _grant_workspace_access(p_only_workspaces, all_ws_df, current_user)

        # --- Step 1: List & Filter Models ---
        print(f"\n{icons['step']} STEP 1: Discovering Large Format Semantic Models")
        df_models_all = labs_admin.list_datasets()
        df_large_models = df_models_all[df_models_all['Target Storage Mode'] == 'PremiumFiles'].copy()
        print(f"  {icons['info']} Found {len(df_large_models)} total Large Format models in the tenant.")

        resolved_include_list = _resolve_workspace_ids(p_only_workspaces, all_ws_df, "p_only_workspaces")
        resolved_exclude_list = _resolve_workspace_ids(p_exclude_workspaces, all_ws_df, "p_exclude_workspaces")

        if resolved_include_list:
            df_models_filtered = df_large_models[df_large_models["Workspace Id"].isin(resolved_include_list)]
            print(f"  {icons['success']} Applied 'include' filter. Models remaining: {len(df_models_filtered)}")
        elif resolved_exclude_list:
            df_models_filtered = df_large_models[~df_large_models["Workspace Id"].isin(resolved_exclude_list)]
            print(f"  {icons['success']} Applied 'exclude' filter. Models remaining: {len(df_models_filtered)}")
        else:
            df_models_filtered = df_large_models
            print(f"  {icons['warning']} No workspace filters applied. Processing all found models.")
        
        num_models = len(df_models_filtered)
        if num_models == 0:
            print(f"\n{icons['finish']} No models match the criteria. Halting execution.")
            return

        # --- Step 2: Binning ---
        print(f"\n{icons['step']} STEP 2: Calculating Bins for Parallel Processing")
        if p_num_bins > 0:
            num_bins = p_num_bins
        elif p_target_models_per_bin > 0:
            num_bins = min(math.ceil(num_models / p_target_models_per_bin), p_max_bins, num_models)
        else:
            num_bins = 1
        num_bins = max(1, num_bins)
        print(f"  {icons['info']} Strategy: {num_bins} bin(s) will be used to process {num_models} models.")

        df_models_filtered = df_models_filtered.reset_index(drop=True)
        df_models_filtered['Bin'] = (np.floor(df_models_filtered.index * num_bins / num_models) + 1).astype(int)

        # --- Step 3: Prepare and Save to Delta ---
        print(f"\n{icons['step']} STEP 3: Preparing and Saving Task List to Delta Table")
        delta_table_name = "models_discovery_status"
        df_to_save = df_models_filtered[['Dataset Id', 'Dataset Name', 'Workspace Id', 'Bin']].rename(
            columns={"Dataset Id": "Dataset_Id", "Dataset Name": "Dataset_Name", "Workspace Id": "Workspace_Id"}
        )
        spark_df = spark.createDataFrame(df_to_save)
        spark_df = (spark_df
                    .withColumn("Status", lit("Pending"))
                    .withColumn("Size", lit(-1).cast(LongType()))
                    .withColumn("Message", lit(""))
                    .withColumn("Last_Updated", current_timestamp()))

        spark_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").partitionBy("Bin").saveAsTable(delta_table_name)
        print(f"  {icons['data']} Successfully saved {spark_df.count()} tasks to Delta table '{delta_table_name}'.")

        # --- Step 4: Execute Processing ---
        print(f"\n{icons['step']} STEP 4: Executing Model Size Discovery")
        bin_rows = spark.sql(f"SELECT DISTINCT Bin FROM {delta_table_name} ORDER BY Bin").collect()

        if len(bin_rows) > 1:
            print(f"  {icons['parallel']} Launching {len(bin_rows)} parallel sub-processes...")
            activities = []
            for row in bin_rows:
                bin_num = row['Bin']
                activities.append({
                    "name": f"Discovery_Sub_Bin_{bin_num}", "path": notebookutils.runtime.context['currentNotebookName'],
                    "timeoutPerCellInSeconds": 10800,
                    "args": {"_p_sub_bin": bin_num, "p_min_model_size_gb": p_min_model_size_gb, 
                             "p_backoff_threshold": p_backoff_threshold, "p_backoff_delay": p_backoff_delay,
                             "p_backoff_delay_multiple": p_backoff_delay_multiple},
                    "retry": 0
                })
            exit_values = notebookutils.notebook.runMultiple({"activities": activities})
            print(f"\n{icons['success']} Parallel execution finished.")
            print("  Sub-process exit summaries:")
            for val in exit_values: print(f"    - {val}: {exit_values[val].get('exitVal', 'No exit value')}")
        elif len(bin_rows) == 1:
            print(f"  {icons['info']} Only one bin found. Running sequentially in this notebook...")
            global _p_sub_bin
            _p_sub_bin = bin_rows[0]['Bin']
            run_subprocess() # Run directly
        else:
            print(f"  {icons['warning']} No bins with models found. Skipping processing.")

        # --- Step 5: Final Report ---
        print(f"\n{icons['report']} STEP 5: Generating Final Summary Report {icons['report']}")
        summary_df = spark.sql(f"SELECT Status, COUNT(*) as Count, SUM(CASE WHEN Size >= 0 THEN Size ELSE 0 END) as TotalSizeBytes FROM {delta_table_name} GROUP BY Status")
        print("\n--- Final Status Counts ---")
        summary_df.show()

        failed_models_df = spark.sql(f"SELECT Dataset_Name, Workspace_Id, Message FROM {delta_table_name} WHERE Status = 'Failure'")
        if not failed_models_df.isEmpty():
            print("\n--- Models with Failures ---")
            failed_models_df.show(truncate=False)

        min_size_bytes = (p_min_model_size_gb or 0) * (1024**3)
        success_models_df = spark.sql(f"""
            SELECT Dataset_Name, Workspace_Id, ROUND(Size / (1024*1024*1024), 3) as Size_GB 
            FROM {delta_table_name} 
            WHERE Status = 'Success' AND Size >= {min_size_bytes}
            ORDER BY Size DESC
        """)
        if not success_models_df.isEmpty():
            print(f"\n--- Successfully Processed Models (>= {p_min_model_size_gb or 0} GB) ---")
            success_models_df.show(truncate=False)

    except Exception as e:
        print(f"{icons['error']} A critical error occurred in the orchestrator: {e}")
    finally:
        # --- Step 6: Cleanup (revokes only the permissions granted during the CURRENT run) ---
        _revoke_workspace_access(workspaces_modified, current_user)
        end_time = datetime.datetime.now()
        print("-" * 80)
        print(f"{icons['finish']} Orchestrator Finished at {end_time}. Total Time: {end_time - start_time} {icons['finish']}")
        notebookutils.notebook.exit("Orchestration complete.")

#------------------------------------------------------------------------------------#
#                              SUB-PROCESS LOGIC                                     #
#------------------------------------------------------------------------------------#
def run_subprocess():
    """Main function for the sub-process notebook instance."""
    start_time = datetime.datetime.now()
    print(f"\n{icons['sub']} Running in Sub-Process Mode for Bin #{_p_sub_bin} at {start_time} {icons['sub']}")
    
    delta_table_name = "models_discovery_status"
    success, error, ignored_default, ignored_size, ignored_name = 0, 0, 0, 0, 0
    
    try:
        models_to_process = spark.sql(f"SELECT Dataset_Id, Dataset_Name, Workspace_Id FROM {delta_table_name} WHERE Bin = {_p_sub_bin} AND Status = 'Pending'").collect()
    except Exception as e:
        notebookutils.notebook.exit(f"❌ FATAL: Could not query models for Bin #{_p_sub_bin}. Error: {e}")

    total_models = len(models_to_process)
    print(f"  {icons['info']} Found {total_models} models to process in this bin.")
    min_size_bytes = (p_min_model_size_gb or 0) * (1024**3)

    for i, row in enumerate(models_to_process):
        print(f"\n  ({i+1}/{total_models}) 🔍 Processing model: '{row['Dataset_Name']}'")
        status, size, message = "Failure", -1, ""
        
        try:
            if row['Dataset_Name'] == 'Report Usage Metrics Model':
                status, size, message = "Ignored (Name Filter)", 0, "Model excluded by name."
                ignored_name += 1
            elif labs.is_default_semantic_model(dataset=row['Dataset_Name'], workspace=row['Workspace_Id']):
                status, size, message = "Ignored (Default)", 0, "Default semantic model."
                ignored_default += 1
            else:
                retrieved_size = labs.get_semantic_model_size(dataset=row['Dataset_Name'], workspace=row['Workspace_Id'])
                if min_size_bytes > 0 and retrieved_size < min_size_bytes:
                    status, size, message = "Ignored (Size)", retrieved_size, f"Model size is smaller than {p_min_model_size_gb} GB."
                    ignored_size += 1
                else:
                    status, size, message = "Success", retrieved_size, f"Size retrieved successfully: {round(retrieved_size/(1024**3), 3)} GB."
                    success += 1
        except Exception as e:
            message = f"Error getting size: {str(e)}"
            error += 1
            print(f"    {icons['error']} {message}")

        print(f"    -> Status: {status}")
        update_sql = f"""
            UPDATE {delta_table_name}
            SET Status = '{status}', Size = {size}, Message = '{str(message).replace("'", "''")[:3900]}', Last_Updated = current_timestamp()
            WHERE Dataset_Id = '{row['Dataset_Id']}' AND Bin = {_p_sub_bin}
        """
        try:
            spark.sql(update_sql)
        except Exception as update_e:
            print(f"    {icons['error']} Failed to update Delta table for this model. Error: {update_e}")

    end_time = datetime.datetime.now()
    summary = (f"Bin #{_p_sub_bin} Finished. Processed: {total_models}. ✅Success: {success}, ❌Errors: {error}, "
               f"Ignored: {ignored_default+ignored_size+ignored_name}. Duration: {end_time - start_time}")
    print(f"\n{icons['finish']} {summary}")
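    return summary

# --- Execution ---
if __name__ == "__main__":
    if _p_sub_bin == 0:
        run_orchestrator()
    else:
        # Exit with the bin summary only when running as a real sub-process. The orchestrator's
        # sequential path calls run_subprocess() directly and must continue to the Final Summary Report.
        notebookutils.notebook.exit(run_subprocess())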
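
The grant-then-always-revoke pattern that `run_orchestrator` implements with `try`/`finally` can also be expressed as a context manager, so cleanup runs even when discovery raises. This is a minimal sketch with injected callables; in the notebook, `grant` and `revoke` would correspond to `labs_admin.add_user_to_workspace` and `labs_admin.delete_user_from_workspace`, but `temporary_workspace_access` itself is a hypothetical helper, not part of the notebook or of `semantic-link-labs`.

```python
from contextlib import contextmanager
from typing import Callable, Iterable, Iterator, List


@contextmanager
def temporary_workspace_access(workspace_ids: Iterable[str],
                               has_access: Callable[[str], bool],
                               grant: Callable[[str], None],
                               revoke: Callable[[str], None]) -> Iterator[List[str]]:
    """Hypothetical helper: grants access where missing and revokes only those grants on exit."""
    modified: List[str] = []
    try:
        for ws_id in workspace_ids:
            if has_access(ws_id):
                continue  # pre-existing access is left untouched
            try:
                grant(ws_id)
                modified.append(ws_id)
            except Exception as exc:  # one failed grant should not stop the others
                print(f"Could not grant access to {ws_id}: {exc}")
        yield modified
    finally:
        for ws_id in modified:
            try:
                revoke(ws_id)
            except Exception as exc:  # keep revoking the remaining workspaces
                print(f"Could not revoke access to {ws_id}: {exc}")
```

A `finally` block cannot run if the Spark session itself is killed, which is why the notebook also records each grant in `permission_audit_log` and revokes leftover `Granted` entries at the start of the next run.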

##### Final Cell (Optional - Display Results if run interactively after completion)

In [None]:
# --- Final Cell (Optional - Display Results if run interactively after completion) ---
# Because the orchestrator always ends with notebookutils.notebook.exit(), this cell normally
# runs only when you execute it manually after the main process finishes.
# For the aggregated view, rely on the Final Summary Report (Step 5) produced by the orchestrator.
delta_table_name = "models_discovery_status"
# print("\n --- Displaying Sample of Final Results ---")
try:
    final_results_df = spark.sql(f"SELECT * FROM {delta_table_name} ORDER BY Bin, `Dataset_Name` LIMIT 1000")
    display(final_results_df)
except Exception as e:
    print(f"Could not display final results from {delta_table_name}. Error: {e}")

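
The per-status part of the Final Summary Report can be reproduced from collected rows, for example `spark.table("models_discovery_status").select("Status", "Size").collect()`. This sketch mirrors the report's `SUM(CASE WHEN Size >= 0 THEN Size ELSE 0 END)`, so the `-1` placeholder for unknown sizes does not reduce the totals; `summarize_status` is a hypothetical helper, and it relies only on each row unpacking to `(Status, Size)`.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def summarize_status(rows: Iterable[Tuple[str, int]]) -> Dict[str, Tuple[int, int]]:
    """Hypothetical helper: maps Status -> (row count, total size in bytes)."""
    counts: Dict[str, int] = defaultdict(int)
    totals: Dict[str, int] = defaultdict(int)
    for status, size in rows:
        counts[status] += 1
        totals[status] += size if size >= 0 else 0  # ignore the -1 "unknown size" placeholder
    return {status: (counts[status], totals[status]) for status in counts}
```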