# Spark Kafka Data Streaming

In [4]:
# Standard library imports
import datetime
import json
import os
from typing import Any

# Third-party library imports
import pandas as pd
import numpy
import pyspark
from pyspark.sql import DataFrame
from pyspark.sql.functions import udf, col, from_json, from_csv, sum as _sum
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DateType

# Extra packages handed to the PySpark shell: the Kafka SQL connector and the Avro data source
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages '
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,'
    'org.apache.spark:spark-avro_2.12:3.3.1 '
    'pyspark-shell'
)

# Report the versions of the libraries used by this notebook
for label, module in (("pyspark ", pyspark), ("pandas ", pd), ("numpy ", numpy)):
    print(label, module.__version__)


pyspark  3.3.2
pandas  1.5.2
numpy  1.24.2


In [None]:
## Configuration file reader

def read_config(config_file):
    """
    Reads the kafka configuration information that is stored in the system

    The file is read line by line. Blank lines and lines starting with "#"
    are skipped. Every other line must have the form ``key=value``; only the
    first "=" separates the key from the value, so values may contain "=".
    Surrounding whitespace is removed from both the key and the value.

    Args:
        config_file: path of the properties file to read.

    Returns:
        dict mapping each setting name to its value (both strings).

    Raises:
        ValueError: if a setting line does not contain "=".
    """
    conf = {}
    with open(config_file) as fh:
        for line_number, raw_line in enumerate(fh, start=1):
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            parameter, separator, value = line.partition("=")
            if not separator:
                raise ValueError(
                    f"{config_file}:{line_number}: expected 'key=value', got {line!r}"
                )
            # strip the key too, so "key = value" is stored under "key"
            conf[parameter.strip()] = value.strip()
    return conf

In [None]:
# Load the Kafka settings from the local properties file and show them
config_dir = os.path.dirname('/home/ozkary/.kafka/')
config_path = os.path.join(config_dir, 'localhost-nosasl.properties')
config = read_config(config_path)
print(config)

In [None]:
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.types as T
import pyspark.sql.functions as F

# Reuse the running session if there is one, otherwise create it
spark = (
    SparkSession.builder
    .appName("Spark-Notebook")
    .getOrCreate()
)

## Read from Kafka Stream

In [None]:

topic = 'mta-turnstile'
client_id = 'Spark-Notebook-Session'
group_id = 'turnstile'

# SASL is enabled only when the configuration names a non-empty mechanism
use_sasl = bool(config.get("sasl.mechanism"))

# NOTE(review): only keys prefixed with "kafka." are forwarded to the Kafka
# client by Spark's Kafka source; the un-prefixed consumer settings below look
# like they have no effect there - confirm before relying on them.
kafka_options = {
    "kafka.bootstrap.servers": config["bootstrap.servers"],
    "subscribe": topic,
    "startingOffsets": "latest",
    "failOnDataLoss": "false",
    "client.id": client_id,
    "group.id": group_id,
    "auto.offset.reset": "latest",
    "checkpointLocation": "checkpoint",
    "minPartitions": "2",
    "enable.auto.commit": "false",
    "enable.partition.eof": "true",
}

if use_sasl:
    # This cell runs at module level, so the settings come from `config`
    # (there is no `self` here).
    sasl_username = config["sasl.username"]
    sasl_password = config["sasl.password"]

    # set the JAAS configuration only when use_sasl is True
    sasl_config = (
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        f'serviceName="kafka" username="{sasl_username}" password="{sasl_password}";'
    )

    login_options = {
        # the Java client setting is singular: sasl.mechanism
        "kafka.sasl.mechanism": config["sasl.mechanism"],
        "kafka.security.protocol": config["security.protocol"],
        # NOTE(review): the credentials are already carried by the JAAS
        # config; these two entries are kept for compatibility - confirm
        # whether they are still needed.
        "kafka.sasl.username": sasl_username,
        "kafka.sasl.password": sasl_password,
        "kafka.sasl.jaas.config": sasl_config,
    }
    # merge the login options with the kafka options
    kafka_options = {**kafka_options, **login_options}

In [None]:
def value_deserializer(value: bytes) -> Any:
    """
    Message value deserializer

    Args:
        value: JSON document as bytes.

    Returns:
        The Python object produced by parsing the JSON document
        (dict, list, str, number, bool or None).

    Raises:
        json.JSONDecodeError: if value is not valid JSON.
    """
    return json.loads(value)

# set the stream source
# default for startingOffsets is "latest"
# No deserializer options are set: a Python function cannot be used as a
# reader option, and the key/value columns are cast to strings with
# DataFrame expressions in parse_messages.
stream = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .load()
)


stream.printSchema()

In [None]:

