In [123]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode, to_timestamp, concat_ws, when, row_number, unix_timestamp, date_format, lag, broadcast
from pyspark.sql.window import Window
import os

In [125]:
# Initialize Spark session
def create_spark_session():
    """Return the SparkSession for the ETL job, creating it if none exists yet.

    The session is named "ETL_Job" and is configured with the SQL settings
    listed below before ``getOrCreate()`` is called.
    """
    # (setting name, value) pairs, applied to the builder in this order.
    session_settings = [
        ("spark.sql.adaptive.enabled", "true"),                     # Adaptive Query Execution
        ("spark.sql.adaptive.coalescePartitions.enabled", "true"),  # coalesce shuffle partitions
        ("spark.sql.adaptive.localShuffleReader.enabled", "true"),  # local shuffle reader
        ("spark.sql.autoBroadcastJoinThreshold", "20MB"),           # broadcast-join size threshold
        ("spark.sql.dynamicPartitionPruning.enabled", "true"),      # dynamic partition pruning
    ]

    builder = SparkSession.builder.appName("ETL_Job")
    for setting_name, setting_value in session_settings:
        builder = builder.config(setting_name, setting_value)

    return builder.getOrCreate()

# Settings applied in create_spark_session(): Adaptive Query Execution, partition coalescing,
# the local shuffle reader, a 20MB auto-broadcast join threshold, and dynamic partition pruning.

In [119]:
# Input locations. Each can be overridden with an environment variable so the
# notebook runs on machines other than the original author's; when the variable
# is not set, the original path is used unchanged.
raw_logs_folder = os.environ.get("MBC_RAW_LOGS_DIR", "C:/Users/vrnp2/MBC")  # Folder containing JSON files for raw logs
program_logs_file = os.environ.get(
    "MBC_PROGRAM_LOGS_FILE",
    "C:/Users/vrnp2/Mbc_program_file/program_data.txt",
)  # Path to program_logs file (single file)


In [121]:
# List files in the raw logs folder

import os
# Collect every *.json file in the raw logs folder.
# os.listdir() returns entries in arbitrary order, so sort the paths to make
# the processing (and append) order deterministic from run to run.
raw_logs_files = sorted(
    os.path.join(raw_logs_folder, f)
    for f in os.listdir(raw_logs_folder)
    if f.endswith(".json")
)

print("Files in the folder:")
for file in raw_logs_files:
    print(file)


Files in the folder:
C:/Users/vrnp2/MBC\events_20240101.json
C:/Users/vrnp2/MBC\events_20240102.json
C:/Users/vrnp2/MBC\events_20240103.json
C:/Users/vrnp2/MBC\events_20240104.json
C:/Users/vrnp2/MBC\events_20240105.json
C:/Users/vrnp2/MBC\events_20240106.json
C:/Users/vrnp2/MBC\events_20240107.json


In [13]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, split,explode

# Create (or reuse) the Spark session before the first read. `spark` is not
# assigned anywhere else in the visible notebook, so without this the cell
# fails with a NameError on a fresh kernel; getOrCreate() makes the call
# harmless if a session already exists.
spark = create_spark_session()

# Define schema for raw logs
raw_logs_schema = StructType([
    StructField("mac_id", StringType(), True),
    StructField("event_date", StringType(), True),
    StructField("event_time", StringType(), True),
    StructField("channel_name", StringType(), True),
    StructField("program_id", StringType(), True),
    StructField("geo_location", StringType(), True),
    StructField("event_code", StringType(), True),
    StructField("satellite_name", StringType(), True)
])

# Define schema for program logs
program_logs_schema = StructType([
    StructField("program_id", StringType(), True),
    StructField("program_name", StringType(), True),
    StructField("program_genre", StringType(), True)
])

# Read the program file as plain text: one row per line, in a single "value" column
program_logs_df_raw = spark.read.text(program_logs_file)

# Split each line on the tab delimiter once and reuse the expression for all three columns
program_fields = split(col("value"), "\t")
program_logs_df_split = program_logs_df_raw.select(
    program_fields[0].alias("program_id"),
    program_fields[1].alias("program_name"),
    program_fields[2].alias("program_genre")
)

# Filter out the header row (the line whose first field is the literal column name)
program_logs_df = program_logs_df_split.filter(col("program_id") != "program_id")


program_logs_df.show(2)
print("Program logs loaded successfully.")

