In [1]:
# Notebook: 09_Streaming_Prediction_Service.ipynb

import os

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml import PipelineModel
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, BooleanType

# --- 1. Configure and Start Spark Session ---
# Connection settings are read from the environment so credentials are not
# hardcoded in the notebook. The fallbacks are the values this notebook used
# before (MinIO's out-of-the-box `minioadmin` login), so behavior is unchanged
# when no variables are set.
SPARK_MASTER_URL = os.environ.get("SPARK_MASTER_URL", "spark://spark-master:7077")
S3_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
S3_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
S3_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")

spark = (
    SparkSession.builder
    .appName("F1 Streaming Prediction Service")
    .master(SPARK_MASTER_URL)
    # S3A filesystem pointed at the MinIO endpoint; path-style access is
    # needed because bucket names are not resolvable as DNS sub-domains there.
    .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262")
    # NOTE: redundant for the stream below, which supplies an explicit schema;
    # kept so other file streams in this session can still infer one.
    .config("spark.sql.streaming.schemaInference", "true")
    .getOrCreate()
)

print("Spark session created!")

# --- 2. Load the Saved ML Model ---
# Restore the PipelineModel persisted by the previous (training) notebook.
# `model` stays a module-level name because process_batch reads it per batch.
model_path = "s3a://processed-data/models/f1_laptime_model"
print("Loading our trained model from " + model_path + "...")

model = PipelineModel.load(model_path)
print("Model loaded successfully.")

# --- 3. Define the Stream Input Schema ---
# File-based streaming sources need their schema declared up front (unless
# schema inference is switched on), and it MUST match the Parquet files that
# are dropped into the input directory. Only the columns the model uses plus
# the driver code (for readable output) are declared; every field is nullable.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("Driver", StringType()),
        ("LapTime", DoubleType()),
        ("LapNumber", DoubleType()),
        ("TyreLife", DoubleType()),
        ("Compound", StringType()),
        ("IsAccurate", BooleanType()),
    )
])

# --- 4. Set Up the "Real-Time" Stream ---
# Watch an S3A (MinIO) prefix: each new Parquet file that lands there is new input.
input_dir = "s3a://raw-data/streaming-input/"
print(f"Setting up stream to watch directory: {input_dir}")

# readStream is lazy: nothing is read until a query is started on raw_stream_df.
# maxFilesPerTrigger=1 limits every micro-batch to a single newly discovered file.
raw_stream_df = (
    spark.readStream
    .schema(schema)
    .option("maxFilesPerTrigger", 1)
    .parquet(input_dir)
)

print("Stream is set up. Awaiting data...")

# --- 5. Define the Processing Logic ---
# This is what happens when a new file is detected
def process_batch(batch_df, batch_id):
    """Clean one micro-batch, score it with the loaded model and print the results.

    Registered with ``writeStream.foreachBatch`` and invoked once per micro-batch.

    Args:
        batch_df: DataFrame with the rows of this micro-batch (read with ``schema``).
        batch_id: Id Spark assigns to this micro-batch; only used for logging.

    Returns:
        None. Results are printed via ``DataFrame.show()``.
    """
    print(f"\n--- New Data Detected! (Batch ID: {batch_id}) ---")

    # 1. Clean the new data: keep laps flagged accurate with a known compound.
    # na.drop() with no arguments removes rows with a null in ANY column (this
    # already covers LapTime); the explicit isNotNull() just documents intent.
    # A null IsAccurate evaluates to null in the predicate and is filtered out.
    clean_df = batch_df.filter(
        F.col('IsAccurate') &
        (F.col('Compound') != 'UNKNOWN') &
        F.col('LapTime').isNotNull()
    ).na.drop()

    # Every action re-executes the plan from the source files. Cache the cleaned
    # batch so counting and scoring it below read the input only once.
    clean_df.persist()
    try:
        n_laps = clean_df.count()
        if n_laps == 0:
            print("No valid data in this batch. Awaiting next file.")
            return

        print(f"Processing {n_laps} new laps...")

        # 2. Apply our loaded ML model (module-level `model`) to make predictions
        predictions = model.transform(clean_df)

        # 3. Show the results (show() prints at most 20 rows by default)
        print("Model Predictions:")
        predictions.select(
            "Driver",
            "Compound",
            "TyreLife",
            "LapTime",  # The actual, real lap time
            F.round("prediction", 3).alias("PredictedLapTime"),  # Our model's guess
        ).show()
    finally:
        # Release the cache so cached batches do not pile up across triggers.
        clean_df.unpersist()

# --- 6. Start the Stream ---
# start() returns immediately: the query keeps running in the background and
# hands each micro-batch to process_batch, whose prints show up asynchronously
# until the query is stopped (e.g. query.stop()).
# Re-running this cell would otherwise leave the previous query alive next to
# the new one, printing duplicate predictions for every file, so any active
# query started under the same name is stopped first.
STREAM_QUERY_NAME = "f1_laptime_predictions"
for active_query in spark.streams.active:
    if active_query.name == STREAM_QUERY_NAME:
        active_query.stop()

query = (
    raw_stream_df.writeStream
    .queryName(STREAM_QUERY_NAME)
    .foreachBatch(process_batch)
    .start()
)

print("--- Streaming Service is LIVE ---")
print("Now, go to Notebook 10 and run it to trigger the stream!")

Spark session created!
Loading our trained model from s3a://processed-data/models/f1_laptime_model...
Model loaded successfully.
Setting up stream to watch directory: s3a://raw-data/streaming-input/
Stream is set up. Awaiting data...
--- Streaming Service is LIVE ---
Now, go to Notebook 10 and run it to trigger the stream!

--- New Data Detected! (Batch ID: 0) ---
Processing 3 new laps...
Model Predictions:
+------+--------+--------+-------+----------------+
|Driver|Compound|TyreLife|LapTime|PredictedLapTime|
+------+--------+--------+-------+----------------+
|   VER|  MEDIUM|     2.0| 85.744|          92.129|
|   VER|  MEDIUM|     3.0| 83.275|          91.977|
|   VER|  MEDIUM|     4.0|   83.4|          91.825|
+------+--------+--------+-------+----------------+

