In [1]:
from datetime import date

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType, DoubleType
)

# Assemble the Spark configuration in a single fluent chain: cluster master,
# application name, then the remaining key/value settings in one setAll call.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("used_cars_stream")
    .setAll([
        ("spark.driver.memory", "2g"),
        ("spark.executor.cores", "1"),
        ("spark.driver.cores", "1"),
        # Kafka connector package requested via spark.jars.packages.
        # NOTE(review): the _2.12 / 3.5.0 coordinates presumably have to match
        # the cluster's Scala and Spark versions -- confirm before upgrading.
        ("spark.jars.packages",
         "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"),
    ])
)

# Obtain the session (an already-running one is reused by getOrCreate).
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Bare last expression so the notebook renders the session summary.
spark

In [2]:
# Kafka connection details.
KAFKA_BOOTSTRAP_SERVERS = "kafka1:9093"   # inside Docker network
TOPIC_NAME = "used_cars_stream"

# Schema of the JSON payload sent by the producer. Every field is declared as
# a nullable string; price, year and odometer are cast to numeric types in a
# later cleaning step.
stream_schema = StructType([
    StructField(field_name, StringType())
    for field_name in (
        "id",
        "url",
        "region",
        "price",
        "year",
        "manufacturer",
        "model",
        "condition",
        "fuel",
        "odometer",
        "state",
        "posting_date",
    )
])



In [3]:
# Step 1 -- open a streaming reader on the Kafka topic. Only messages that
# arrive after the query starts are consumed ("latest" starting offsets).
raw_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
        "subscribe": TOPIC_NAME,
        "startingOffsets": "latest",
    })
    .load()
)

# Step 2 -- cast Kafka's `value` column to a string column named json_string.
json_df = raw_df.select(F.col("value").cast("string").alias("json_string"))

# Step 3 -- parse the JSON text with the declared schema, then flatten the
# resulting struct into one top-level column per field.
parsed_struct = F.from_json(F.col("json_string"), stream_schema).alias("data")
parsed_df = json_df.select(parsed_struct).select("data.*")

parsed_df.printSchema()


root
 |-- id: string (nullable = true)
 |-- url: string (nullable = true)
 |-- region: string (nullable = true)
 |-- price: string (nullable = true)
 |-- year: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: string (nullable = true)
 |-- state: string (nullable = true)
 |-- posting_date: string (nullable = true)



In [4]:
# Clean the parsed stream: cast numeric fields, drop implausible rows, and
# derive a normalised state code plus an event timestamp.

# Taken from the system clock when this cell runs, rather than a hard-coded
# literal that would silently go stale and start rejecting newer model years.
current_year = date.today().year

df_clean = (
    parsed_df
    # Cast to proper types (values that cannot be parsed become null).
    .withColumn("price", F.col("price").cast("double"))
    .withColumn("year", F.col("year").cast("int"))
    .withColumn("odometer", F.col("odometer").cast("double"))

    # Valid price: present, strictly positive, and below 200,000.
    .filter(F.col("price").isNotNull() & (F.col("price") > 0) & (F.col("price") < 200_000))
    # Reasonable year: 1980 up to next year's models.
    .filter(F.col("year").isNotNull() & (F.col("year") >= 1980) & (F.col("year") <= current_year + 1))
    # Odometer is optional, but must be non-negative when present.
    .filter(F.col("odometer").isNull() | (F.col("odometer") >= 0))

    # State cleaning: upper-case codes of at most two characters. There is no
    # `otherwise` branch, so longer or missing values yield null.
    .withColumn(
        "state_clean",
        F.when(
            (F.col("state").isNotNull()) & (F.length("state") <= 2),
            F.upper(F.col("state"))
        )
    )

    # Event time parsed from the posting_date string.
    .withColumn("posting_ts", F.to_timestamp("posting_date"))
)

df_clean.printSchema()

root
 |-- id: string (nullable = true)
 |-- url: string (nullable = true)
 |-- region: string (nullable = true)
 |-- price: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: double (nullable = true)
 |-- state: string (nullable = true)
 |-- posting_date: string (nullable = true)
 |-- state_clean: string (nullable = true)
 |-- posting_ts: timestamp (nullable = true)



In [5]:
# Running summary per (state_clean, manufacturer) pair: number of listings,
# mean price, and an approximate median of the odometer reading.
listing_count = F.count("*").alias("listing_count")
avg_price = F.avg("price").alias("avg_price")
median_odometer = F.expr("percentile_approx(odometer, 0.5)").alias("median_odometer")

agg_stream = df_clean.groupBy(["state_clean", "manufacturer"]).agg(
    listing_count,
    avg_price,
    median_odometer,
)


In [6]:
# DEBUG: smoke-test that the Kafka topic is readable.
# Streams the raw message values (no JSON parsing) from the earliest offset
# to the console sink, showing up to 5 untruncated rows per batch.
# The broker and topic come from the shared constants so this check always
# targets the same stream as the main pipeline.
test_query = (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
         .option("subscribe", TOPIC_NAME)
         .option("startingOffsets", "earliest")
         .load()
         .selectExpr("CAST(value AS STRING) as message")
         .writeStream
         .format("console")
         .option("truncate", "false")
         .option("numRows", 5)
         .start()
)

# Let the query run for at most 30 seconds, then always stop it -- including
# when the wait is interrupted or raises -- so no debug query is left running.
try:
    test_query.awaitTermination(30)
finally:
    test_query.stop()

In [7]:
# Stream the aggregated table to the console sink.
# How long to block this cell while batches print; naming the timeout keeps
# the comment and the awaitTermination call from drifting apart.
CONSOLE_WAIT_SECONDS = 60

query_console = (
    agg_stream
    .writeStream
    .outputMode("complete")       # re-emit full table each batch
    .format("console")
    .option("truncate", "false")
    .option("numRows", 20)
    .start()
)

# Wait up to CONSOLE_WAIT_SECONDS to see some batches. The call returns False
# if the query is still running when the timeout expires; the query is not
# stopped here and keeps running in the background.
query_console.awaitTermination(CONSOLE_WAIT_SECONDS)

False

In [8]:
# Persist the cleaned rows as Parquet files in append mode.
# NOTE(review): both directory names end in "_agg", yet the rows written are
# the non-aggregated df_clean stream -- confirm the naming is intentional.
output_path = "/home/jovyan/data/used_cars_stream_agg"
checkpoint_path = "/home/jovyan/checkpoint/used_cars_stream_agg"

query_parquet = (
    df_clean.writeStream
    .format("parquet")
    .outputMode("append")
    .options(path=output_path, checkpointLocation=checkpoint_path)
    .start()
)


In [None]:
# Batch-read the Parquet output directory and print the first 20 rows
# without truncating column values.
stream_result = spark.read.format("parquet").load("/home/jovyan/data/used_cars_stream_agg")
stream_result.show(n=20, truncate=False)


In [None]:
# Tear-down: stop every streaming query still active on this session, then
# stop the session itself. Asking the session for its active queries avoids
# depending on which earlier cells were run (and which variable names exist),
# and covers every query the session reports as active.
try:
    active_queries = spark.streams.active
except Exception as exc:
    # Best effort: e.g. the session may already have been stopped.
    print(f"Could not list active streaming queries: {exc}")
    active_queries = []

for active_query in active_queries:
    try:
        active_query.stop()
    except Exception as exc:
        # Report and continue so one failing query does not block shutdown.
        print(f"Could not stop streaming query {active_query.id}: {exc}")

spark.stop()