+----------+------------+-------------+
|program_id|program_name|program_genre|
+----------+------------+-------------+
|       158| Program 158|       Comedy|
|       168| Program 168|       Comedy|
+----------+------------+-------------+
only showing top 2 rows

Program logs loaded successfully.


In [103]:
def process_and_transform_json(raw_logs_df, program_logs_df, dynamic_key=None):
    """Flatten one raw-log DataFrame, enrich it with program metadata, dedupe it and add durations.

    Args:
        raw_logs_df: DataFrame read from one raw-log JSON file. The column named
            by ``dynamic_key`` is exploded into one row per event.
        program_logs_df: Program metadata with ``program_id``, ``program_name``
            and ``program_genre`` columns. It is broadcast for the join.
        dynamic_key: Name of the column to explode. Defaults to the first
            column of ``raw_logs_df``.

    Returns:
        DataFrame with one row per (mac_id, datetime) holding the event fields,
        ``datetime``, ``date``, the joined program metadata, ``user_category``,
        ``program_category`` and ``duration_in_seconds``.
    """
    # Resolve the column to explode from the input instead of a notebook global.
    if dynamic_key is None:
        dynamic_key = raw_logs_df.columns[0]

    exploded_df = raw_logs_df.select(explode(col(dynamic_key)).alias("event"))

    # Extract fields from the exploded data
    transformed_df = exploded_df.select(
        col("event.mac").alias("mac_id"),
        col("event.eventdate").alias("event_date"),
        col("event.eventtime").alias("event_time"),
        col("event.chname").alias("channel_name"),
        col("event.program_id").alias("program_id"),
        col("event.geo_location").alias("geo_location"),
        col("event.code").alias("event_code"),
        col("event.sat").alias("satellite_name"),
        col("event.ts").alias("timestamp"),
        col("event.indextime").alias("index_time")
    )

    # Combine event_date and event_time into a single datetime column
    transformed_df = transformed_df.withColumn(
        "datetime",
        to_timestamp(concat_ws(" ", col("event_date"), col("event_time")), "yyyy-MM-dd HH:mm:ss")
    ).drop("event_date", "event_time")

    # Extract the date from the datetime column. This must be derived from
    # transformed_df: raw_logs_df has no "datetime" column.
    transformed_df = transformed_df.withColumn("date", date_format(col("datetime"), "yyyy-MM-dd"))

    # Repartition on the date column before the join.
    # NOTE(review): if a file holds a single day (as the file names suggest),
    # all rows hash to one partition; consider repartitioning on mac_id instead.
    transformed_df = transformed_df.repartition("date")

    # Join with program metadata for enrichment
    enriched_df = transformed_df.join(
        broadcast(program_logs_df),
        transformed_df.program_id == program_logs_df.program_id,
        "left"
    ).drop(program_logs_df.program_id)

    # Categorize users based on geo_location and program_genre.
    # This runs before the null fill below so that a missing geo_location is
    # still visible here and is labelled "Unknown Location".
    enriched_df = enriched_df.withColumn(
        "user_category",
        when(col("geo_location").isNotNull(), "Known Location").otherwise("Unknown Location")
    ).withColumn(
        "program_category",
        when(col("program_genre").like("%Comedy%"), "Comedy Viewer")
        .when(col("program_genre").like("%News%"), "News Viewer")
        .otherwise("General Viewer")
    )

    # Deduplicate: keep one row per (mac_id, datetime). Ordering by the
    # partition key itself would leave the surviving row arbitrary, so order by
    # the other time fields instead.
    # NOTE(review): assumes a larger index_time / timestamp means a more recent
    # record - confirm against the log producer.
    dedup_window = Window.partitionBy("mac_id", "datetime").orderBy(
        col("index_time").desc(), col("timestamp").desc()
    )
    deduped_df = (
        enriched_df
        .withColumn("row_num", row_number().over(dedup_window))
        .filter(col("row_num") == 1)
        .drop("row_num")
    )

    # Window over each user's events in chronological order
    per_user_window = Window.partitionBy("mac_id").orderBy("datetime")

    # Duration = seconds elapsed since the same user's previous event
    # (current - previous, so the value is never negative). A user's first
    # event has no predecessor and gets 0.
    current_seconds = unix_timestamp("datetime")
    previous_seconds = unix_timestamp(lag("datetime").over(per_user_window))
    deduped_df = deduped_df.withColumn(
        "duration_in_seconds",
        current_seconds - previous_seconds
    ).fillna(0, subset=["duration_in_seconds"])  # Fill null durations with 0

    # Handle null records last, so the placeholders cannot affect the
    # categorisation or the dedup ordering above.
    deduped_df = deduped_df.fillna({"geo_location": "Unknown", "index_time": "N/A"})

    return deduped_df



