# Data Processing and Unity Catalog Writing Utilities

This notebook contains utilities for processing data and writing to Unity Catalog.

In [0]:
import re
import pandas as pd
from typing import Dict, Any, List
from datetime import datetime, timezone
from pyspark.sql import functions as F

In [0]:
def banner(txt: str):
    """Print *txt* framed by 22 '=' characters on each side, with a blank line before and after."""
    rule = "=" * 22
    print(f"\n{rule} {txt} {rule}\n")

def _flatten_obj(x: Any, prefix="") -> Dict[str, Any]:
    """Flatten nested dict/list into a single-level dict with dotted keys."""
    out = {}
    if isinstance(x, dict):
        for k, v in x.items():
            kk = f"{prefix}{k}" if not prefix else f"{prefix}.{k}"
            out.update(_flatten_obj(v, kk))
    elif isinstance(x, list):
        for i, v in enumerate(x):
            kk = f"{prefix}[{i}]"
            out.update(_flatten_obj(v, kk))
    else:
        out[prefix or "_value"] = x
    return out

def _dedup_columns(cols: List[str]) -> List[str]:
    """Deduplicate and sanitize column names safely for Spark + Unity Catalog + Delta."""
    seen = {}
    deduped = []
    for orig in cols:
        # Normalize case
        c = orig.lower()
        # Replace invalid or risky characters with underscores
        c = (c.replace(".", "_")
               .replace("[", "_")
               .replace("]", "_")
               .replace(" ", "_")
               .replace(":", "_")
               .replace("-", "_")
               .replace("=", "_")
               .replace("{", "_")
               .replace("}", "_")
               .replace("(", "_")
               .replace(")", "_")
               .replace(";", "_")
               .replace(",", "_")
               .replace("\"", "_")
               .replace("'", "_")
               .replace("\n", "_")
               .replace("\t", "_"))
        # Collapse duplicate underscores and trim
        c = re.sub(r"_+", "_", c).strip("_")
        # Deduplicate after normalization
        if c in seen:
            seen[c] += 1
            c = f"{c}__{seen[c]}"
        else:
            seen[c] = 0
        deduped.append(c)
    return deduped

def _convert_all_complex_to_json(obj: Any, keep_scalar_fields: List[str] = None) -> Any:
    """Convert ALL dict/list fields to JSON strings, keeping only scalar fields."""
    import json
    
    if not isinstance(obj, dict):
        return obj
    
    # For UC tables: keep only these simple scalar fields as columns
    # Everything else (nested dicts/lists) gets converted to JSON strings
    if keep_scalar_fields is None:
        keep_scalar_fields = [
            # Core identifiers
            'name', 'table_name', 'full_name', 'catalog_name', 'schema_name',
            'table_id', 'metastore_id',
            # Type and format
            'table_type', 'data_source_format',
            # Ownership and timestamps
            'owner', 'created_at', 'created_by', 'updated_at', 'updated_by',
            # Status
            'comment', 'generation',
            # Injected fields
            '_catalog', '_schema', '_collected_at', '_workspace'
        ]
    
    result = {}
    for key, value in obj.items():
        # If value is a dict or list, convert to JSON
        if isinstance(value, (dict, list)):
            try:
                result[f"{key}_json"] = json.dumps(value)
            except:
                result[f"{key}_json"] = str(value)
        # If value is a simple scalar and key is in whitelist, keep as-is
        elif key in keep_scalar_fields or not keep_scalar_fields:
            result[key] = value
        # Otherwise, convert to string to be safe
        else:
            result[key] = str(value) if value is not None else None
    
    return result

