## Import libraries

In [0]:
import traceback
import uuid
from datetime import datetime, timezone

from pyspark.sql.functions import col, current_timestamp, lit, regexp_extract

## Configuration

In [0]:
# ── Job Parameters ────────────────────────────────────────────────────────────
# The default below is what interactive runs use. A Databricks Job replaces it at
# runtime from its Parameters section, so the widget name ("catalog_name") must
# match the Job's parameter key exactly.
dbutils.widgets.text(name="catalog_name", defaultValue="T20_catalog_dev", label="Catalog Name")

In [0]:
# ADLS Gen2 location (abfss scheme) of the raw cricket commentary files.
storage_account = "adlschitturidemo"
container = "cricinfo-mens-international"
source_base_path = (
    f"abfss://{container}@{storage_account}.dfs.core.windows.net"
    "/cricket_commentary/"
)

In [0]:
# Unity Catalog Configuration
CATALOG_NAME = dbutils.widgets.get("catalog_name")
SCHEMA_NAME = "bronze"
FULL_SCHEMA = f"{CATALOG_NAME}.{SCHEMA_NAME}"

# This notebook does not create the catalog; it must already exist before the
# statements below run. One-off setup used previously:
#   CREATE CATALOG IF NOT EXISTS <CATALOG_NAME>
#   MANAGED LOCATION 'abfss://unity-catalog-storage@dbstorage3x4najbqbebyi.dfs.core.windows.net/7405610025593193'

# Create the bronze schema if it doesn't exist.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {FULL_SCHEMA}")

# Unity Catalog volume for pipeline files (Auto Loader checkpoints and inferred
# schemas). Volume paths work on serverless compute.
spark.sql(f"CREATE VOLUME IF NOT EXISTS {FULL_SCHEMA}.pipeline_files")

# Per-table checkpoint directories are created beneath this prefix; it must end
# with "/" because the pipeline appends the table name directly.
checkpoint_base_path = f"/Volumes/{CATALOG_NAME}/{SCHEMA_NAME}/pipeline_files/checkpoints/t20/"

## Create bronze tables with Change Data Feed (CDC) enabled

In [0]:
# Audit columns appended to every bronze table; the ingestion pipeline fills
# them (load time, source file path, batch id lives in the business columns).
BRONZE_AUDIT_COLUMNS_DDL = """

    -- Auto Loader metadata
    load_timestamp TIMESTAMP,
    source_file STRING"""


def create_bronze_table(table_name, columns_ddl):
    """Create ``{FULL_SCHEMA}.{table_name}`` as a Delta table if it does not exist.

    Every bronze table shares one layout: Delta format, partitioned by
    ``matchid``, Change Data Feed enabled, with the audit columns in
    ``BRONZE_AUDIT_COLUMNS_DDL`` appended after the business columns.

    Because of ``IF NOT EXISTS``, edits to a column list here do not alter a
    table that already exists.

    Args:
        table_name: Unqualified table name inside FULL_SCHEMA.
        columns_ddl: Comma-separated business column definitions. Must include
            the ``matchid`` partition column and must not end with a comma.
    """
    # NOTE(review): the streaming writer in the pipeline also calls
    # partitionBy("matchid"); change both together if the layout changes.
    spark.sql(f"""CREATE TABLE IF NOT EXISTS {FULL_SCHEMA}.{table_name} ({columns_ddl},{BRONZE_AUDIT_COLUMNS_DDL}
)
USING DELTA
PARTITIONED BY (matchid)
TBLPROPERTIES (
  delta.enableChangeDataFeed = true
)""")


create_bronze_table("match_metadata", """
    Batchid STRING,
    ground STRING,
    toss STRING,
    series STRING,
    season STRING,
    player_of_the_match STRING,
    player_of_the_series STRING,
    hours_of_play_local_time STRING,
    match_days STRING,
    t20_debut STRING,
    t20i_debut STRING,
    umpires STRING,
    tv_umpire STRING,
    reserve_umpire STRING,
    match_referee STRING,
    points STRING,
    match_number STRING,
    matchid BIGINT,
    player_replacements STRING,
    first_innings STRING,
    second_innings STRING,
    has_super_over BOOLEAN,
    super_over_count INT,
    series_result STRING""")

create_bronze_table("match_events", """
    Batchid STRING,
    match_ball_number BIGINT,
    ball STRING,
    event STRING,
    score STRING,
    commentary STRING,
    bowler STRING,
    batsman STRING,
    innings STRING,
    matchid BIGINT,
    is_super_over BOOLEAN""")

create_bronze_table("match_players", """
    Batchid STRING,
    matchid BIGINT,
    innings STRING,
    team STRING,
    player_name STRING,
    batted BOOLEAN,
    batting_position INT,
    player_type STRING,
    retired STRING,
    not_out STRING,
    bowled STRING""")