In [71]:
def _write_to_redshift(df, table_name):
    """Append ``df`` to the Redshift table ``table_name`` over JDBC.

    Uses the notebook-level ``redshift_url``, ``redshift_user`` and
    ``redshift_password`` variables, which are not defined in this cell.
    """
    # NOTE(review): the original code used `redshift_user_name` for one table
    # and `redshift_user` for the other; both writes now use `redshift_user`.
    # Confirm that this is the name the credentials cell defines.
    df.write \
        .format("jdbc") \
        .option("url", redshift_url) \
        .option("dbtable", table_name) \
        .option("user", redshift_user) \
        .option("password", redshift_password) \
        .option("driver", "com.amazon.redshift.jdbc42.Driver") \
        .mode("append") \
        .save()


for file_path in raw_logs_files:
    try:
        print(f"Processing file: {file_path}")

        # Read the JSON file
        raw_logs_df = spark.read.option("multiline", "true").json(file_path)

        # Get the first top-level key dynamically
        dynamic_key = raw_logs_df.columns[0]
        print(f"Dynamic key found in the JSON file: {dynamic_key}")

        # Call the transformation function, which includes all the transformations (cleaning, validating, enriching)
        transformed_data = process_and_transform_json(raw_logs_df, program_logs_df)

        # Raw Data Table: selecting all the columns for auditing and troubleshooting purposes
        raw_data_df = transformed_data.select(
            col("mac_id"),
            col("datetime"),
            col("channel_name"),
            col("program_id"),
            col("geo_location"),
            col("event_code"),
            col("satellite_name"),
            col("timestamp"),
            col("index_time"),
            col("duration_in_seconds")
        )

        # Downstream Table: truncate each event time to the minute by formatting
        # without seconds and parsing back with the same explicit pattern.
        downstream_df = transformed_data.withColumn(
            "minute_level",
            to_timestamp(date_format(col("datetime"), "yyyy-MM-dd HH:mm"), "yyyy-MM-dd HH:mm")
        )

        # Deduplicate by mac_id and minute_level, keeping only the latest record within the same minute
        window_spec = Window.partitionBy("mac_id", "minute_level").orderBy(col("datetime").desc())
        downstream_df = downstream_df.withColumn("row_num", row_number().over(window_spec)) \
            .filter(col("row_num") == 1) \
            .select(
                col("mac_id"),
                col("minute_level").alias("datetime"),
                col("channel_name"),
                col("program_id")
            )

        print("Raw Data Table:")
        raw_data_df.show(1)

        print("Downstream Table:")
        downstream_df.show(1)

        # Redshift loading has its own try/except so a load failure is reported
        # separately from a transformation failure.
        # NOTE(review): the two appends are not atomic; if the second fails the
        # first has already been written, and re-running the file appends it again.
        try:
            # Load Raw Data Table
            _write_to_redshift(raw_data_df, "raw_data_table")

            # Load Downstream Table
            _write_to_redshift(downstream_df, "downstream_table")
            print(f"Successfully processed and saved file: {file_path}")

        except Exception as redshift_error:
            print(f"Error loading data into Redshift for file {file_path}: {redshift_error}")

    except Exception as e:
        print(f"Error processing file {file_path}: {e}")

Processing file: C:/Users/vrnp2/MBC\events_20240101.json
Dynamic key found in the JSON file: events_20240101
Raw Data Table:
+--------------------+-------------------+--------------+----------+------------------+----------+--------------+------------------+--------------------+-------------------+
|              mac_id|           datetime|  channel_name|program_id|      geo_location|event_code|satellite_name|         timestamp|          index_time|duration_in_seconds|
+--------------------+-------------------+--------------+----------+------------------+----------+--------------+------------------+--------------------+-------------------+
|4136b85db09a646f2...|2024-01-01 00:09:22|SSC Extra 1 HD|         0|[21.0362, 52.2394]|    SALIVE|      BADR_4&6|1709076809.0243657|2024-01-01T00:37:...|                  0|
+--------------------+-------------------+--------------+----------+------------------+----------+--------------+------------------+--------------------+-------------------+
only 