def _preprocess_uc_records(records: List[Dict[str, Any]], table_type: str) -> List[Dict[str, Any]]:
    """Preprocess records to prevent excessive column explosion."""
    if table_type == 'databricks_table':
        # For UC tables, aggressively convert ALL nested structures to JSON
        # Only keep simple scalar fields as columns
        return [_convert_all_complex_to_json(r) for r in records]
    elif table_type == 'databricks_schema':
        # For UC schemas, also aggressive conversion
        return [_convert_all_complex_to_json(r, keep_scalar_fields=[
            'catalog_name', 'schema_name', 'full_name', 'owner', 
            'created_at', 'updated_at', 'comment', '_catalog', '_schema'
        ]) for r in records]
    elif table_type == 'databricks_cluster':
        # For clusters, keep core scalar fields and convert complex nested fields to JSON
        # This prevents column explosion from executors array, spark_conf, tags, etc.
        return [_convert_all_complex_to_json(r, keep_scalar_fields=[
            # Core identifiers
            'cluster_id', 'cluster_name', 'cluster_source',
            # State and timing
            'state', 'state_message', 'start_time', 'terminated_time', 
            'last_state_loss_time', 'last_activity_time', 'last_restarted_time',
            # Configuration
            'spark_version', 'node_type_id', 'driver_node_type_id',
            'num_workers', 'autoscale_min_workers', 'autoscale_max_workers',
            'autotermination_minutes', 'enable_elastic_disk', 'enable_local_disk_encryption',
            # User/ownership
            'creator_user_name', 'single_user_name',
            # Metadata
            'cluster_memory_mb', 'cluster_cores', 
            'policy_id', 'data_security_mode', 'runtime_engine',
            # Injected fields
            '_collected_at', '_workspace'
        ]) for r in records]
    elif table_type == 'databricks_job':
        return [_convert_all_complex_to_json(r, keep_scalar_fields=[
            'job_id', 'creator_user_name', 'created_time', 'run_as_user_name',
            '_collected_at', '_workspace'
        ]) for r in records]
    elif table_type == 'databricks_pipeline':
        return [_convert_all_complex_to_json(r, keep_scalar_fields=[
            'pipeline_id', 'name', 'creator_user_name', 'state', 'health',
            'latest_updates_state', 'latest_updates_creation_time',
            '_collected_at', '_workspace'
        ]) for r in records]
    elif table_type == 'databricks_model_serving':
        return [_convert_all_complex_to_json(r, keep_scalar_fields=[
            'id', 'name', 'creator', 'creation_timestamp', 'last_updated_timestamp',
            'state_config_served_models', 'state_ready',
            '_collected_at', '_workspace'
        ]) for r in records]
    elif table_type == 'databricks_instance_pool':
        return [_convert_all_complex_to_json(r, keep_scalar_fields=[
            'instance_pool_id', 'instance_pool_name', 'node_type_id',
            'min_idle_instances', 'max_capacity', 'idle_instance_autotermination_minutes',
            'state', '_collected_at', '_workspace'
        ]) for r in records]
    elif table_type == 'databricks_registered_model':
        return [_convert_all_complex_to_json(r, keep_scalar_fields=[
            'name', 'creation_timestamp', 'last_updated_timestamp', 'user_id',
            'id', 'description', '_collected_at', '_workspace'
        ]) for r in records]
    elif table_type == 'databricks_catalog':
        return [_convert_all_complex_to_json(r, keep_scalar_fields=[
            'name', 'catalog_type', 'owner', 'comment', 'metastore_id',
            'created_at', 'created_by', 'updated_at', 'updated_by',
            '_collected_at', '_workspace'
        ]) for r in records]
    elif table_type == 'databricks_connection':
        return [_convert_all_complex_to_json(r, keep_scalar_fields=[
            'name', 'connection_type', 'owner', 'comment', 'metastore_id',
            'created_at', 'created_by', 'updated_at', 'updated_by',
            '_collected_at', '_workspace'
        ]) for r in records]
    elif table_type == 'databricks_recipient':
        return [_convert_all_complex_to_json(r, keep_scalar_fields=[
            'name', 'owner', 'comment', 'metastore_id',
            'created_at', 'created_by', 'updated_at', 'updated_by',
            'authentication_type', 'data_recipient_global_metastore_id',
            '_collected_at', '_workspace'
        ]) for r in records]
    elif table_type == 'databricks_cluster_policy':
        return [_convert_all_complex_to_json(r, keep_scalar_fields=[
            'policy_id', 'name', 'creator_user_name', 'created_at_timestamp',
            'description', 'is_default', 'max_clusters_per_user',
            '_collected_at', '_workspace'
        ]) for r in records]
    elif table_type == 'databricks_experiment':
        return [_convert_all_complex_to_json(r, keep_scalar_fields=[
            'experiment_id', 'name', 'artifact_location', 'lifecycle_stage',
            'creation_time', 'last_update_time',
            '_collected_at', '_workspace'
        ]) for r in records]
    else:
        return records

