# Databricks Table Quality Assessment using Native PySpark

## Overview  
This notebook performs automated table quality assessment at scale using native PySpark capabilities on Databricks. It leverages distributed compute for terabyte-scale data analysis and reports only aggregate statistics, never row-level data.  

### Key Features:  
- **Metadata Assessment**: Existence checks, size metrics, partition analysis  
- **Schema Inspection**: Data type validation and change history tracking  
- **Data Integrity**: Column-level checks for illegal characters, null ratios, and length validation  
- **Scalable Design**: DataFrame transformations for distributed processing  
- **Security**: No row-level data exposure; reports contain only metadata and aggregate statistics (counts, ratios, string lengths, numeric min/max/mean/stddev)  

### Environment Requirements:  
- Databricks Runtime 13.3+ with PySpark 3.x  
- Delta Lake and Parquet table support  
- Cluster with sufficient resources for parallel processing  

**Author**: Jhonathan Pauca Joya  
**Role**: MLOps Engineer  
**University**: Universidad Nacional Mayor de San Marcos  
**Program**: Maestría (Master's degree)  
**Last Updated**: August 2025  
**Version**: 1.0  

In [0]:
# Notebook parameters
# -------------------
# Declare the input widgets, read their current values back, and echo the
# resolved configuration. The names bound here (table_list, scan_partitions,
# output_format, size_threshold_gb) are read as globals by later cells.

# Widget declarations: name, default value, label.
dbutils.widgets.text("table_list", "catalog1.schema1.table1,catalog2.schema2.table2", "Comma-separated list of fully qualified table names")
dbutils.widgets.text("scan_partitions", "10", "Number of recent partitions to scan")
dbutils.widgets.dropdown("output_format", "table", ["table", "json", "csv", "delta"], "Output format for results")
dbutils.widgets.text("size_threshold_gb", "50", "Table size threshold in GB for recommendations")

# Read the widget values back; numeric parameters are converted immediately,
# so a non-numeric entry fails here with a ValueError.
table_list_str = dbutils.widgets.get("table_list")
scan_partitions = int(dbutils.widgets.get("scan_partitions"))
output_format = dbutils.widgets.get("output_format")
size_threshold_gb = float(dbutils.widgets.get("size_threshold_gb"))

# Split on commas, trim each name, and drop empty entries (e.g. from a trailing comma).
table_list = list(filter(None, map(str.strip, table_list_str.split(","))))

# Echo the effective configuration.
print("\n".join([
    "Configuration loaded:",
    f"- Tables to assess: {len(table_list)}",
    f"- Partition scan depth: {scan_partitions}",
    f"- Output format: {output_format}",
    f"- Size threshold: {size_threshold_gb} GB",
]))

In [0]:
# Import required libraries and initialize Spark session
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import (
    col, length, trim, max as spark_max, min as spark_min, count, 
    when, isnan, isnull, regexp_extract, regexp_replace, sum as spark_sum,
    avg, stddev, first, last, collect_list, concat_ws, lit, size, from_json,
    current_timestamp, date_format, udf, lower
)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType, ArrayType, MapType
from pyspark.sql.utils import AnalysisException
import re
import json
from decimal import Decimal
from typing import List, Dict, Any, Iterator
from datetime import datetime
import logging

# Obtain the Spark session used by every cell below. getOrCreate() returns an
# already-running session when one exists (as on a Databricks cluster) instead
# of building a new one.
# NOTE(review): whether these .config() values take effect on a pre-existing
# session depends on the runtime; confirm with spark.conf.get(...) if it matters.
spark = (
    SparkSession.builder
    .appName("TableQualityAssessment")
    # Adaptive query execution and its partition-coalescing / skew-join features.
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    # Arrow-based Spark <-> pandas conversion.
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

print(f"Spark session initialized: {spark.version}")

In [0]:
# Utility Functions for Table Metadata Assessment

def decimal_default(obj):
    """
    ``json.dumps`` fallback serializer that renders ``Decimal`` values as floats.

    Args:
        obj: The object ``json`` could not serialize natively.

    Returns:
        ``float(obj)`` when ``obj`` is a ``Decimal``.

    Raises:
        TypeError: For any other type, matching ``json``'s own error wording.
    """
    if not isinstance(obj, Decimal):
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
    return float(obj)

def table_exists(full_name: str) -> bool:
    """
    Check whether a table (or view) exists in the catalog.

    Runs ``DESCRIBE TABLE`` and treats "not found" analysis errors as a missing
    object. Any other ``AnalysisException`` (e.g. a permission error) is
    re-raised so it is not silently reported as "does not exist".

    Args:
        full_name: Fully qualified table name (catalog.schema.table)

    Returns:
        True if the table can be described, False if it (or its schema or
        catalog) was not found.

    Raises:
        AnalysisException: For analysis errors that are not "not found" errors.
    """
    # Match both the legacy Spark messages and the error-class tags emitted by
    # Spark 3.4+ / Databricks (e.g. "[TABLE_OR_VIEW_NOT_FOUND] The table or view
    # ... cannot be found"), which contain neither legacy phrase.
    not_found_markers = (
        "Table or view not found",
        "does not exist",
        "TABLE_OR_VIEW_NOT_FOUND",
        "SCHEMA_NOT_FOUND",
        "CATALOG_NOT_FOUND",
        "NO_SUCH_CATALOG_EXCEPTION",
    )
    try:
        spark.sql(f"DESCRIBE TABLE {full_name}").take(1)
        return True
    except AnalysisException as e:
        message = str(e)
        if any(marker in message for marker in not_found_markers):
            return False
        # Bare raise keeps the original traceback.
        raise

def get_table_size_gb(full_name: str) -> float:
    """
    Return the table's size in GB (1 GB = 1024**3 bytes), rounded to 3 decimals.

    The size is the ``sizeInBytes`` field of ``DESCRIBE DETAIL``. A null or zero
    size yields 0.0. If the command fails, a warning is printed and 0.0 is
    returned instead of raising.

    Args:
        full_name: Fully qualified table name

    Returns:
        Table size in GB.
    """
    bytes_per_gb = 1024 ** 3
    try:
        detail_row = spark.sql(f"DESCRIBE DETAIL {full_name}").select("sizeInBytes").first()
        raw_bytes = detail_row[0]
        if not raw_bytes:
            return 0.0
        return round(raw_bytes / bytes_per_gb, 3)
    except Exception as e:
        print(f"Warning: Could not get size for {full_name}: {str(e)}")
        return 0.0

def get_partition_columns(full_name: str) -> List[str]:
    """
    Return the partition column names of a table (empty list if unpartitioned).

    Reads the ``partitionColumns`` array of ``DESCRIBE DETAIL``. The lookup is
    best-effort: a missing or null field returns an empty list. So does a
    failing command (e.g. an object ``DESCRIBE DETAIL`` cannot describe), after
    printing a warning. The caller then treats the table as unpartitioned.

    Args:
        full_name: Fully qualified table name

    Returns:
        List of partition column names, in the order reported by DESCRIBE DETAIL.
    """
    try:
        detail = spark.sql(f"DESCRIBE DETAIL {full_name}").select("partitionColumns").first()
    except Exception as e:
        print(f"Warning: Could not get partition columns for {full_name}: {str(e)}")
        return []

    # first() returns None for an empty result; the field itself may be null.
    if detail is None or detail["partitionColumns"] is None:
        return []
    return list(detail["partitionColumns"])

# Removed get_table_size_bypartition_gb method due to invalid SQL syntax
# Will calculate partition size info using mean approach

def get_table_schema_info(full_name: str) -> Dict[str, Any]:
    """
    Summarize the table's Spark schema.

    Args:
        full_name: Fully qualified table name

    Returns:
        Dictionary with:
          - ``column_count``: number of top-level fields,
          - ``columns``: one ``{"name", "type", "nullable"}`` entry per field, in schema order,
          - ``data_types``: ``str(dataType)`` -> number of fields of that type.
        If the table cannot be loaded, a warning is printed and an empty summary is returned.
    """
    try:
        fields = spark.table(full_name).schema.fields

        columns = [
            {"name": field.name, "type": str(field.dataType), "nullable": field.nullable}
            for field in fields
        ]

        # Tally how many fields share each type string (first-seen order).
        type_counts: Dict[str, int] = {}
        for entry in columns:
            type_counts[entry["type"]] = type_counts.get(entry["type"], 0) + 1

        return {
            "column_count": len(fields),
            "columns": columns,
            "data_types": type_counts,
        }
    except Exception as e:
        print(f"Warning: Could not get schema for {full_name}: {str(e)}")
        return {"column_count": 0, "columns": [], "data_types": {}}

def get_table_schema_full(full_name: str) -> Dict[str, Any]:
    """
    Parse ``DESCRIBE TABLE`` output into data columns and partition columns.

    The output lists the data columns first. Further sections may follow, each
    introduced by a ``# <Section>`` row (e.g. ``# Partition Information``) and
    usually a ``# col_name`` header row. Only rows in the partition section are
    collected as partition columns. Rows of any other ``#`` section (e.g. a
    clustering or metadata section, if the runtime emits one) are ignored
    rather than misfiled as data columns. Blank separator rows are skipped.

    Args:
        full_name: Fully qualified table name

    Returns:
        ``{'columns': [...], 'partition_columns': [...]}``. Each entry is a dict
        with ``col_name``, ``data_type`` and ``comment``. If the table cannot be
        described, a warning is printed and both lists are empty.
    """
    try:
        rows = spark.sql(f"DESCRIBE TABLE {full_name}").collect()
        columns = []
        partition_columns = []
        section = "data"  # one of "data", "partition", "other"
        for row in rows:
            col_name = row['col_name']
            # Null or blank names are separator rows, not columns.
            if col_name is None or not col_name.strip():
                continue
            marker = col_name.strip().lower()
            if marker.startswith('#'):
                # "# col_name" is a header inside the current section; any other
                # "#" row starts a new section.
                if marker != '# col_name':
                    section = "partition" if marker == '# partition information' else "other"
                continue
            entry = {'col_name': col_name, 'data_type': row['data_type'], 'comment': row['comment']}
            if section == "data":
                columns.append(entry)
            elif section == "partition":
                partition_columns.append(entry)
        return {'columns': columns, 'partition_columns': partition_columns}
    except Exception as e:
        print(f"Warning: Could not get SQL schema for {full_name}: {str(e)}")
        return {'columns': [], 'partition_columns': []}

def get_table_history_summary(full_name: str, max_records: int = 1000) -> Dict[str, Any]:
    """
    Summarize the most recent entries of the table's ``DESCRIBE HISTORY``.

    The history query runs once: its rows are collected a single time and
    reused, instead of running a separate ``count()`` job first.

    Args:
        full_name: Fully qualified table name
        max_records: Maximum number of history records to analyze. ``total_operations``
            is therefore capped at this value.

    Returns:
        Dictionary with:
          - ``total_operations``: number of history rows analyzed,
          - ``operations``: operation name -> count (null names counted as "UNKNOWN"),
          - ``last_modified``: timestamp (as str) of the first history row,
          - ``created_by``: user name of that same first row ("Unknown" if null).
            Despite its name, this is the user of the most recent operation
            (Delta returns history newest-first), not the table's creator.
        On failure a warning is printed and an empty summary is returned.
    """
    try:
        history_rows = spark.sql(f"DESCRIBE HISTORY {full_name} LIMIT {int(max_records)}").collect()

        history_summary = {
            "total_operations": 0,
            "operations": {},
            "last_modified": None,
            "created_by": None
        }

        if history_rows:
            operations: Dict[str, int] = {}
            for row in history_rows:
                operation = row["operation"] or "UNKNOWN"
                operations[operation] = operations.get(operation, 0) + 1

            latest = history_rows[0]
            history_summary["total_operations"] = len(history_rows)
            history_summary["operations"] = operations
            history_summary["last_modified"] = str(latest["timestamp"]) if latest["timestamp"] else None
            history_summary["created_by"] = latest["userName"] or "Unknown"

        return history_summary
    except Exception as e:
        print(f"Warning: Could not get history for {full_name}: {str(e)}")
        return {"total_operations": 0, "operations": {}, "last_modified": None, "created_by": None}

print("Utility functions for table metadata assessment loaded successfully")

In [0]:
# Column-Level Integrity Check Functions


def analyze_column_quality(df: DataFrame, column_name: str, data_type: str, total_rows: int) -> Dict[str, Any]:
    """
    Perform column quality analysis: null statistics plus type-specific checks.

    String columns are profiled by ``analyze_string_column`` and numeric columns
    by ``analyze_numeric_column``; other types only get null statistics. The type
    is classified by its OUTER type name. A substring test would also match
    nested types such as ``ArrayType(StringType(), True)``, or interval types
    via "int", and those analyses would then fail.

    Args:
        df: DataFrame for the table (filtered if needed)
        column_name: Column to analyze (raw name; quoted internally)
        data_type: Data type string of the column, e.g. ``str(field.dataType)``
            ("StringType()", "DecimalType(10,2)") or a SQL type string ("string")
        total_rows: Total number of rows to use for metrics (precomputed, limited by scan_partitions)

    Returns:
        Dictionary with column quality metrics. On failure it contains ``error``
        and an ``issues`` entry describing the failure.
    """
    # Outer type names (lower-cased) for both str(DataType) and SQL type strings.
    string_type_names = {"string", "stringtype", "varchar", "varchartype", "char", "chartype"}
    numeric_type_names = {
        "byte", "bytetype", "tinyint",
        "short", "shorttype", "smallint",
        "int", "integer", "integertype",
        "long", "longtype", "bigint",
        "float", "floattype", "real",
        "double", "doubletype",
        "decimal", "decimaltype", "dec", "numeric",
    }

    try:
        if total_rows == 0:
            return {
                "column_name": column_name,
                "data_type": data_type,
                "total_rows": 0,
                "null_count": 0,
                "null_ratio": 0.0,
                "issues": []
            }

        column_stats = {
            "column_name": column_name,
            "data_type": data_type,
            "total_rows": total_rows,
            "null_count": 0,
            "null_ratio": 0.0,
            "issues": []
        }

        # Backtick-quote so names containing dots or spaces resolve as a single
        # top-level column instead of being parsed as struct-field access.
        column_ref_name = "`" + column_name.replace("`", "``") + "`"

        # Null statistics
        null_count = df.filter(col(column_ref_name).isNull()).count()
        column_stats["null_count"] = null_count
        column_stats["null_ratio"] = round(null_count / total_rows, 4)

        # Outer type name: text before the first "(" or "<", e.g. "arraytype(...)" -> "arraytype".
        base_type = re.split(r"[(<]", data_type.strip().lower(), maxsplit=1)[0].strip()

        if base_type in string_type_names:
            column_stats.update(analyze_string_column(df, column_ref_name, total_rows))
        elif base_type in numeric_type_names:
            column_stats.update(analyze_numeric_column(df, column_ref_name))

        return column_stats

    except Exception as e:
        return {
            "column_name": column_name,
            "data_type": data_type,
            "error": str(e),
            "issues": [f"Analysis failed: {str(e)}"]
        }


def analyze_string_column(df: DataFrame, column_name: str, total_rows: int) -> Dict[str, Any]:
    """
    Profile a string column for delimiter characters, whitespace, length and placeholder values.

    All metrics come from a single aggregation (one pass over ``df``) instead
    of one Spark job per metric.

    Args:
        df: DataFrame containing the column
        column_name: Column to analyze, in any form accepted by ``col()`` (may be backtick-quoted)
        total_rows: Row count used as the denominator of every ratio; 0 yields ratios of 0.0

    Returns:
        Dictionary with per-character row counts (``commas``, ``semicolons``,
        ``pipes``, ``tabs``, ``newlines``), ``illegal_chars_count/ratio``,
        ``whitespace_issues_count/ratio``, ``max_length``, ``avg_length``,
        ``empty_strings_count/ratio``, ``null_string_count/ratio`` and an
        ``issues`` list.
    """
    target = col(column_name)

    def _count_where(condition):
        # when() without otherwise() is null for non-matching (or null) rows and
        # count() skips nulls, so this counts exactly the rows where condition holds.
        return count(when(condition, lit(1)))

    def _ratio(n: int) -> float:
        return round(n / total_rows, 4) if total_rows else 0.0

    # Rows containing each delimiter-like character (Java regex syntax for rlike).
    illegal_patterns = {
        'commas': r',',
        'semicolons': r';',
        'pipes': r'\|',
        'tabs': r'\t',
        'newlines': r'(\n|\r)'
    }
    # Rows containing ANY of those characters (a union, not the sum of the counts above).
    illegal_chars_pattern = r"[,;|\t\n\r]"

    aggregations = [
        _count_where(target.rlike(pattern)).alias(char_name)
        for char_name, pattern in illegal_patterns.items()
    ]
    aggregations += [
        _count_where(target.rlike(illegal_chars_pattern)).alias("illegal_chars_count"),
        # trim() removes leading/trailing spaces, so this flags padded values.
        _count_where(
            (length(target) != length(trim(target))) & target.isNotNull()
        ).alias("whitespace_issues_count"),
        spark_max(length(target)).alias("max_length"),
        # avg() ignores nulls: mean length of the non-null values.
        avg(length(target)).alias("avg_length"),
        _count_where((target == "") | (target == " ")).alias("empty_strings_count"),
        # Literal 'NULL' placeholders, case-insensitive.
        _count_where(target.isNotNull() & (lower(trim(target)) == "null")).alias("null_string_count"),
    ]
    stats = df.agg(*aggregations).first()

    string_stats: Dict[str, Any] = {char_name: stats[char_name] for char_name in illegal_patterns}

    string_stats["illegal_chars_count"] = stats["illegal_chars_count"]
    string_stats["illegal_chars_ratio"] = _ratio(stats["illegal_chars_count"])

    string_stats["whitespace_issues_count"] = stats["whitespace_issues_count"]
    string_stats["whitespace_issues_ratio"] = _ratio(stats["whitespace_issues_count"])

    string_stats["max_length"] = stats["max_length"] if stats["max_length"] else 0
    string_stats["avg_length"] = round(stats["avg_length"], 2) if stats["avg_length"] else 0.0

    string_stats["empty_strings_count"] = stats["empty_strings_count"]
    string_stats["empty_strings_ratio"] = _ratio(stats["empty_strings_count"])

    string_stats["null_string_count"] = stats["null_string_count"]
    string_stats["null_string_ratio"] = _ratio(stats["null_string_count"])

    # Compile issues
    issues = []
    if string_stats["illegal_chars_ratio"] > 0:
        issues.append(f"High illegal characters ratio: {string_stats['illegal_chars_ratio']:.2%}")
    if string_stats["whitespace_issues_ratio"] > 0:
        issues.append(f"High whitespace issues ratio: {string_stats['whitespace_issues_ratio']:.2%}")
    if string_stats["max_length"] > 256:
        issues.append(f"Very long strings detected: max length {string_stats['max_length']}")
    if string_stats["empty_strings_ratio"] > 0:
        issues.append(f"High empty strings ratio: {string_stats['empty_strings_ratio']:.2%}")
    if string_stats["null_string_ratio"] > 0:
        issues.append(f"High 'NULL' string ratio: {string_stats['null_string_ratio']:.2%}")

    string_stats["issues"] = issues
    return string_stats


def analyze_numeric_column(df: DataFrame, column_name: str) -> Dict[str, Any]:
    """
    Compute basic statistics and detect NaN/infinite values in a numeric column.

    All values come from a single aggregation. min/max/avg/stddev ignore nulls,
    so no separate non-null filter is needed.

    Args:
        df: DataFrame containing the column
        column_name: Column to analyze, in any form accepted by ``col()`` (may be backtick-quoted)

    Returns:
        Dictionary with ``min_value``, ``max_value``, ``avg_value`` and
        ``stddev_value`` (rounded to 4 decimals; None only when Spark returns
        null, e.g. no non-null values). Also ``infinite_count`` (rows that are
        NaN, +inf or -inf) and an ``issues`` list.
    """
    numeric_stats = {}
    target = col(column_name)

    stats_result = df.agg(
        spark_min(target).alias("min_value"),
        spark_max(target).alias("max_value"),
        avg(target).alias("avg_value"),
        stddev(target).alias("stddev_value"),
        # when() without otherwise() is null for non-matching rows; count() skips nulls.
        count(
            when(isnan(target) | (target == float('inf')) | (target == float('-inf')), lit(1))
        ).alias("infinite_count"),
    ).first()

    infinite_count = 0
    if stats_result is not None:
        avg_value = stats_result["avg_value"]
        stddev_value = stats_result["stddev_value"]
        numeric_stats["min_value"] = stats_result["min_value"]
        numeric_stats["max_value"] = stats_result["max_value"]
        # Compare against None explicitly: an average or stddev of 0 is a valid result.
        numeric_stats["avg_value"] = round(avg_value, 4) if avg_value is not None else None
        numeric_stats["stddev_value"] = round(stddev_value, 4) if stddev_value is not None else None
        infinite_count = stats_result["infinite_count"] or 0

    numeric_stats["infinite_count"] = infinite_count

    # Compile issues
    issues = []
    if infinite_count > 0:
        issues.append(f"Infinite/NaN values detected: {infinite_count}")

    numeric_stats["issues"] = issues
    return numeric_stats


print("Column-level integrity check functions loaded successfully")

In [0]:
# Main Table Assessment Orchestration

def assess_table_quality(table_name: str, scan_partitions: int = 10) -> Dict[str, Any]:
    """
    Perform a full quality assessment of one table.

    Steps: existence check, metadata collection, selection of the rows to
    profile, per-column analysis, and recommendation generation. For a
    partitioned table, only the ``scan_partitions`` greatest values of the
    first partition column are profiled; an unpartitioned table is profiled
    whole.

    Args:
        table_name: Fully qualified table name
        scan_partitions: Number of most recent partition values (descending order
            of the first partition column) to analyze. Negative values act as 0.

    Returns:
        Assessment dictionary with ``table_name``, ``assessment_timestamp``,
        ``exists``, ``metadata``, ``column_analysis``, ``partition_size_info``,
        ``recommendations`` and ``assessment_duration_seconds``, plus ``error``
        if the assessment failed. The duration is always a float and is recorded
        on every path (missing table, failure, success).

    Note:
        Recommendations use the notebook-level ``size_threshold_gb`` setting.
    """
    assessment_start = datetime.now()

    result = {
        "table_name": table_name,
        "assessment_timestamp": assessment_start.isoformat(),
        "exists": False,
        "metadata": {},
        "column_analysis": [],
        "partition_size_info": [],
        "recommendations": [],
        # Float, not int: the results DataFrame stores this in a FloatType field.
        "assessment_duration_seconds": 0.0
    }

    try:
        # Step 1: Check table existence
        if not table_exists(table_name):
            result["recommendations"].append("CRITICAL: Table does not exist")
            return result

        result["exists"] = True

        # Step 2: Gather metadata
        print(f"Analyzing metadata for {table_name}...")
        metadata = {
            "size_gb": get_table_size_gb(table_name),
            "partition_columns": get_partition_columns(table_name),
            "schema_info": get_table_schema_info(table_name),
            "schema_full": get_table_schema_full(table_name),
            "history_summary": get_table_history_summary(table_name)
        }
        result["metadata"] = metadata

        # Step 3: Select the rows to profile
        df = spark.table(table_name)
        partition_cols = metadata["partition_columns"]
        if not partition_cols:
            # Not partitioned: use all rows
            df_filtered = df
            total_rows = df_filtered.count()
            print(f"Table is not partitioned. Using all {total_rows} rows for column analysis.")
            result["partition_size_info"] = []
        else:
            # Only the first (top-level) partition column drives the selection.
            part_col = partition_cols[0]
            # Backtick-quote so names with dots/spaces resolve as a single column.
            part_ref = col("`" + part_col.replace("`", "``") + "`")
            distinct_parts = df.select(part_ref).distinct()
            recent_rows = distinct_parts.orderBy(part_ref.desc()).limit(max(scan_partitions, 0)).collect()
            part_values = [row[0] for row in recent_rows]
            df_filtered = df.filter(part_ref.isin(part_values))
            total_rows = df_filtered.count()
            print(f"Table is partitioned by {part_col}. Using last {len(part_values)} partitions ({total_rows} rows) for column analysis.")

            # Mean partition size = total size / number of ALL distinct partition
            # values. scan_partitions only bounds the profiled sample, so dividing
            # by it would misstate the mean whenever the table has more (or fewer)
            # partitions than are scanned.
            total_size_gb = metadata["size_gb"]
            partition_count = distinct_parts.count() if total_size_gb > 0 else 0
            if partition_count > 0:
                result["partition_size_info"] = [{
                    "partition_col": part_col,
                    "mean_partition_size_gb": round(total_size_gb / partition_count, 6),
                    "partition_count": partition_count,
                    "scan_partitions": scan_partitions,
                    "total_size_gb": total_size_gb
                }]
            else:
                result["partition_size_info"] = []

        # Step 4: Analyze each column over the selected rows
        print(f"Performing column-level analysis for {table_name}...")
        result["column_analysis"] = [
            analyze_column_quality(df_filtered, column_info["name"], column_info["type"], total_rows)
            for column_info in metadata["schema_info"]["columns"]
        ]

        # Step 5: Generate recommendations
        result["recommendations"] = generate_recommendations(result, size_threshold_gb)

    except Exception as e:
        result["error"] = str(e)
        result["recommendations"].append(f"CRITICAL: Assessment failed - {str(e)}")
        print(f"Assessment failed for {table_name}: {str(e)}")
    finally:
        # Runs on every path, including the early "does not exist" return.
        result["assessment_duration_seconds"] = (datetime.now() - assessment_start).total_seconds()

    if "error" not in result:
        print(f"Assessment completed for {table_name} in {result['assessment_duration_seconds']:.2f} seconds")

    return result

def generate_recommendations(assessment: Dict[str, Any], size_threshold: float) -> List[str]:
    """
    Generate actionable recommendations based on assessment results.

    Args:
        assessment: Assessment dictionary. Reads ``metadata``, ``column_analysis``
            and ``partition_size_info``.
        size_threshold: Size threshold in GB. Tables above it get a PERFORMANCE
            note; tables above twice the threshold get a CRITICAL note instead.

    Returns:
        List of recommendation strings, each prefixed with a category tag
        (CRITICAL, PERFORMANCE, OPTIMIZATION, WARNING, DATA QUALITY, HEALTHY).
    """
    recommendations = []
    metadata = assessment.get("metadata", {})
    column_analysis = assessment.get("column_analysis", [])

    # Size-based recommendations. Test the stricter 2x threshold first: every
    # table above 2x is also above 1x, so checking 1x first would make the
    # CRITICAL branch unreachable.
    size_gb = metadata.get("size_gb", 0)
    if size_gb > size_threshold * 2:
        recommendations.append(f"CRITICAL: Very large table ({size_gb:.1f} GB) - requires cluster scaling")
    elif size_gb > size_threshold:
        recommendations.append(f"PERFORMANCE: Large table ({size_gb:.1f} GB) - consider partitioning optimization")

    # Partition recommendations
    partition_cols = metadata.get("partition_columns") or []
    if len(partition_cols) == 0 and size_gb > 10:
        recommendations.append("OPTIMIZATION: Table > 10GB without partitioning - consider adding partitions")
    elif len(partition_cols) > 5:
        recommendations.append("WARNING: Too many partition columns may cause small files issue")

    # Per-partition size recommendations using mean partition size
    for part_info in assessment.get("partition_size_info") or []:
        mean_size = part_info.get("mean_partition_size_gb", 0)
        if mean_size > size_threshold:
            recommendations.append(f"PERFORMANCE: Mean partition size ({mean_size:.2f} GB) exceeds threshold - consider repartitioning strategy")

    # Column quality recommendations
    critical_issues = 0
    for col_analysis in column_analysis:
        critical_issues += len(col_analysis.get("issues", []))

        # High null ratio warning
        null_ratio = col_analysis.get("null_ratio", 0)
        if null_ratio > 0.5:
            recommendations.append(f"DATA QUALITY: Column '{col_analysis['column_name']}' has high null ratio ({null_ratio:.1%})")

        # String quality issues
        if col_analysis.get("illegal_chars_ratio", 0) > 0:
            recommendations.append(f"DATA QUALITY: Column '{col_analysis['column_name']}' contains illegal characters")

        if col_analysis.get("max_length", 0) > 256:
            recommendations.append(f"PERFORMANCE: Column '{col_analysis['column_name']}' has very long strings (max: {col_analysis.get('max_length')})")

    # Overall data quality verdict: any column issue counts as critical.
    if critical_issues == 0:
        recommendations.append("HEALTHY: No critical data quality issues detected")
    else:
        recommendations.append(f"CRITICAL: {critical_issues} data quality issues detected - immediate attention required")

    # History-based recommendations
    history = metadata.get("history_summary", {})
    total_ops = history.get("total_operations", 0)
    if total_ops > 100:
        recommendations.append("OPTIMIZATION: Table has extensive modification history - consider table optimization")

    return recommendations

print("Main assessment orchestration functions loaded successfully")

In [0]:
# Parallel Processing and Execution Engine

def execute_quality_assessment(table_list: List[str], scan_partitions: int = 10) -> DataFrame:
    """
    Assess every table in ``table_list`` and gather the results into one DataFrame.

    Tables are processed one after another on the driver, in list order; each
    individual assessment runs its own Spark jobs. If an assessment raises, the
    table is recorded as a failed result (``exists=False`` plus ``error`` and a
    CRITICAL recommendation) and the run continues with the next table.

    Args:
        table_list: List of fully qualified table names
        scan_partitions: Number of recent partitions to scan per table

    Returns:
        Spark DataFrame with one flattened result row per input table.
    """
    table_count = len(table_list)
    print(f"Starting quality assessment for {table_count} tables...")
    started_at = datetime.now()

    collected: List[Dict[str, Any]] = []
    for position, name in enumerate(table_list, start=1):
        print(f"\nProcessing table {position}/{table_count}: {name}")
        try:
            outcome = assess_table_quality(name, scan_partitions)
        except Exception as e:
            print(f"Failed to assess {name}: {str(e)}")
            outcome = {
                "table_name": name,
                "assessment_timestamp": datetime.now().isoformat(),
                "exists": False,
                "error": str(e),
                "recommendations": [f"CRITICAL: Assessment failed - {str(e)}"]
            }
        collected.append(outcome)

    elapsed_seconds = (datetime.now() - started_at).total_seconds()
    print(f"\nCompleted assessment of {table_count} tables in {elapsed_seconds:.2f} seconds")

    return create_results_dataframe(collected)

def create_results_dataframe(assessment_results: List[Dict[str, Any]]) -> DataFrame:
    """
    Convert assessment results to a flat, typed Spark DataFrame.

    Nested structures (metadata, column analysis, recommendations, issues) are
    stored as JSON strings. Numeric values are coerced to the exact Python
    types their Spark fields require. PySpark's schema verification rejects,
    for example, an ``int`` in a ``FloatType`` field, such as an integer ``0``
    duration. Without coercion, a single missing or failed table would make
    DataFrame creation fail for the whole run.

    Args:
        assessment_results: List of assessment result dictionaries

    Returns:
        Spark DataFrame with one row per assessment result.
    """
    def _to_float(value: Any) -> float:
        # FloatType fields accept only Python floats.
        return 0.0 if value is None else float(value)

    def _to_int(value: Any) -> int:
        # IntegerType fields accept only Python ints.
        return 0 if value is None else int(value)

    flattened_results = []

    for result in assessment_results:
        metadata = result.get("metadata") or {}
        column_analysis = result.get("column_analysis") or []
        recommendations = result.get("recommendations") or []
        history = metadata.get("history_summary") or {}

        # Analyses that reported at least one issue, and string-typed analyses.
        flagged = [analysis for analysis in column_analysis if analysis.get("issues", [])]
        string_analyses = [
            analysis for analysis in column_analysis
            if "string" in analysis.get("data_type", "").lower()
        ]

        flat_result = {
            "table_name": result.get("table_name", ""),
            "assessment_timestamp": result.get("assessment_timestamp", ""),
            "exists": result.get("exists", False),
            "assessment_duration_seconds": _to_float(result.get("assessment_duration_seconds", 0.0)),

            # Metadata fields
            "size_gb": _to_float(metadata.get("size_gb", 0.0)),
            "partition_columns": json.dumps(metadata.get("partition_columns", [])),
            "column_count": _to_int(metadata.get("schema_info", {}).get("column_count", 0)),
            # NOTE: holds the parsed DESCRIBE TABLE output (schema_full), not the type histogram.
            "data_types": json.dumps(metadata.get("schema_full", {})),
            "total_operations": _to_int(history.get("total_operations", 0)),
            "last_modified": history.get("last_modified", ""),

            # Column analysis summary
            "total_columns_analyzed": len(column_analysis),
            "columns_with_issues": ", ".join(analysis.get("column_name") for analysis in flagged),
            "avg_null_ratio": _to_float(round(
                sum(analysis.get("null_ratio", 0) for analysis in column_analysis) /
                max(len(column_analysis), 1), 4
            )),
            "max_string_length": _to_int(max(
                (analysis.get("max_length", 0) for analysis in string_analyses), default=0
            )),
            "null_string_count": _to_int(max(
                (analysis.get("null_string_count", 0) for analysis in string_analyses), default=0
            )),
            "null_string_ratio": _to_float(max(
                (analysis.get("null_string_ratio", 0.0) for analysis in string_analyses), default=0.0
            )),

            # Recommendations and issues
            "total_recommendations": len(recommendations),
            "has_critical_issues": any("CRITICAL" in rec for rec in recommendations),
            "recommendations": json.dumps(recommendations),

            # Error handling
            "has_error": "error" in result,
            "error_message": result.get("error", ""),

            # Full per-column analysis and partition info as JSON strings
            "column_analysis": json.dumps(column_analysis, default=decimal_default),
            "partition_size_info": json.dumps(result.get("partition_size_info", [])),
            # Column name -> list of issue strings, for flagged columns only
            "issues": json.dumps({analysis.get("column_name"): analysis.get("issues") for analysis in flagged})
        }

        flattened_results.append(flat_result)

    # Define schema for the DataFrame
    schema = StructType([
        StructField("table_name", StringType(), True),
        StructField("assessment_timestamp", StringType(), True),
        StructField("exists", BooleanType(), True),
        StructField("assessment_duration_seconds", FloatType(), True),
        StructField("size_gb", FloatType(), True),
        StructField("partition_columns", StringType(), True),
        StructField("column_count", IntegerType(), True),
        StructField("data_types", StringType(), True),
        StructField("total_operations", IntegerType(), True),
        StructField("last_modified", StringType(), True),
        StructField("total_columns_analyzed", IntegerType(), True),
        StructField("columns_with_issues", StringType(), True),
        StructField("avg_null_ratio", FloatType(), True),
        StructField("max_string_length", IntegerType(), True),
        StructField("null_string_count", IntegerType(), True),
        StructField("null_string_ratio", FloatType(), True),
        StructField("total_recommendations", IntegerType(), True),
        StructField("has_critical_issues", BooleanType(), True),
        StructField("recommendations", StringType(), True),
        StructField("has_error", BooleanType(), True),
        StructField("error_message", StringType(), True),
        StructField("column_analysis", StringType(), True),
        StructField("partition_size_info", StringType(), True),
        StructField("issues", StringType(), True)
    ])

    # Create DataFrame from results
    results_df = spark.createDataFrame(flattened_results, schema)

    return results_df

print("Parallel processing and execution engine loaded successfully")

In [0]:
# Execute Quality Assessment
def databricks_table_quality_assessment() -> DataFrame:
    """
    Main function to execute the Databricks Table Quality Assessment.

    Uses the notebook-level parameters read from the widgets (``table_list``,
    ``scan_partitions``, ``size_threshold_gb``, ``output_format``), runs
    ``execute_quality_assessment`` and prints a configuration summary plus a
    short results summary.

    Returns:
        The assessment results DataFrame, or ``None`` when ``table_list`` is
        empty and nothing was assessed.
    """
    def _fmt(value, spec, suffix=""):
        # Aggregates over zero matching rows (e.g. every listed table is missing)
        # come back as NULL/None, which cannot be rendered with a numeric spec.
        return "N/A" if value is None else f"{format(value, spec)}{suffix}"

    print("="*80)
    print("DATABRICKS TABLE QUALITY ASSESSMENT")
    print("="*80)

    # Validate input parameters
    if not table_list:
        print("ERROR: No tables specified in table_list parameter")
        print("Please configure the table_list widget with comma-separated fully qualified table names")
        return None

    print("Configuration Summary:")
    print(f"- Tables to assess: {len(table_list)}")
    print(f"- Table list: {table_list}")
    print(f"- Partition scan depth: {scan_partitions}")
    print(f"- Size threshold: {size_threshold_gb} GB")
    print(f"- Output format: {output_format}")

    print("\nStarting comprehensive table quality assessment...")

    # Execute the assessment
    results_df = execute_quality_assessment(table_list, scan_partitions)
    existing_df = results_df.filter(col('exists') == True)

    print("\nAssessment completed! Results summary:")
    print(f"- Total tables processed: {results_df.count()}")
    print(f"- Tables existing: {existing_df.count()}")
    print(f"- Tables with critical issues: {results_df.filter(col('has_critical_issues') == True).count()}")
    print(f"- Tables with errors: {results_df.filter(col('has_error') == True).count()}")

    # Quick summary statistics. A global agg always yields exactly one row,
    # but its values are NULL when no existing table contributes data.
    print("\nQuick Statistics:")
    summary_stats = existing_df.agg(
        spark_sum('size_gb').alias('total_size_gb'),
        avg('size_gb').alias('avg_size_gb'),
        spark_max('size_gb').alias('max_size_gb'),
        avg('column_count').alias('avg_columns'),
        avg('avg_null_ratio').alias('overall_avg_null_ratio')
    ).collect()[0]

    print(f"- Total size across all tables: {_fmt(summary_stats['total_size_gb'], '.2f', ' GB')}")
    print(f"- Average table size: {_fmt(summary_stats['avg_size_gb'], '.2f', ' GB')}")
    print(f"- Largest table size: {_fmt(summary_stats['max_size_gb'], '.2f', ' GB')}")
    print(f"- Average columns per table: {_fmt(summary_stats['avg_columns'], '.1f')}")
    print(f"- Overall average null ratio: {_fmt(summary_stats['overall_avg_null_ratio'], '.2%')}")

    print("\nQuality assessment execution completed successfully!")
    return results_df

In [0]:
# Results Display and Reporting
def detailed_assessment_results(results_df: DataFrame) -> None:
    """
    Display detailed assessment results in a user-friendly format.

    Parses the JSON-string columns produced by the assessment
    (``partition_columns``, ``partition_size_info``, ``data_types`` and
    ``column_analysis``) into Spark arrays/structs, then:

    * displays the main results table, ordered by table size (largest first);
    * displays the tables flagged with critical issues and the failed assessments;
    * prints an executive summary with a heuristic 0-100 health score.

    Args:
        results_df: Results DataFrame from ``databricks_table_quality_assessment``.
            ``None`` or an empty DataFrame only prints a "no results" message.
    """
    print("="*80)
    print("DETAILED ASSESSMENT RESULTS")
    print("="*80)

    if not results_df or results_df.isEmpty():
        print("❌ No results available. Please run the assessment first.")
        return

    # Spark types for the parsed columns, defined once so the UDF return types
    # and the empty-array fallbacks cannot drift apart.
    partition_size_schema = ArrayType(StructType([
        StructField("partition_col", StringType(), True),
        StructField("mean_partition_size_gb", FloatType(), True),
        StructField("scan_partitions", IntegerType(), True),
        StructField("total_size_gb", FloatType(), True),
    ]))
    column_meta_type = StructType([
        StructField("col_name", StringType(), True),
        StructField("data_type", StringType(), True),
        StructField("comment", StringType(), True),
    ])
    schema_full_type = StructType([
        StructField("columns", ArrayType(column_meta_type), True),
        StructField("partition_columns", ArrayType(column_meta_type), True),
    ])
    illegal_chars_type = ArrayType(StructType([
        StructField("column_name", StringType(), True),
        StructField("commas", IntegerType(), True),
        StructField("semicolons", IntegerType(), True),
        StructField("pipes", IntegerType(), True),
        StructField("tabs", IntegerType(), True),
        StructField("newlines", IntegerType(), True),
    ]))
    max_length_type = ArrayType(StructType([
        StructField("column_name", StringType(), True),
        StructField("max_length", IntegerType(), True),
    ]))

    def _load_analysis(col_analysis_json):
        # column_analysis is stored as a JSON string; pass through anything already parsed.
        if isinstance(col_analysis_json, str):
            return json.loads(col_analysis_json)
        return col_analysis_json

    def extract_illegal_chars_struct(col_analysis_json):
        """Return per-column illegal-character counts for columns that report them."""
        # Best-effort parsing for display: malformed input yields an empty list
        # rather than failing the Spark job.
        try:
            analysis = _load_analysis(col_analysis_json)
            if not analysis:
                return []
            return [
                {
                    "column_name": entry.get("column_name"),
                    "commas": entry.get("commas", 0),
                    "semicolons": entry.get("semicolons", 0),
                    "pipes": entry.get("pipes", 0),
                    "tabs": entry.get("tabs", 0),
                    "newlines": entry.get("newlines", 0),
                }
                for entry in analysis
                if "illegal_chars_count" in entry
            ]
        except Exception:
            return []

    def extract_max_length(col_analysis_json):
        """Return ``max_length`` for string-typed columns that report one."""
        try:
            analysis = _load_analysis(col_analysis_json)
            if not analysis:
                return []
            return [
                {"column_name": entry["column_name"], "max_length": entry["max_length"]}
                for entry in analysis
                # `or ""` tolerates an explicit null data_type, which would otherwise
                # raise AttributeError and discard the whole table's list.
                if "string" in (entry.get("data_type") or "").lower()
                and entry.get("max_length") is not None
            ]
        except Exception:
            return []

    # Parse partition columns and partition size info as arrays/structs for display
    results_df = results_df.withColumn(
        "partition_cols", from_json(col("partition_columns"), ArrayType(StringType()))
    )
    if "partition_size_info" in results_df.columns:
        results_df = results_df.withColumn(
            "partition_size_gb", from_json(col("partition_size_info"), partition_size_schema)
        )
    else:
        results_df = results_df.withColumn("partition_size_gb", lit(None).cast(partition_size_schema))

    # data_types holds the full schema (columns + partition columns) as JSON
    results_df = results_df.withColumn(
        "schema_full", from_json(col("data_types"), schema_full_type)
    )

    # Per-column illegal-character breakdown and string max lengths
    if "column_analysis" in results_df.columns:
        illegal_chars_udf = udf(extract_illegal_chars_struct, illegal_chars_type)
        max_length_udf = udf(extract_max_length, max_length_type)
        results_df = (
            results_df
            .withColumn("illegal_chars", illegal_chars_udf(col("column_analysis")))
            .withColumn("max_length", max_length_udf(col("column_analysis")))
        )
    else:
        results_df = (
            results_df
            .withColumn("illegal_chars", lit([]).cast(illegal_chars_type))
            .withColumn("max_length", lit([]).cast(max_length_type))
        )

    # Create a user-friendly view for display
    display_df = results_df.select(
        col("table_name").alias("Table Name"),
        col("exists").alias("Exists"),
        col("size_gb").alias("Size (GB)"),
        col("partition_cols").alias("Partition Columns"),
        col("partition_size_gb").alias("Partition Size Info (GB)"),
        col("column_count").alias("Columns"),
        col("schema_full").alias("Schema (Types)"),
        col("total_columns_analyzed").alias("Analyzed Columns"),
        col("columns_with_issues").alias("Columns with Issues"),
        col("avg_null_ratio").alias("Avg Null Ratio"),
        col("illegal_chars").alias("Illegal Chars (per col)"),
        col("max_length").alias("Max Length (per col)"),
        col("null_string_count").alias("'NULL' String Count"),
        col("null_string_ratio").alias("'NULL' String Ratio"),
        col("has_critical_issues").alias("Critical Issues"),
        col("total_recommendations").alias("Recommendations Count"),
        col("assessment_duration_seconds").alias("Duration (s)")
    ).orderBy(col("Size (GB)").desc())

    print("Main Assessment Results:")
    display(display_df)

    # Show tables with critical issues
    critical_tables = results_df.filter(col("has_critical_issues") == True)
    critical_count = critical_tables.count()

    if critical_count > 0:
        print(f"\n⚠️  CRITICAL ISSUES DETECTED ({critical_count} tables):")
        display(critical_tables.select(
            col("table_name").alias("Table Name"),
            col("size_gb").alias("Size (GB)"),
            col("recommendations").alias("Recommendations")
        ))
    else:
        print("\n✅ No critical issues detected across all tables!")

    # Show failed assessments
    failed_tables = results_df.filter(col("has_error") == True)
    failed_count = failed_tables.count()

    if failed_count > 0:
        print(f"\n❌ FAILED ASSESSMENTS ({failed_count} tables):")
        display(failed_tables.select(
            col("table_name").alias("Table Name"),
            col("error_message").alias("Error Message")
        ))

    # Generate executive summary
    print("\n" + "="*80)
    print("EXECUTIVE SUMMARY")
    print("="*80)

    total_tables = results_df.count()
    existing_df = results_df.filter(col("exists") == True)
    existing_tables = existing_df.count()
    # Count issues among existing tables only: this matches detailed_column_analysis
    # and keeps the tables_with_issues / existing_tables ratio below within [0, 1].
    tables_with_issues = existing_df.filter(col("columns_with_issues") != "").count()

    print("📊 Assessment Overview:")
    print(f"   • Total tables evaluated: {total_tables}")
    print(f"   • Tables found: {existing_tables}")
    print(f"   • Tables missing: {total_tables - existing_tables}")
    print(f"   • Tables with data quality issues: {tables_with_issues}")
    print(f"   • Tables requiring immediate attention: {critical_count}")

    if existing_tables > 0:
        # Heuristic score: up to 50 points lost for critical tables, 30 for tables with issues.
        health_score = max(0, 100 - (critical_count / existing_tables * 50) - (tables_with_issues / existing_tables * 30))
        print(f"   • Overall health score: {health_score:.1f}/100")

        if health_score >= 90:
            print("   🟢 Status: HEALTHY - Tables are in good condition")
        elif health_score >= 80:
            print("   🟡 Status: FAIR - Some issues detected, monitoring recommended")
        else:
            print("   🔴 Status: CRITICAL - Immediate attention required")

In [0]:
# Detailed Column Analysis View
def detailed_column_analysis(results_df: DataFrame) -> None:
    """
    Provide detailed column-level analysis and recommendations.

    Restricted to tables that exist, this displays the tables reporting column
    issues, a null-ratio quality distribution, and a size distribution for
    capacity planning. It also prints performance recommendations based on the
    notebook-level ``size_threshold_gb`` and on large tables without partition
    columns.

    Args:
        results_df: Results DataFrame from ``databricks_table_quality_assessment``.
            ``None`` or an empty DataFrame only prints a "no results" message.
    """
    print("="*80)
    print("DETAILED COLUMN ANALYSIS")
    print("="*80)

    if not results_df or results_df.isEmpty():
        print("❌ No results available. Please run the assessment first.")
        return

    print("Generating detailed column analysis report...")

    existing_df = results_df.filter(col("exists") == True)
    # Only the number of existing tables is needed; count instead of
    # collecting their names to the driver.
    existing_count = existing_df.count()
    if existing_count == 0:
        print("❌ No existing tables found for detailed analysis")
        return

    print(f"Analyzing {existing_count} existing tables in detail...")

    # Tables that reported at least one problematic column
    tables_with_issues = existing_df.filter(
        col("columns_with_issues") != ""
    ).select("table_name", "columns_with_issues", "issues", "recommendations")

    issues_count = tables_with_issues.count()
    if issues_count > 0:
        print(f"\n📋 Tables requiring attention ({issues_count} tables):")
        display(tables_with_issues.select(
            col("table_name").alias("Table Name"),
            col("columns_with_issues").alias("Columns with Issues"),
            col("issues").alias("Issues"),
            col("recommendations").alias("Recommendations")
        ))

        print("\n📈 Data Quality Distribution:")
        # NULL is checked first: a NULL ratio fails every `<` comparison and would
        # otherwise fall through to otherwise() and be mislabelled "Poor".
        quality_category = (
            when(col("avg_null_ratio").isNull(), "Unknown (no null-ratio data)")
            .when(col("avg_null_ratio") < 0.05, "Excellent (< 5% nulls)")
            .when(col("avg_null_ratio") < 0.15, "Good (5-15% nulls)")
            .when(col("avg_null_ratio") < 0.30, "Fair (15-30% nulls)")
            .otherwise("Poor (> 30% nulls)")
        )
        quality_distribution = (
            existing_df.select(quality_category.alias("Data Quality Category"))
            .groupBy("Data Quality Category")
            .count()
            .orderBy("count")
        )
        display(quality_distribution)
    else:
        print("\n✅ All existing tables have excellent column quality!")

    # Size distribution for capacity planning
    print("\n💾 Table Size Distribution (for capacity planning):")
    # Same NULL guard: an unknown size must not be reported as "Massive".
    size_category = (
        when(col("size_gb").isNull(), "Unknown size")
        .when(col("size_gb") < 1, "Small (< 1 GB)")
        .when(col("size_gb") < 10, "Medium (1-10 GB)")
        .when(col("size_gb") < 50, "Large (10-50 GB)")
        .when(col("size_gb") < 100, "Very Large (50-100 GB)")
        .otherwise("Massive (> 100 GB)")
    )
    size_distribution = (
        existing_df.select(size_category.alias("Size Category"), col("size_gb"))
        .groupBy("Size Category")
        .agg(
            count("*").alias("Table Count"),
            spark_sum("size_gb").alias("Total Size (GB)")
        )
        .orderBy("Total Size (GB)")
    )
    display(size_distribution)

    # Performance recommendations
    print("\n⚡ Performance Recommendations:")
    large_tables = existing_df.filter(col("size_gb") > size_threshold_gb).count()

    # partition_columns is a JSON string; "[]" means the table has no partition columns.
    unpartitioned_tables = existing_df.filter(
        (col("partition_columns") == "[]") & (col("size_gb") > 10)
    ).count()

    if large_tables > 0:
        print(f"   • {large_tables} tables exceed size threshold ({size_threshold_gb} GB)")
        print("   • Consider cluster scaling for optimal performance")

    if unpartitioned_tables > 0:
        print(f"   • {unpartitioned_tables} large tables (>10GB) lack partitioning")
        print("   • Implement partitioning strategy for better query performance")

    if large_tables == 0 and unpartitioned_tables == 0:
        print("   ✅ No performance issues detected!")

In [0]:
# Export and Persistence
def export_and_persistence(results_df: DataFrame) -> None:
    """
    Export results in various formats and persist for historical tracking.

    Behaviour is selected by the notebook-level ``output_format`` value:

    * ``table``: nothing is written (results were displayed earlier).
    * ``json``: all rows are collected and written as one JSON array to
      ``/databricks/driver/table_quality_assessment_<timestamp>.json``.
    * ``csv``: a flattened column subset is written by Spark, as a single
      partition, to ``/tmp/table_quality_assessment_<timestamp>.csv``.
      Spark writes this path as a directory containing the part file.
    * ``delta``: rows plus run metadata are appended to the Delta table at
      ``/tmp/delta/table_quality_assessments``. The table is then registered
      as ``table_quality_history`` if the catalog permits.

    Args:
        results_df: Results DataFrame from ``databricks_table_quality_assessment``.
            ``None`` or an empty DataFrame only prints a "no results" message.
    """
    print("="*80)
    print("EXPORT AND PERSISTENCE")
    print("="*80)

    if not results_df or results_df.isEmpty():
        print("❌ No results available for export. Please run the assessment first.")
        return

    # Timestamp used for file naming and as the run id
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    if output_format == "table":
        print("📊 Results displayed in table format above")

    elif output_format == "json":
        print("📄 Exporting results to JSON format...")

        json_output = [json.loads(row) for row in results_df.toJSON().collect()]

        # Written to the driver's local working directory. Report this real path;
        # previously a /tmp path was printed even though nothing was written there.
        json_path = f"/databricks/driver/table_quality_assessment_{timestamp}.json"
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(json_output, f, indent=2)

        print(f"   ✅ JSON export saved to: {json_path}")
        print("   📥 File available for download from driver node")

    elif output_format == "csv":
        print("📈 Exporting results to CSV format...")

        # Simplify for CSV export (flatten complex fields)
        csv_df = results_df.select(
            "table_name",
            "exists",
            "size_gb",
            "column_count",
            "columns_with_issues",
            "avg_null_ratio",
            "has_critical_issues",
            "total_recommendations",
            "assessment_duration_seconds",
            "last_modified"
        )

        csv_path = f"/tmp/table_quality_assessment_{timestamp}.csv"

        # Single partition so the output directory holds one CSV part file
        csv_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(csv_path)

        print(f"   ✅ CSV export saved to: {csv_path}")
        print("   📥 Use Databricks file download to retrieve CSV")

    elif output_format == "delta":
        print("💾 Persisting results as Delta table for historical tracking...")

        # Add assessment metadata so runs can be told apart in the history table
        enriched_results = (
            results_df
            .withColumn("assessment_date", date_format(current_timestamp(), "yyyy-MM-dd"))
            .withColumn("assessment_run_id", lit(timestamp))
            .withColumn("notebook_version", lit("1.0"))
            .withColumn("scan_partitions_config", lit(scan_partitions))
            .withColumn("size_threshold_config", lit(size_threshold_gb))
        )

        delta_path = "/tmp/delta/table_quality_assessments"

        try:
            # Append to historical table (created on first write); mergeSchema
            # tolerates columns added by newer notebook versions.
            enriched_results.write.mode("append").option("mergeSchema", "true").format("delta").save(delta_path)

            print(f"   ✅ Results appended to Delta table: {delta_path}")
            print("   🕐 Historical data available for trend analysis")

            # Register in the catalog if permissions allow; failure is non-fatal.
            try:
                spark.sql(f"""
                    CREATE TABLE IF NOT EXISTS table_quality_history
                    USING DELTA
                    LOCATION '{delta_path}'
                """)
                print("   📋 Table 'table_quality_history' created/updated in catalog")
            except Exception as e:
                print(f"   ⚠️  Could not create catalog table (permissions): {str(e)}")

        except Exception as e:
            print(f"   ❌ Failed to save to Delta table: {str(e)}")

    print("\n🎯 Assessment Summary:")
    print(f"   • Assessment completed at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"   • Results exported in {output_format} format")
    print(f"   • Run ID: {timestamp}")

In [0]:
def main():
    """Run the quality assessment, then report on, analyse and export its results.

    Each reporting step receives the same result (possibly ``None`` when no
    tables were configured) and handles the empty case itself.
    """
    assessment = databricks_table_quality_assessment()
    reporting_steps = (
        detailed_assessment_results,
        detailed_column_analysis,
        export_and_persistence,
    )
    for step in reporting_steps:
        step(assessment)


In [0]:
# Execute Main Function

# Execute the main assessment workflow
# Entry-point guard: run the whole assessment workflow only when this module is
# executed directly rather than imported.
if __name__ == "__main__":
    main()