## IPL data ingestion pipeline (initial and incremental loads via Auto Loader)

In [0]:
class IPLDataPipelineUnified:
    """Bronze ingestion pipeline that uses Auto Loader for initial and incremental loads.

    Both load types share one code path. Each table's Auto Loader checkpoint
    remembers which source files were already ingested. ``trigger(availableNow=True)``
    then processes only the files that are new since the last run and stops,
    so every call behaves like a batch job.

    Relies on the notebook globals ``spark``, ``dbutils`` and ``display``. It
    also relies on the PySpark functions ``col``, ``lit`` and ``current_timestamp``.
    """

    def __init__(self, source_base_path, catalog_name, schema_name, checkpoint_base_path):
        """Store configuration and assign a batch id to this pipeline run.

        Args:
            source_base_path: Root directory that Auto Loader scans
                (recursively) for source files.
            catalog_name: Unity Catalog catalog containing the target tables.
            schema_name: Schema inside ``catalog_name`` containing the tables.
            checkpoint_base_path: Prefix for per-table checkpoint directories.
                The table key is appended verbatim, so it should end with "/".
        """
        self.source_base_path = source_base_path
        self.catalog_name = catalog_name
        self.schema_name = schema_name
        self.full_schema = f"{catalog_name}.{schema_name}"
        self.checkpoint_base_path = checkpoint_base_path
        self.batch_id = self._generate_batch_id()
        # Per-table ingestion settings, keyed by table name:
        #   pattern        - file-name glob passed to pathGlobFilter
        #   format         - Auto Loader source format (cloudFiles.format)
        #   table_name     - fully qualified target Delta table
        #   schema_hints   - cloudFiles.schemaHints; matchid is cast to BIGINT
        #                    after reading regardless of the hint
        #   zorder_columns - optional list of columns for OPTIMIZE ... ZORDER BY
        self.tables = {
            "match_events": {
                "pattern": "match_events_data.csv",
                "format": "csv",
                "table_name": f"{self.full_schema}.match_events",
                "description": "Ball-by-ball match events",
                "schema_hints": "matchid BIGINT"
            },
            "match_metadata": {
                "pattern": "metadata_data.json",
                "format": "json",
                "table_name": f"{self.full_schema}.match_metadata",
                "description": "Match metadata and results",
                "schema_hints": "matchid STRING"
            },
            "match_players": {
                "pattern": "match_players_data.csv",
                "format": "csv",
                "table_name": f"{self.full_schema}.match_players",
                "description": "Player information per match",
                "schema_hints": "matchid BIGINT"
            }
        }

    @staticmethod
    def _generate_batch_id():
        """Return an identifier stamped on every row written by this run.

        Inside a Databricks Job the id is ``job_<jobId>_run_<idInJob>``. If the
        job context cannot be read, it falls back to
        ``interactive_<UTC yyyymmdd_HHMMSS>_<8 hex chars>``. The fallback covers
        interactive runs, presumably because ``jobId()`` is empty there, and
        also covers cases where ``dbutils`` is missing.
        """
        try:
            context = (dbutils.notebook.entry_point.getDbutils()
                             .notebook().getContext())
            job_id = context.jobId().get()
            run_id = context.idInJob().get()
            return f"job_{job_id}_run_{run_id}"
        except Exception:
            # Any failure to read the job context is treated as "not a Job run".
            ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
            return f"interactive_{ts}_{uuid.uuid4().hex[:8]}"

    def _add_metadata(self, df):
        """Append the ingestion audit columns to ``df``.

        Adds three columns:
        - ``load_timestamp``: the current timestamp.
        - ``source_file``: the source file path, taken from Auto Loader's
          ``_metadata.file_path``.
        - ``Batchid``: ``self.batch_id``.
        """
        df = df.withColumn("load_timestamp", current_timestamp()) \
                        .withColumn("source_file", col("_metadata.file_path")) \
                        .withColumn("Batchid", lit(self.batch_id))
        return df

    def _is_table_empty(self, table_full_name):
        """Return True if ``table_full_name`` does not exist or contains no rows."""
        if not spark.catalog.tableExists(table_full_name):
            return True
        # limit(1) avoids counting the whole table just to test for emptiness.
        return spark.table(table_full_name).limit(1).count() == 0

    def load_table(self, table_name):
        """Ingest every not-yet-processed source file for one table via Auto Loader.

        Runs a streaming read with ``trigger(availableNow=True)`` and blocks until
        all files discovered since the last checkpoint have been appended to the
        target Delta table. If the table was empty before this load (i.e. this
        is the initial load), the table is optimized afterwards.

        Args:
            table_name: Key into ``self.tables`` ("match_events",
                "match_metadata" or "match_players").

        Returns:
            The finished streaming query.

        Raises:
            KeyError: If ``table_name`` is not a configured table.
        """
        config = self.tables[table_name]
        source_path = self.source_base_path
        table_full_name = config['table_name']
        checkpoint_path = f"{self.checkpoint_base_path}{table_name}"
        # The inferred schema lives under the checkpoint, so reset_checkpoint()
        # discards it too.
        schema_path = f"{checkpoint_path}/schema"

        # Decide *before* writing whether this is the initial load. The target
        # table is normally pre-created (empty), and it always exists once the
        # write below has run, so a post-write existence check can never fire.
        is_initial_load = self._is_table_empty(table_full_name)

        print(f"\n{'='*80}")
        print(f"Target: {table_full_name}")
        print(f"Format: {config['format']}")
        print(f"Source: {source_path}")
        print(f"Checkpoint: {checkpoint_path}")
        print(f"Schema Hints: {config['schema_hints']}")
        print(f"file format: {config['format']}")
        print(f"{'='*80}")

        # Configure Auto Loader (works for both initial and incremental)
        reader = spark.readStream.format("cloudFiles") \
                    .option("cloudFiles.format", config['format']) \
                    .option("cloudFiles.schemaLocation", schema_path) \
                    .option("cloudFiles.inferColumnTypes", "true") \
                    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
                    .option("cloudFiles.validateOptions", "true") \
                    .option("cloudFiles.schemaHints", config['schema_hints']) \
                    .option("pathGlobFilter", config['pattern']) \
                    .option("recursiveFileLookup", "true") \
                    .option("cloudFiles.useNotifications",   "false")

        # CSV-specific options
        if config['format'] == 'csv':
            reader = reader.option("header", "true")

        # Load the stream; matchid is normalized to BIGINT to match the partition
        # column of the target tables, whatever type the schema hint produced.
        df = reader.load(source_path) \
            .withColumn("matchid", col("matchid").cast("bigint"))

        # Add metadata columns
        df = self._add_metadata(df)

        print(f"Auto Loader configured successfully")

        # Write stream with trigger(availableNow=True) for batch processing
        query = df.writeStream \
            .format("delta") \
            .outputMode("append") \
            .option("checkpointLocation", checkpoint_path) \
            .option("mergeSchema", "true") \
            .partitionBy("matchid") \
            .trigger(availableNow=True) \
            .toTable(table_full_name)

        # Wait for completion
        query.awaitTermination()

        # Get statistics
        count = spark.table(table_full_name).count()
        partitions = spark.table(table_full_name).select("matchid").distinct().count()

        print(f"‚úì load completed")
        print(f"‚úì Total rows: {count:,}")
        print(f"‚úì Partitions: {partitions}")

        # Optimize after initial load
        if is_initial_load:
            self.optimize_table(table_name)

        return query

    def ingestion_auto_loader(self):
        """Run ``load_table`` for every configured table and print a summary.

        A failure in one table is printed (with traceback) and recorded, but it
        does not stop the remaining tables and is not re-raised.

        Returns:
            Dict mapping each table key to "SUCCESS" or "FAILED: <error>".
        """
        print("\n" + "="*80)
        print("INCREMENTAL LOAD - ALL TABLES (AUTO LOADER)")
        print("="*80)

        results = {}
        for table_name in self.tables.keys():
            try:
                self.load_table(table_name)
                results[table_name] = "SUCCESS"
            except Exception as e:
                print(f"‚úó Error: {e}")
                traceback.print_exc()
                results[table_name] = f"FAILED: {e}"

        # Summary
        print("\n" + "="*80)
        print("INCREMENTAL LOAD SUMMARY")
        print("="*80)
        for table, status in results.items():
            icon = "‚úì" if status == "SUCCESS" else "‚úó"
            print(f"{icon} {self.tables[table]['table_name']}: {status}")

        return results

    def optimize_table(self, table_name):
        """Run OPTIMIZE on one table, with Z-Ordering if the config requests it.

        Z-Ordering is applied only when the table's config has a non-empty
        ``zorder_columns`` list; otherwise a plain OPTIMIZE is issued.
        """
        config = self.tables[table_name]
        table_full_name = config['table_name']

        print(f"\nOptimizing {table_full_name}...")

        # Z-Order on non-partition columns
        if config.get('zorder_columns'):
            cols = ", ".join(config['zorder_columns'])
            print(f"  Z-Ordering by: {cols}")
            spark.sql(f"OPTIMIZE {table_full_name} ZORDER BY ({cols})")
        else:
            spark.sql(f"OPTIMIZE {table_full_name}")

        print(f"‚úì Optimization complete")

    def show_statistics(self):
        """Print row and distinct-matchid counts, plus a 3-row sample, per table.

        Errors for an individual table are printed and the loop continues.
        """
        print("\n" + "="*80)
        print("TABLE STATISTICS")
        print("="*80)

        for table_name, config in self.tables.items():
            table_full_name = config['table_name']
            try:
                df = spark.table(table_full_name)
                count = df.count()
                partitions = df.select("matchid").distinct().count()

                print(f"\n{table_full_name}:")
                print(f"  Format: {config['format']}")
                print(f"  Rows: {count:,}")
                print(f"  Partitions: {partitions}")

                # Show sample
                display(df.limit(3))

            except Exception as e:
                print(f"  ‚úó Error: {e}")

    def reset_checkpoint(self, table_name):
        """
        Reset checkpoint to reprocess all files (use with caution!)
        Useful for debugging or when you need to reload everything

        Deletes the table's checkpoint directory, including the inferred schema
        stored beneath it. Because loads use append mode, the next load re-appends
        every source file to the existing table. Errors are printed, not raised.
        """
        checkpoint_path = f"{self.checkpoint_base_path}{table_name}"

        print(f"‚ö†Ô∏è  WARNING: Resetting checkpoint for {table_name}")
        print(f"   This will reprocess ALL files on next load")

        try:
            dbutils.fs.rm(checkpoint_path, recurse=True)
            print(f"‚úì Checkpoint reset: {checkpoint_path}")
        except Exception as e:
            print(f"‚úó Error resetting checkpoint: {e}")