def normalize_records(records: List[Dict[str, Any]]) -> pd.DataFrame:
    """Build a pandas DataFrame from JSON-like rows.

    Every row is flattened with ``_flatten_obj`` and the resulting column
    names are made safe and unique with ``_dedup_columns``. An empty input
    yields an empty DataFrame.
    """
    if records:
        frame = pd.DataFrame([_flatten_obj(row) for row in records])
        frame.columns = _dedup_columns(list(frame.columns))
        return frame
    return pd.DataFrame([])

def to_spark_df(records: List[Dict[str, Any]], spark, table_type: str = None):
    """Turn records into a Spark DataFrame by way of pandas.

    Going pandas→spark avoids sparkContext usage on shared clusters. When a
    ``table_type`` is given the records are first passed through
    ``_preprocess_uc_records``. If there is nothing to convert, an empty
    DataFrame with the single string column ``_empty`` is returned.
    """
    placeholder_schema = "struct<_empty:string>"

    if not records:
        return spark.createDataFrame([], placeholder_schema)

    prepared = _preprocess_uc_records(records, table_type) if table_type else records
    pdf = normalize_records(prepared)

    # Normalization can still produce an empty frame; fall back to the placeholder schema.
    if pdf.empty:
        return spark.createDataFrame([], placeholder_schema)
    return spark.createDataFrame(pdf)

In [0]:
def ensure_uc_sink(catalog: str, schema: str, spark):
    """Create the target catalog, then the schema inside it, if they do not exist yet."""
    ddl_statements = (
        f"CREATE CATALOG IF NOT EXISTS `{catalog}`",
        f"CREATE SCHEMA  IF NOT EXISTS `{catalog}`.`{schema}`",
    )
    for statement in ddl_statements:
        spark.sql(statement)

