# Batch Data Processing

In [0]:
## import and fedine schema 

from pyspark.sql.types import *

airlines_schema = StructType([
    StructField("IATA_CODE", StringType(), True),
    StructField("AIRLINE", StringType(), True),
    StructField("LOAD_TIMESTAMP", TimestampType(), True)
])

airports_schema = StructType([
    StructField("IATA_CODE", StringType(), True),
    StructField("AIRPORT", StringType(), True),
    StructField("CITY", StringType(), True),
    StructField("STATE", StringType(), True),
    StructField("COUNTRY", StringType(), True),
    StructField("LATITUDE", StringType(), True),
    StructField("LONGITUDE", StringType(), True),
    StructField("LOAD_TIMESTAMP", TimestampType(), True)
])


In [0]:
schema_map = {
    "airlines.csv": airlines_schema,
    "airports.csv": airports_schema
}


In [0]:
## Define paths and process batch data

raw_path = "dbfs:/FileStore/vijay_project/raw"
bronze_path = "dbfs:/FileStore/vijay_project/bronze/batch_data"

# List all files in RAW folder
raw_files = dbutils.fs.ls(raw_path)

for file in raw_files:
    file_name = file.name
    
    # process only if schema exists for this file
    if file_name in schema_map:
        print(f"Processing batch file: {file_name}")
        
        # Read with schema
        df = (
            spark.read
            .option("header", "true")
            .schema(schema_map[file_name])
            .csv(f"{raw_path}/{file_name}")
        )

        # Add metadata
        from pyspark.sql.functions import current_timestamp, input_file_name

        df_bronze = df.withColumn("INGESTED_AT", current_timestamp()) \
                      .withColumn("SOURCE_FILE", input_file_name())

        # Save to Bronze
        table_name = file_name.replace(".csv", "")

        df_bronze.write.format("delta").mode("overwrite") \
            .save(f"{bronze_path}/{table_name}")

        print(f"Saved Bronze table: {table_name}")

    else:
        print(f"Skipping non-batch file: {file_name}")


Processing batch file: airlines.csv
Saved Bronze table: airlines
Processing batch file: airports.csv
Saved Bronze table: airports
Skipping non-batch file: flights/


# streaming data

In [0]:
## imports


from pyspark.sql.functions import current_timestamp, input_file_name
from pyspark.sql.types import (

    StructType, StructField,

    IntegerType, StringType, TimestampType

)

 

In [0]:
## define schema 

flights_schema = StructType([
    StructField("YEAR", IntegerType(), True),
    StructField("MONTH", IntegerType(), True),
    StructField("DAY", IntegerType(), True),
    StructField("DAY_OF_WEEK", IntegerType(), True),
    StructField("AIRLINE", StringType(), True),
    StructField("FLIGHT_NUMBER", StringType(), True),
    StructField("TAIL_NUMBER", StringType(), True),
    StructField("ORIGIN_AIRPORT", StringType(), True),
    StructField("DESTINATION_AIRPORT", StringType(), True),
    StructField("SCHEDULED_DEPARTURE", StringType(), True),
    StructField("DEPARTURE_TIME", StringType(), True),
    StructField("DEPARTURE_DELAY", StringType(), True),
    StructField("TAXI_OUT", StringType(), True),
    StructField("WHEELS_OFF", StringType(), True),
    StructField("SCHEDULED_TIME", StringType(), True),
    StructField("ELAPSED_TIME", StringType(), True),
    StructField("AIR_TIME", StringType(), True),
    StructField("DISTANCE", IntegerType(), True),
    StructField("WHEELS_ON", StringType(), True),
    StructField("TAXI_IN", StringType(), True),
    StructField("SCHEDULED_ARRIVAL", StringType(), True),
    StructField("ARRIVAL_TIME", StringType(), True),
    StructField("ARRIVAL_DELAY", StringType(), True),
    StructField("DIVERTED", StringType(), True),
    StructField("CANCELLED", StringType(), True),
    StructField("CANCELLATION_REASON", StringType(), True),
    StructField("AIR_SYSTEM_DELAY", StringType(), True),
    StructField("SECURITY_DELAY", StringType(), True),
    StructField("AIRLINE_DELAY", StringType(), True),
    StructField("LATE_AIRCRAFT_DELAY", StringType(), True),
    StructField("WEATHER_DELAY", StringType(), True),
    StructField("LOAD_TIMESTAMP", TimestampType(), True)
])

In [0]:
## define paths

raw_path = "dbfs:/FileStore/vijay_project/raw/flights"
bronze_path = "dbfs:/FileStore/vijay_project/bronze/streaming_data/flights"
checkpoint_path = "dbfs:/FileStore/vijay_project/check_point/flights"

In [0]:
## Function to check if a Delta table exists at the given path, and then validate if the bronze layer is created

def delta_exists(path):
    try:
        dbutils.fs.ls(path + "/_delta_log")
        return True
    except:
        return False

bronze_exists = delta_exists(bronze_path)
print("Bronze Exists:", bronze_exists)


Bronze Exists: True


In [0]:
## Validating the schema 

def validate_schema(incoming_schema, existing_schema):
    incoming_cols = {f.name: f.dataType for f in incoming_schema.fields}
    existing_cols = {f.name: f.dataType for f in existing_schema.fields}

    # Columns added later (should not be validated)
    ignore_cols = {"INGESTED_AT", "SOURCE_FILE"}

    for col, dtype in existing_cols.items():
        if col in ignore_cols:
            continue
        if col not in incoming_cols:
            raise Exception(f"Missing required column: {col}")
        if incoming_cols[col] != dtype:
            raise Exception(f"Data type mismatch for column: {col}")

    print("Schema validation passed")


In [0]:
# If the Bronze table already exists, load it and validate its schema

if bronze_exists:
    existing_df = spark.read.format("delta").load(bronze_path)
    validate_schema(flights_schema, existing_df.schema)


Schema validation passed


In [0]:
## Define the streaming DataFrame by reading CSV files from the raw path

stream_df = (
    spark.readStream
    .format("csv")
    .option("header", "true")
    .schema(flights_schema)   # Mandatory
    .load(raw_path)
)

In [0]:
## Add metadata columns to the streaming DataFrame
 
bronze_stream_df = (
    stream_df
    .withColumn("INGESTED_AT", current_timestamp())
    .withColumn("SOURCE_FILE", input_file_name())
)

In [0]:
## Write the streaming DataFrame to the Bronze table

(
    bronze_stream_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")   # handles new columns safely
    .trigger(once=True)
    .start(bronze_path)
)

<pyspark.sql.streaming.query.StreamingQuery at 0x7f6679b7b920>

In [0]:
spark.streams.active


[]

In [0]:
spark.read.format("delta") \
    .load(bronze_path) \
    .count()

5819079