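### **Kafka Streaming**

This notebook reads JSON `customers` and `orders` events from Kafka with Spark Structured Streaming, converts their date strings to timestamps, and writes each stream to Parquet files.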

### 1. Imports & Connections

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, split, col, current_timestamp, to_timestamp
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

In [20]:
import os

# Maven coordinates passed to spark-submit via --packages.
# NOTE(review): kafka-clients is pinned independently of the Spark Kafka
# connector; confirm 3.5.1 is compatible with the 3.4.0 connector.
packages = [
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0',
    'org.apache.kafka:kafka-clients:3.5.1'
]


def resolve_submit_args(existing_args, package_coords):
    """Return the value to use for ``PYSPARK_SUBMIT_ARGS``.

    Args:
        existing_args: Current value of the variable ('' or None when unset).
        package_coords: Maven coordinates to pass through ``--packages``.

    Returns:
        ``existing_args`` unchanged when it is non-blank; otherwise a
        ``--packages <coords> pyspark-shell`` string built from
        ``package_coords``.
    """
    if existing_args and existing_args.strip():
        return existing_args
    return f'--packages {",".join(package_coords)} pyspark-shell'


# Do not reset PYSPARK_SUBMIT_ARGS before reading it: a value that is already
# present (from the environment or an earlier run of this cell) is kept.
args = os.environ.get('PYSPARK_SUBMIT_ARGS', '')
if args.strip():
    print(f'Found existing args: {args}')
else:
    print('Using packages', packages)
    os.environ['PYSPARK_SUBMIT_ARGS'] = resolve_submit_args(args, packages)

Using packages ['org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0', 'org.apache.kafka:kafka-clients:3.5.1']


In [21]:
# Session settings, applied to the builder in the order listed.
spark_settings = [
    ("spark.executor.cores", "2"),
    ("spark.executor.memory", "1500m"),
    ("spark.driver.memory", "1500m"),
    ("spark.sql.shuffle.partitions", "10"),
    ("spark.sql.autoBroadcastJoinThreshold", "10mb"),
    ("spark.sql.adaptive.enabled", "false"),
    ("spark.sql.files.maxPartitionBytes", "1mb"),
    ("spark.eventLog.enabled", "false"),
    ("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0"),
    ("spark.streaming.stopGracefullyOnShutdown", "true"),
]

session_builder = SparkSession.builder.appName("KafkaStreamingAnalysis")
for setting_name, setting_value in spark_settings:
    session_builder = session_builder.config(setting_name, setting_value)

# Reuse an existing session if one is already running, otherwise create one.
spark = session_builder.getOrCreate()

### 2. Define Schema

In [22]:
# Schema for customer records; registration_date is kept as a string here.
customer_schema = (
    StructType()
    .add("customer_id", IntegerType())
    .add("name", StringType())
    .add("email", StringType())
    .add("registration_date", StringType())
)

# Schema for order records; order_date is kept as a string here.
order_schema = (
    StructType()
    .add("order_id", IntegerType())
    .add("customer_id", IntegerType())
    .add("order_date", StringType())
    .add("total_amount", FloatType())
    .add("status", StringType())
)

### 3. Read Kafka Stream

In [None]:
kafka_brokers = 'broker-1:29091,broker-2:29092,broker-3:29093'
customers_topic_name = "customers"
orders_topic_name = "orders"


def read_kafka_topic(topic_name):
    """Open a streaming DataFrame over one Kafka topic.

    Args:
        topic_name: Name of the Kafka topic to subscribe to.

    Returns:
        A streaming DataFrame from the Kafka source, connected to
        ``kafka_brokers`` and starting from the earliest offsets.
    """
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_brokers)
        .option("subscribe", topic_name)
        .option("startingOffsets", "earliest")
        .load()
    )


def parse_json_records(raw_stream_df, record_schema):
    """Parse the Kafka ``value`` column as JSON and flatten it into columns.

    Args:
        raw_stream_df: Streaming DataFrame returned by the Kafka source.
        record_schema: StructType describing the JSON payload.

    Returns:
        A streaming DataFrame with one column per field of ``record_schema``
        plus an ``ingestion_timestamp`` column set from ``current_timestamp()``.
    """
    return (
        raw_stream_df
        .selectExpr("CAST(value AS String)")
        .select(from_json(col("value"), record_schema).alias("record"))
        # Expand the parsed struct so each field becomes a top-level column.
        .select("record.*")
        .withColumn("ingestion_timestamp", current_timestamp())
    )


# -- Read Customer Data --
customer_stream_df = read_kafka_topic(customers_topic_name)
customer_parsed_df = parse_json_records(customer_stream_df, customer_schema)

# -- Read Order Data --
order_stream_df = read_kafka_topic(orders_topic_name)
order_parsed_df = parse_json_records(order_stream_df, order_schema)


### 4. Data Validation & Checks

In [24]:
# -- Convert the datetime/timestamp into correct formats --
# Parse the registration_date string into a timestamp column, then drop the
# original string column.
registration_ts = to_timestamp(col("registration_date"), "yyyy-MM-dd HH:mm:ss")
customer_corrected_df = (
    customer_parsed_df
    .withColumn("registration_timestamp", registration_ts)
    .drop("registration_date")
)

# Parse the order_date string into a timestamp column, then drop the original
# string column.
order_ts = to_timestamp(col("order_date"), "yyyy-MM-dd HH:mm:ss")
order_corrected_df = (
    order_parsed_df
    .withColumn("order_timestamp", order_ts)
    .drop("order_date")
)

### 5. Write Stream To Sink

In [None]:
base_customers_local_dir = "data/customers"
base_orders_local_dir = "data/orders"

base_customers_checkpoint_dir = "data/checkpoints/customers"
base_orders_checkpoint_dir = "data/checkpoints/orders"


def start_parquet_sink(stream_df, output_dir, checkpoint_dir):
    """Start a streaming query that appends ``stream_df`` to Parquet files.

    Args:
        stream_df: Streaming DataFrame to write.
        output_dir: Directory passed as the sink's ``path`` option.
        checkpoint_dir: Directory passed as the ``checkpointLocation`` option.

    Returns:
        The StreamingQuery returned by ``start()``; it uses append output mode
        and a 10-second processing-time trigger.
    """
    return (
        stream_df.writeStream
        .format("parquet")
        .outputMode("append")
        .option("path", output_dir)
        .option("checkpointLocation", checkpoint_dir)
        .trigger(processingTime='10 seconds')
        .start()
    )


customer_query = start_parquet_sink(
    customer_corrected_df, base_customers_local_dir, base_customers_checkpoint_dir
)
order_query = start_parquet_sink(
    order_corrected_df, base_orders_local_dir, base_orders_checkpoint_dir
)

# Wait on both queries at once. Awaiting customer_query first would hide a
# failure of order_query for as long as the customer query keeps running.
spark.streams.awaitAnyTermination()

### 6. Stop Application

In [26]:
# Stop both streaming queries: customers first, then orders.
for running_query in (customer_query, order_query):
    running_query.stop()

In [27]:
# Stop the SparkSession held in `spark`.
spark.stop()