In [0]:
def write_single_raw_table(
    key: str,
    records: List[Dict[str, Any]],
    catalog: str,
    schema: str,
    workspace_url: str,
    start_ts: str,
    spark,
    mode: str = None
):
    """Write a single raw data table to Unity Catalog with robust schema handling.

    The target table is `<catalog>`.`<schema>`.`raw_<key>`. Two columns,
    ``_collected_at`` (from ``start_ts``) and ``_workspace`` (from
    ``workspace_url``), are added to every DataFrame before writing.

    Progress and failures are reported with ``print``. Exceptions raised while
    building or writing the DataFrame are caught and printed; they are not
    propagated to the caller.

    This function reads several module-level settings that are not defined in
    this block: WRITE_RAW_MODE, SKIP_EMPTY_DATASETS, ENABLE_MERGE_SCHEMA,
    ENABLE_OVERWRITE_SCHEMA, FALLBACK_TO_OVERWRITE and VERBOSE_SCHEMA_ERRORS.

    Args:
        key: Dataset name; used for the table suffix and passed to
            ``to_spark_df`` as ``table_type``.
        records: Rows to write.
        catalog: Target catalog name.
        schema: Target schema name.
        workspace_url: Value stored in the ``_workspace`` column.
        start_ts: Value stored in the ``_collected_at`` column.
        spark: Spark session used to create and write DataFrames.
        mode: Write mode ('append' or 'overwrite'). If None, uses WRITE_RAW_MODE from config.

    Returns:
        None.
    """
    tbl = f"`{catalog}`.`{schema}`.`raw_{key}`"
    
    # Use explicit mode if provided, otherwise use global config
    write_mode = mode if mode is not None else WRITE_RAW_MODE
    
    # Check for empty datasets
    if not records:
        if SKIP_EMPTY_DATASETS:
            print(f"[SKIP] raw_{key} → {tbl} (empty dataset)")
            return
        else:
            # Create a minimal empty DataFrame to avoid schema inference errors
            empty_df = spark.createDataFrame([], "struct<_empty:string>")
            empty_df = (empty_df
                       .withColumn("_collected_at", F.lit(start_ts))
                       .withColumn("_workspace", F.lit(workspace_url)))
            # Both outcomes return: the empty-dataset path never reaches the
            # main write logic below.
            try:
                empty_df.write.mode(write_mode).saveAsTable(tbl)
                print(f"[WRITE] raw_{key} → {tbl} (empty table created)")
                return
            except Exception as e:
                print(f"[WRITE-FAIL] raw_{key}: Failed to create empty table: {e}")
                return
    
    try:
        # Create Spark DataFrame (with preprocessing for UC tables/schemas)
        sdf = (to_spark_df(records, spark, table_type=key)
               .withColumn("_collected_at", F.lit(start_ts))
               .withColumn("_workspace", F.lit(workspace_url)))
        
        # Configure write options based on schema handling settings
        writer = sdf.write.mode(write_mode)
        
        # Add Delta Lake schema evolution options
        if ENABLE_MERGE_SCHEMA:
            writer = writer.option("mergeSchema", "true")
        
        if ENABLE_OVERWRITE_SCHEMA:
            writer = writer.option("overwriteSchema", "true")
        
        # First attempt: try with configured options
        try:
            writer.saveAsTable(tbl)
            print(f"[WRITE] raw_{key} → {tbl} ({len(records)} rows)")
            return
            
        except Exception as e1:
            # The error is classified by substring match on its lower-cased text.
            error_msg = str(e1).lower()
            
            # Handle specific schema mismatch errors
            if any(phrase in error_msg for phrase in [
                "schema mismatch", "failed to merge", "_legacy_error_temp_delta",
                "cannot_infer_empty_schema", "delta_failed_to_merge_fields"
            ]):
                if VERBOSE_SCHEMA_ERRORS:
                    print(f"[SCHEMA-ISSUE] raw_{key}: Schema evolution error detected")
                    print(f"  Error: {error_msg[:200]}...")
                
                # Try fallback strategies
                # Fallback 1: retry with overwriteSchema, skipped when that
                # option was already part of the first attempt.
                # NOTE(review): the retry reuses write_mode; confirm against the
                # Delta docs whether overwriteSchema has any effect outside
                # overwrite mode.
                if FALLBACK_TO_OVERWRITE and not ENABLE_OVERWRITE_SCHEMA:
                    try:
                        print(f"[RETRY] raw_{key} → Attempting with overwriteSchema=true")
                        fallback_writer = sdf.write.mode(write_mode).option("overwriteSchema", "true")
                        fallback_writer.saveAsTable(tbl)
                        print(f"[WRITE] raw_{key} → {tbl} ({len(records)} rows) [schema overwritten]")
                        return
                    except Exception as e2:
                        # Failure here is not fatal: execution continues with
                        # the next fallback.
                        if VERBOSE_SCHEMA_ERRORS:
                            print(f"[SCHEMA-FAIL] raw_{key}: Overwrite schema also failed: {str(e2)[:200]}...")
                
                # If schema operations fail and we're in overwrite mode, try dropping the table first
                # Fallback 2 (destructive): the existing table is dropped before
                # it is rewritten. This branch returns whether it succeeds or not.
                if write_mode == "overwrite":
                    try:
                        print(f"[RETRY] raw_{key} → Attempting to drop and recreate table")
                        spark.sql(f"DROP TABLE IF EXISTS {tbl}")
                        sdf.write.mode("overwrite").saveAsTable(tbl)
                        print(f"[WRITE] raw_{key} → {tbl} ({len(records)} rows) [table recreated]")
                        return
                    except Exception as e3:
                        print(f"[WRITE-FAIL] raw_{key}: All retry attempts failed: {str(e3)[:200]}...")
                        return
            
            # Re-raise the original exception if it's not a schema issue
            # Also reached for schema errors when no fallback succeeded and
            # write_mode is not "overwrite". The outer handler below prints it.
            raise e1
            
    except Exception as e:
        # Final catch-all: report the failure and return normally.
        print(f"[WRITE-FAIL] raw_{key}: {e}")
        if VERBOSE_SCHEMA_ERRORS:
            import traceback
            print(f"[ERROR-DETAIL] raw_{key}:")
            # Only the last 10 lines of the formatted traceback are shown.
            print("  " + "\n  ".join(traceback.format_exc().split("\n")[-10:]))