def write_to_console(df: DataFrame, output_mode: str = 'append', processing_time: str = '60 seconds') -> None:
    """
        Start a streaming query that prints the rows of df to the console.

        df: streaming DataFrame to print
        output_mode: output mode passed to the stream writer
        processing_time: trigger interval passed to the stream writer
    """
    writer = df.writeStream.outputMode(output_mode)
    writer = writer.trigger(processingTime=processing_time)
    # show full column values instead of truncating them
    writer = writer.format("console").option("truncate", False)
    # the query is started but not awaited, so the call returns immediately
    writer.start()

# write a streaming data frame to storage ./storage
def write_to_storage(
    df: DataFrame,
    output_mode: str = 'append',
    processing_time: str = '60 seconds',
    path: str = "./storage/",
    checkpoint_location: str = "./checkpoint",
) -> None:
    """
        Output stream values to storage as gzip-compressed CSV files

        df: streaming DataFrame; it must contain the columns selected below
        output_mode: output mode passed to the stream writer
        processing_time: trigger interval passed to the stream writer
        path: directory that receives the CSV files
        checkpoint_location: directory used for the query checkpoint

        The query is started but not awaited, so the call returns immediately.
    """
    df_csv = df.select(
        "AC", "UNIT", "SCP", "STATION", "LINENAME", "DIVISION", "DATE", "DESC", "TIME", "ENTRIES", "EXITS"
    )

    # "truncate" is a console-sink display option, so it is not set here
    writer = (
        df_csv.writeStream
        .outputMode(output_mode)
        .trigger(processingTime=processing_time)
        .format("csv")
        .option("header", True)
        .option("path", path)
        .option("checkpointLocation", checkpoint_location)
        .option("compression", "gzip")
    )
    writer.start()

# Schema for the incoming data; parse_messages reads the fields by position,
# so they are listed in message order
turnstiles_schema = StructType([
    StructField(name, data_type())
    for name, data_type in (
        ("AC", StringType),
        ("UNIT", StringType),
        ("SCP", StringType),
        ("STATION", StringType),
        ("LINENAME", StringType),
        ("DIVISION", StringType),
        ("DATE", StringType),
        ("TIME", StringType),
        ("DESC", StringType),
        ("ENTRIES", IntegerType),
        ("EXITS", IntegerType),
        ("ID", StringType),
        ("TIMESTAMP", StringType),
    )
])

In [None]:

def parse_messages(stream, schema) -> DataFrame:
    """
    Parse the messages and use the provided schema to type cast the fields

    The message value is cast to a string and split on ",". The n-th piece
    becomes the n-th schema field, cast to that field's data type. Double
    quotes are removed from the AC and TIMESTAMP columns, and rows that repeat
    the same ID, STATION and TIMESTAMP are dropped.

    Args:
        stream: streaming DataFrame with key, value and timestamp columns.
        schema: iterable of fields exposing name and dataType.

    Returns:
        DataFrame holding only the schema fields.

    Raises:
        AssertionError: if stream is not a streaming DataFrame.
    """
    assert stream.isStreaming is True, "DataFrame doesn't receive streaming data"

    df = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")

    # split attributes to nested array in one Column
    parts = F.split(df['value'], ',')

    # expand the array to multiple top-level columns
    for idx, field in enumerate(schema):
        df = df.withColumn(field.name, parts.getItem(idx).cast(field.dataType))

    # remove quotes from the TIMESTAMP and AC columns
    df = df.withColumn("TIMESTAMP", F.regexp_replace(F.col("TIMESTAMP"), '"', ''))
    df = df.withColumn("AC", F.regexp_replace(F.col("AC"), '"', ''))

    result = df.select([field.name for field in schema])

    # dropDuplicates returns a new DataFrame, so the result has to be kept.
    # NOTE(review): on a stream this keeps state for the keys it has seen;
    # consider adding a watermark to bound it - confirm retention needs.
    result = result.dropDuplicates(["ID", "STATION", "TIMESTAMP"])

    result.printSchema()

    return result
    