## Pipeline execution

In [0]:
# Build a single pipeline instance from the configuration cells above.
pipeline_config = {
    "source_base_path": source_base_path,
    "catalog_name": CATALOG_NAME,
    "schema_name": SCHEMA_NAME,
    "checkpoint_base_path": checkpoint_base_path,
}
pipeline = IPLDataPipelineUnified(**pipeline_config)

In [0]:
# OPTION 1: Incremental Load using Auto Loader
# `results` maps each table key to "SUCCESS" or "FAILED: <error>".
# NOTE(review): per-table failures are caught inside ingestion_auto_loader and
# only printed, so this cell (and a Job run) still succeeds when a table fails,
# and later cells such as the DQ run will proceed. Check `results` if that matters.
print("üöÄ Running Incremental LOAD with Auto Loader...")
results = pipeline.ingestion_auto_loader()

In [0]:
# Show statistics: row count, distinct matchid count and a 3-row sample per table.
pipeline.show_statistics()

In [0]:
# List every table in the bronze schema. Rendered with display() for a
# sortable grid, consistent with the other result cells in this notebook.
display(spark.sql(f"SHOW TABLES IN {FULL_SCHEMA}"))

In [0]:
# Query example: number of ingested ball-by-ball rows per match.
row_counts_sql = f"""
    SELECT matchid, COUNT(*) as row_count
    FROM {FULL_SCHEMA}.match_events
    GROUP BY matchid
    ORDER BY matchid
"""
row_counts_per_match = spark.sql(row_counts_sql)
display(row_counts_per_match)