In [0]:
def write_raw_tables(
    raw_dict: Dict[str, List[Dict[str, Any]]],
    catalog: str,
    schema: str,
    workspace_url: str,
    start_ts: str,
    spark
):
    """Print the step banner, then write each dataset in *raw_dict* as its own raw table."""
    banner("2/4 Write RAW to UC")
    for dataset_key, dataset_rows in raw_dict.items():
        write_single_raw_table(
            key=dataset_key,
            records=dataset_rows,
            catalog=catalog,
            schema=schema,
            workspace_url=workspace_url,
            start_ts=start_ts,
            spark=spark,
        )

In [0]:
def build_and_write_summary(
    counts: Dict[str, int],
    catalog: str,
    schema: str,
    spark
):
    """Build the categorized count summary and save it as `workspace_scan_summary`.

    Each entry of *counts* becomes one row with the columns Category, Object,
    Count and To_be_Migrated. Keys are labelled from a fixed mapping first,
    then from names derived from API_ENDPOINTS, and otherwise filed under
    "Other". Write failures are printed and re-raised.

    Returns:
        The Spark DataFrame that was written.
    """
    banner("4/4 Write Summary")

    # Hand-written, human-friendly labels
    fixed_labels = {
        "uc_catalogs": ("Metastore", "UC Catalogs"),
        "uc_schemas": ("Metastore", "UC Schemas"),
        "uc_tables": ("Metastore", "UC Tables"),
        "managed_tables": ("Metastore", "UC Tables (Managed)"),
        "external_tables": ("Metastore", "UC Tables (External)"),
        "dbfs_mount_points": ("DBFS/Mounts", "Mount Points"),
    }

    # Labels derived from the API endpoint keys
    auto_labels = {
        endpoint: ("Auto", endpoint.replace("databricks_", "").replace("_", " ").title())
        for endpoint in API_ENDPOINTS.keys()
    }

    not_migrated_categories = ["Workspace", "MLflow"]

    summary_rows = []
    for name, count in counts.items():
        if name in fixed_labels:
            category, display = fixed_labels[name]
        elif name in auto_labels:
            category, display = auto_labels[name]
        else:
            category, display = "Other", name

        if category in not_migrated_categories:
            migrate_flag = "N"
        else:
            migrate_flag = "Y"

        summary_rows.append({
            "Category": category,
            "Object": display,
            "Count": int(count),
            "To_be_Migrated": migrate_flag,
        })

    sdf = spark.createDataFrame(pd.DataFrame(summary_rows))
    tbl = f"`{catalog}`.`{schema}`.`workspace_scan_summary`"

    try:
        writer = sdf.write.mode(WRITE_SUMMARY_MODE)
        if ENABLE_MERGE_SCHEMA:
            writer = writer.option("mergeSchema", "true")
        writer.saveAsTable(tbl)
        print(f"[WRITE] summary → {tbl}")
    except Exception as e:
        schema_related = "schema" in str(e).lower()
        if not (schema_related and FALLBACK_TO_OVERWRITE):
            print(f"[WRITE-FAIL] summary: {e}")
            raise e
        # Schema-related failure: one retry with overwriteSchema enabled.
        try:
            print("[RETRY] summary → Attempting with overwriteSchema=true")
            retry_writer = sdf.write.mode(WRITE_SUMMARY_MODE).option("overwriteSchema", "true")
            retry_writer.saveAsTable(tbl)
            print(f"[WRITE] summary → {tbl} [schema overwritten]")
        except Exception as e2:
            print(f"[WRITE-FAIL] summary: {e2}")
            raise e2

    return sdf

