In [None]:
# Databricks notebook source
# MAGIC %md
# MAGIC # F1 Circuits Data Ingestion Pipeline
# MAGIC 
# MAGIC This notebook ingests `circuits.csv` with Azure Databricks Auto Loader, applying an explicit schema and adding ingestion metadata columns, and runs the stream as a one-shot (batch-style) job.

# COMMAND ----------

# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.functions import current_timestamp, input_file_name, lit
import os

# COMMAND ----------

# Configuration parameters -- replace both placeholders before running.
storage_account_name = "your_storage_account_name"  # Azure storage account that hosts the lake
container_name = "your_container_name"              # container (filesystem) inside that account

# Every location below lives under one ADLS Gen2 (abfss://) root in the container.
_lake_root = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net"
source_path = f"{_lake_root}/raw_data/"                  # landing folder that holds circuits.csv
checkpoint_path = f"{_lake_root}/checkpoints/circuits/"  # streaming checkpoint directory
target_path = f"{_lake_root}/processed/circuits/"        # Delta output directory

# COMMAND ----------

# Explicit schema for circuits.csv, declared as (column name, Spark type,
# nullable) rows in the same order as the file's columns.
# NOTE(review): with Spark's default CSV enforceSchema=true, columns are matched
# by position rather than header name -- keep this order aligned with the file
# header and confirm how Auto Loader handles it on the target runtime.
circuits_schema = StructType(
    [
        StructField(column_name, spark_type, nullable)
        for column_name, spark_type, nullable in (
            ("circuitId", IntegerType(), False),   # primary key
            ("circuitRef", StringType(), False),   # unique short reference
            ("name", StringType(), False),         # circuit name
            ("location", StringType(), True),      # location name
            ("country", StringType(), True),       # country name
            ("lat", FloatType(), True),            # latitude
            ("lng", FloatType(), True),            # longitude
            ("alt", IntegerType(), True),          # altitude in metres
            ("url", StringType(), False),          # unique Wikipedia URL
        )
    ]
)
# NOTE(review): Spark generally treats user-supplied schemas for file sources
# as nullable, so the False flags above record intent rather than rejecting
# nulls at read time -- confirm, and add explicit checks if NOT NULL matters.

# COMMAND ----------

# Autoloader configuration for batch processing
def ingest_circuits_data():
    """
    Build the Auto Loader streaming DataFrame for circuits.csv.

    Reads the CSV under ``source_path`` with the explicit ``circuits_schema`` and
    appends four metadata columns:
      - ingestion_time:   timestamp at which the row was processed
      - source_file_name: full path of the file the row came from
      - data_source:      constant "Ergast API"
      - ingestion_method: constant "autoloader_batch"

    Returns:
        A streaming DataFrame. Nothing is read until a sink is started on it.
    """
    # Local import keeps this cell self-contained without touching the
    # notebook's shared import cell.
    from pyspark.sql.functions import col

    # Read data using Auto Loader
    df = (spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "csv")
          # Directory where Auto Loader tracks schema information for this stream.
          .option("cloudFiles.schemaLocation", checkpoint_path + "schema")
          .option("header", "true")
          .option("inferSchema", "false")  # the explicit schema below is authoritative
          .schema(circuits_schema)
          # NOTE(review): Auto Loader is documented for directory/glob input
          # paths; confirm that pointing it at a single file behaves as intended.
          .load(source_path + "circuits.csv")
    )

    # input_file_name() is not supported on Unity Catalog shared-access clusters,
    # and Databricks recommends the hidden _metadata column instead.
    # _metadata.file_path carries the full path of the source file.
    df_with_metadata = (df
                       .withColumn("ingestion_time", current_timestamp())
                       .withColumn("source_file_name", col("_metadata.file_path"))
                       .withColumn("data_source", lit("Ergast API"))
                       .withColumn("ingestion_method", lit("autoloader_batch"))
    )

    return df_with_metadata

# COMMAND ----------