## Reset Checkpoint (Use Carefully!)

In [0]:
# Uncomment to reset a checkpoint (will reprocess all files).
# A reset deletes the table's checkpoint directory, including the inferred
# Auto Loader schema stored under it. Loads use append mode, so the next run
# re-appends every source file; clear the target table first to avoid
# duplicate rows.
#pipeline.reset_checkpoint("match_events")
#pipeline.reset_checkpoint("match_metadata")
#pipeline.reset_checkpoint("match_players")

In [0]:
# Run the bronze-layer data-quality notebook after ingestion completes.
# The path is a widget so a Job can point each environment at its own copy of
# the DQ notebook (the catalog is already parameterized the same way); the
# default keeps the previous Dev location.
dbutils.widgets.text(
    "dq_notebook_path",
    "/Workspace/Users/pradeepchitturi@gmail.com/Dev_T20CommentaryParser/DataQualityRulesBronzeLayer",
    "DQ Notebook Path",
)
DQ_NOTEBOOK_PATH = dbutils.widgets.get("dq_notebook_path")
DQ_TIMEOUT_SECONDS = 600  # 10 minutes

# dq_result is the value the child notebook returns via dbutils.notebook.exit().
dq_result = dbutils.notebook.run(
    DQ_NOTEBOOK_PATH,
    timeout_seconds=DQ_TIMEOUT_SECONDS,
    arguments={
        "catalog_name": CATALOG_NAME,
        "schema_name": SCHEMA_NAME,
    },
)
print(f"DQ Result: {dq_result}")