In [None]:
def agg_messages(df, window_duration: str, window_slide: str) -> DataFrame:
    """
    Sum ENTRIES and EXITS per time window, grouped by
    AC, UNIT, SCP, STATION, LINENAME, DIVISION, DATE and DESC.

    df: DataFrame with a TIMESTAMP text column in yyyy-MM-dd HH:mm:ss format
    window_duration: window length; also used as the watermark delay
    window_slide: slide interval of the window

    Returns the aggregated DataFrame with START/END taken from the window
    bounds and TIME formatted from the window end.
    """
    group_keys = ["AC", "UNIT", "SCP", "STATION", "LINENAME", "DIVISION", "DATE", "DESC"]

    # parse the TIMESTAMP text into a timestamp column used for windowing
    events = df.withColumn("TS", F.to_timestamp("TIMESTAMP", "yyyy-MM-dd HH:mm:ss"))
    time_window = F.window("TS", window_duration, window_slide)

    totals = (
        events
        .withWatermark("TS", window_duration)
        .groupBy(time_window, *group_keys)
        .agg(
            F.sum("ENTRIES").alias("ENTRIES"),
            F.sum("EXITS").alias("EXITS"),
        )
    )

    # flatten the window struct into plain columns
    df_windowed = (
        totals
        .withColumn("START", F.col("window.start"))
        .withColumn("END", F.col("window.end"))
        .withColumn("TIME", F.date_format("window.end", "HH:mm:ss"))
        .drop("window")
        .select(*group_keys, "TIME", "START", "END", "ENTRIES", "EXITS")
    )

    df_windowed.printSchema()

    return df_windowed


In [None]:
def add_by_station(df, window_duration: str, window_slide: str) -> DataFrame:
    """
    Sum ENTRIES and EXITS per time window and STATION.

    df: DataFrame with a TIMESTAMP text column in yyyy-MM-dd HH:mm:ss format
    window_duration: window length; also used as the watermark delay
    window_slide: slide interval of the window

    Returns the aggregated DataFrame with START/END taken from the window
    bounds and TIME formatted from the window start.
    """
    # parse the TIMESTAMP text into a timestamp column used for windowing
    events = df.withColumn("TS", F.to_timestamp("TIMESTAMP", "yyyy-MM-dd HH:mm:ss"))
    time_window = F.window("TS", window_duration, window_slide)

    totals = (
        events
        .withWatermark("TS", window_duration)
        .groupBy(time_window, "STATION")
        .agg(
            F.sum("ENTRIES").alias("ENTRIES"),
            F.sum("EXITS").alias("EXITS"),
        )
    )

    # flatten the window struct into plain columns
    df_windowed = (
        totals
        .withColumn("START", F.col("window.start"))
        .withColumn("END", F.col("window.end"))
        .withColumn("TIME", F.date_format("window.start", "HH:mm:ss"))
        .drop("window")
        .select("STATION", "TIME", "START", "END", "ENTRIES", "EXITS")
    )

    df_windowed.printSchema()
    return df_windowed

In [None]:
def process_batch(df, id, tag='message'):
    """
    Print a one-line summary of a batch: tag, batch id, record count and first row.

    df: batch DataFrame
    id: batch identifier shown in the message
    tag: label shown in the message
    """
    # first row of the batch (None when the batch has no rows)
    first_row = df.first()
    record_count = df.count()

    print(f"Processing {tag} batch {id} with {record_count} records. {first_row}")

In [None]:
# Pipeline driver: parse the Kafka stream, echo the parsed rows to the console,
# then aggregate them in time windows and write the aggregates to storage.

# convert the schema to string (schema_string is not used further in this cell)
schema_string = turnstiles_schema.simpleString()
df_messages = parse_messages(stream, schema=turnstiles_schema)
# starts the console query; the call does not block
write_to_console(df_messages)
# write_to_storage(df_messages)

# query = df_messages.writeStream \
#                    .foreachBatch(lambda batch, id: process_batch(batch, id, 'by_message')) \
#                    .start()

# equal duration and slide: each window starts where the previous one ends
window_duration = '5 minutes'
window_slide = '5 minutes'

df_windowed = agg_messages(df_messages,window_duration, window_slide)

# Start the streaming query
# query = df_windowed.writeStream.outputMode("append").format("memory").queryName("output").start()


# query = df_windowed.writeStream \
#                    .foreachBatch(lambda batch, id: process_batch(batch, id, 'by_station')) \
#                    .start()

# starts the storage query for the windowed aggregates; the call does not block
write_to_storage(df_windowed)


In [None]:
# clean-up session: stop the streaming queries first, then the session itself

# Stop any active streaming queries (the loop does nothing when none are active)
for query in spark.streams.active:
    query.stop()


# Stop existing SparkSession
spark.stop()


In [None]:
import os

# Create a new SparkSession to cleanup folders
# NOTE(review): the folder clean-up in this cell only uses the os module and
# never references `spark` - confirm whether this session is needed.
spark = SparkSession.builder.appName("cleanup").getOrCreate()

def remove_dir(path):
    """
    Delete the directory at path together with everything inside it.

    Does nothing when path does not exist.

    path: directory to delete
    """
    # local import keeps this cell self-contained
    import shutil

    if os.path.exists(path):
        # rmtree also removes non-empty directories, which os.rmdir cannot
        shutil.rmtree(path)


# Delete checkpoint directory
checkpoint_dir = "./checkpoint/"
remove_dir(checkpoint_dir)

# Delete storage directory
storage_dir = "./storage/"
remove_dir(storage_dir)