# Run the ingestion as a one-shot (batch-style) streaming job
def run_batch_ingestion():
    """
    Start the circuits ingestion as a streaming job that stops by itself.

    Writes the stream built by ``ingest_circuits_data()`` to Delta at
    ``target_path`` in append mode. Progress is tracked in ``checkpoint_path``,
    so a re-run only picks up input Auto Loader has not yet processed.

    Returns:
        The started StreamingQuery. It terminates once the data available at
        start time has been processed; call ``awaitTermination()`` to block.
    """
    circuits_df = ingest_circuits_data()

    query = (circuits_df.writeStream
             .format("delta")
             .outputMode("append")
             .option("checkpointLocation", checkpoint_path)
             .option("mergeSchema", "true")  # let new columns evolve the target schema
             # availableNow processes everything present at start (honouring any
             # source rate limits across several micro-batches) and then stops.
             # It replaces trigger(once=True), which is deprecated since Spark 3.4.
             .trigger(availableNow=True)
             .start(target_path)
    )

    return query

# COMMAND ----------

# Execute the batch ingestion: announce the run, start the stream, then block
# this cell until the query stops. awaitTermination() re-raises a failure from
# the query, so the completion message only prints after a clean run.
print("Starting circuits data ingestion...")
ingestion_query = run_batch_ingestion()
ingestion_query.awaitTermination()  # returns once the stream has finished processing
print("Circuits data ingestion completed successfully!")

# COMMAND ----------

# Verify the ingested data
def verify_ingestion():
    """
    Read back the circuits Delta output and print a quick sanity report.

    The report contains the total row count, the stored schema, and the first
    five rows of the key descriptive columns plus the ingestion metadata columns.

    Returns:
        The batch DataFrame loaded from ``target_path``.
    """
    loaded = spark.read.format("delta").load(target_path)

    preview_columns = [
        "circuitId", "circuitRef", "name", "location", "country",
        "ingestion_time", "source_file_name",
    ]

    row_total = loaded.count()  # full scan of the Delta table
    print(f"Total records ingested: {row_total}")

    print("\nSchema of the ingested data:")
    loaded.printSchema()

    print("\nSample records:")
    loaded.select(*preview_columns).show(5, truncate=False)

    return loaded


# Verify the ingestion
verify_df = verify_ingestion()

# COMMAND ----------

# Optional: register the Delta output as a table for easier SQL querying
def create_delta_table():
    """
    Register the Delta files at ``target_path`` as the table f1_processed.circuits.

    Because a LOCATION is given, this is an external (unmanaged) table: dropping
    it removes only the metastore entry, not the underlying files. The function
    is safe to re-run. CREATE ... IF NOT EXISTS is a no-op once the table exists,
    and SET TBLPROPERTIES simply re-applies the same values.

    NOTE(review): assumes the f1_processed schema/database already exists.
    """
    # Register without TBLPROPERTIES. By the time this runs, the streaming job
    # has already written Delta data at the location. In that case Delta
    # requires any properties stated in CREATE TABLE to match the existing
    # table exactly, and fails otherwise.
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS f1_processed.circuits
    USING DELTA
    LOCATION '{target_path}'
    """)

    # Enable optimized writes and auto compaction for subsequent writes.
    spark.sql("""
    ALTER TABLE f1_processed.circuits SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
    """)

    print("Delta table 'f1_processed.circuits' created successfully!")

# Create the Delta table
create_delta_table()

# COMMAND ----------

# Display final summary: the overall circuit count, then the ten countries
# hosting the most circuits.
_top_countries_sql = (
    "\n"
    "SELECT country, COUNT(*) as circuit_count \n"
    "FROM f1_processed.circuits \n"
    "GROUP BY country \n"
    "ORDER BY circuit_count DESC\n"
    "LIMIT 10\n"
)

for _summary_sql in (
    "SELECT COUNT(*) as total_circuits FROM f1_processed.circuits",
    _top_countries_sql,
):
    spark.sql(_summary_sql).show()