In [None]:
def delete_existing_tables(
    spark, 
    catalog_name: str, 
    schema_name: str,
    table_patterns: List[str] = None,
    exclude_patterns: List[str] = None
):
    """Drop tables in a catalog/schema whose names match the given patterns.

    Patterns are plain substrings. A table is dropped when its name contains
    at least one entry of *table_patterns* and no entry of
    *exclude_patterns*. Without *table_patterns*, the defaults
    "raw_databricks_" and "workspace_scan_summary" are used. Listing and drop
    failures are printed, not raised.
    """
    include = ["raw_databricks_", "workspace_scan_summary"] if table_patterns is None else table_patterns
    exclude = [] if exclude_patterns is None else exclude_patterns

    # List every table of the schema
    try:
        listing = spark.sql(f"SHOW TABLES IN `{catalog_name}`.`{schema_name}`")
        tables = [row.tableName for row in listing.collect()]
    except Exception as e:
        print(f"[ERROR] Failed to list tables in {catalog_name}.{schema_name}: {e}")
        return

    print(f"[DELETE] Found {len(tables)} tables in {catalog_name}.{schema_name}")

    def _is_selected(table_name):
        wanted = any(fragment in table_name for fragment in include)
        protected = any(fragment in table_name for fragment in exclude)
        return wanted and not protected

    tables_to_delete = [name for name in tables if _is_selected(name)]

    if not tables_to_delete:
        print("[DELETE] No tables matched the specified patterns")
        return

    print(f"[DELETE] Deleting {len(tables_to_delete)} tables...")
    dropped = 0
    failures = 0

    for name in tables_to_delete:
        fqtn = f"`{catalog_name}`.`{schema_name}`.`{name}`"
        try:
            spark.sql(f"DROP TABLE IF EXISTS {fqtn}")
            print(f"  ✓ Dropped {fqtn}")
            dropped += 1
        except Exception as e:
            print(f"  ✗ Failed to drop {fqtn}: {e}")
            failures += 1

    print(f"[DELETE] ✅ Deleted {dropped} tables")
    if failures:
        print(f"[DELETE] ❌ Failed to delete {failures} tables")

In [0]:
class DataProcessor:
    """Entry point that bundles the Spark session and run metadata for the workspace assessment."""

    def __init__(self, spark, workspace_url: str, start_ts: str, catalog: str = None, schema: str = None):
        # Run-wide context shared by every write
        self.spark, self.workspace_url, self.start_ts = spark, workspace_url, start_ts
        # Optional default destination, required only by write_single_raw_table
        self.catalog, self.schema = catalog, schema

    def write_single_raw_table(self, key: str, records: List[Dict[str, Any]], mode: str = None):
        """Write one raw table right away to the catalog/schema stored on the instance.

        Args:
            mode: Write mode ('append' or 'overwrite'). If None, uses WRITE_RAW_MODE from config.

        Raises:
            ValueError: If the instance has no catalog or no schema.
        """
        destination_ready = bool(self.catalog) and bool(self.schema)
        if not destination_ready:
            raise ValueError("Catalog and schema must be set for streaming writes")

        write_single_raw_table(
            key,
            records,
            self.catalog,
            self.schema,
            self.workspace_url,
            self.start_ts,
            self.spark,
            mode,
        )

    def process_and_write_all(
        self,
        raw_data: Dict[str, List[Dict[str, Any]]],
        uc_counts: Dict[str, int],
        catalog: str,
        schema: str
    ):
        """Write every raw table, then the summary, and return the summary DataFrame."""
        # The destination catalog and schema must already exist:
        # ensure_uc_sink is intentionally not called here.

        write_raw_tables(raw_data, catalog, schema, self.workspace_url, self.start_ts, self.spark)

        # Row counts per raw dataset, then the UC counts on top
        combined_counts = {}
        for dataset, rows in raw_data.items():
            combined_counts[dataset] = len(rows)
        combined_counts.update(uc_counts)

        return build_and_write_summary(combined_counts, catalog, schema, self.spark)