In [2]:
pip install numpy pymongo

[0mNote: you may need to restart the kernel to use updated packages.


In [34]:
#imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, TimestampType
import sys
import time
import signal
from pyspark.ml.recommendation import ALSModel

In [35]:
# Initialize Spark Session with proper configurations
spark = SparkSession.builder \
    .appName("DockerSparkToLocalMongo") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,"
            "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .config("spark.mongodb.output.uri", "mongodb://host.docker.internal:27017/movie_lens.recommendations") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint") \
    .getOrCreate()

In [36]:
# Load the pre-trained ALS model
model_path = "hdfs://namenode:9000/movie-lens/models/als_model"
try:
    model = ALSModel.load(model_path)
    print("Successfully loaded ALS model")
except Exception as e:
    print(f"Failed to load ALS model: {str(e)}", file=sys.stderr)
    spark.stop()
    sys.exit(1)

# Define schema for incoming Kafka messages
rating_schema = StructType([
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", FloatType()),
    StructField("timestamp", TimestampType())
])


Successfully loaded ALS model


In [37]:
def generate_recommendations(user_ids_df):
    # Get distinct user IDs (assumes small batch, fast op)
    distinct_users = user_ids_df.select("userId").distinct()

    # Generate top 5 recommendations
    recommendations = model.recommendForUserSubset(distinct_users, 5)

    # Explode array of recommendations into rows
    exploded_recs = recommendations.select(
        "userId",
        F.explode("recommendations").alias("recommendation")
    ).select(
        "userId",
        F.col("recommendation.movieId").alias("movieId"),
        F.col("recommendation.rating").alias("predictedRating")
    )

    return exploded_recs

In [38]:
def process_batch(batch_df, batch_id):
    if not batch_df.isEmpty():
        try:
            # Generate recommendations for users in this batch
            recommendations_df = generate_recommendations(batch_df)

            # Add metadata columns
            result_df = recommendations_df.withColumn("processing_time", F.current_timestamp()) \
                                          .withColumn("batch_id", F.lit(batch_id))

            # Write to MongoDB
            (result_df.write
                .format("mongo")
                .mode("append")
                .option("database", "movie_lens")
                .option("collection", "recommendations")
                .save())

            print(f"Batch {batch_id} processed: {result_df.count()} recommendations saved.")
        
        except Exception as e:
            print(f"Error processing batch {batch_id}: {str(e)}", file=sys.stderr)


In [39]:
# Create Kafka source stream
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "namenode:9092") \
    .option("subscribe", "movie_rating") \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("maxOffsetsPerTrigger", "1000") \
    .load()

In [40]:
# Parse the JSON data from Kafka
processed_df = df.selectExpr("CAST(value AS STRING)") \
                .select(F.from_json(F.col("value"), rating_schema).alias("data")) \
                .select("data.*")

# Start the streaming query
query = processed_df.writeStream \
    .foreachBatch(process_batch) \
    .outputMode("update") \
    .option("checkpointLocation", "/tmp/checkpoint_movies") \
    .trigger(processingTime='10 seconds') \
    .start()

25/05/02 13:26:00 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/05/02 13:26:00 WARN StreamingQueryManager: Stopping existing streaming query [id=50a8cf76-ef4a-4ccc-ad2a-4b81e576d59c, runId=633ab1f8-bd4e-42d8-9bfd-455b02d85e95], as a new run is being started.
25/05/02 13:26:00 WARN OffsetSeqMetadata: Updating the value of conf 'spark.sql.shuffle.partitions' in current session from '200' to '4'.
25/05/02 13:26:00 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


In [41]:
# Graceful shutdown handler
def handle_shutdown(signum, frame):
    print("\nShutting down gracefully...")
    query.stop()
    spark.stop()
    sys.exit(0)

signal.signal(signal.SIGINT, handle_shutdown)
signal.signal(signal.SIGTERM, handle_shutdown)

<function __main__.handle_shutdown(signum, frame)>

In [42]:
#Monitoring loop
try:
    while query.isActive:
        progress = query.lastProgress
        if progress:
            print(f"Batch ID: {progress['batchId']}, "
                  f"Input rows: {progress['numInputRows']}, "
                  f"Processed: {progress['processedRowsPerSecond']:.1f} rows/sec")
        time.sleep(5)
except Exception as e:
    print(f"Streaming query failed: {str(e)}", file=sys.stderr)
    handle_shutdown(None, None)

Batch ID: 118, Input rows: 0, Processed: 0.0 rows/sec
Batch ID: 118, Input rows: 0, Processed: 0.0 rows/sec
Batch ID: 118, Input rows: 3, Processed: 1.3 rows/sec
Batch ID: 118, Input rows: 3, Processed: 1.3 rows/sec
Batch ID: 119, Input rows: 11, Processed: 5.5 rows/sec
Batch ID: 119, Input rows: 11, Processed: 5.5 rows/sec
Batch ID: 120, Input rows: 11, Processed: 4.0 rows/sec
Batch ID: 120, Input rows: 11, Processed: 4.0 rows/sec
Batch ID: 121, Input rows: 11, Processed: 3.0 rows/sec
Batch ID: 121, Input rows: 11, Processed: 3.0 rows/sec
Batch ID: 122, Input rows: 11, Processed: 4.4 rows/sec
Batch ID: 122, Input rows: 11, Processed: 4.4 rows/sec
Batch ID: 123, Input rows: 11, Processed: 4.7 rows/sec
Batch ID: 123, Input rows: 11, Processed: 4.7 rows/sec


                                                                                

Batch ID: 124, Input rows: 11, Processed: 4.7 rows/sec
Batch ID: 124, Input rows: 11, Processed: 4.7 rows/sec
Batch ID: 125, Input rows: 9, Processed: 5.0 rows/sec
Batch ID: 125, Input rows: 9, Processed: 5.0 rows/sec
Batch ID: 126, Input rows: 11, Processed: 5.0 rows/sec
Batch ID: 126, Input rows: 11, Processed: 5.0 rows/sec
Batch ID: 127, Input rows: 11, Processed: 4.0 rows/sec
Batch ID: 127, Input rows: 11, Processed: 4.0 rows/sec


                                                                                

Batch ID: 128, Input rows: 11, Processed: 4.8 rows/sec
Batch ID: 128, Input rows: 11, Processed: 4.8 rows/sec


                                                                                

Batch ID: 129, Input rows: 11, Processed: 3.3 rows/sec
Batch ID: 129, Input rows: 11, Processed: 3.3 rows/sec
Batch ID: 130, Input rows: 11, Processed: 4.4 rows/sec
Batch ID: 130, Input rows: 11, Processed: 4.4 rows/sec
Batch ID: 131, Input rows: 11, Processed: 3.1 rows/sec
Batch ID: 131, Input rows: 11, Processed: 3.1 rows/sec


                                                                                

Batch ID: 132, Input rows: 11, Processed: 3.1 rows/sec
Batch ID: 132, Input rows: 11, Processed: 3.1 rows/sec
Batch ID: 133, Input rows: 11, Processed: 4.9 rows/sec
Batch ID: 133, Input rows: 11, Processed: 4.9 rows/sec


                                                                                

Batch ID: 134, Input rows: 11, Processed: 2.8 rows/sec
Batch ID: 134, Input rows: 11, Processed: 2.8 rows/sec


                                                                                

Batch ID: 134, Input rows: 11, Processed: 2.8 rows/sec
Batch ID: 135, Input rows: 11, Processed: 1.9 rows/sec


                                                                                

Batch ID: 136, Input rows: 11, Processed: 2.7 rows/sec
Batch ID: 136, Input rows: 11, Processed: 2.7 rows/sec


                                                                                

Batch ID: 137, Input rows: 11, Processed: 3.8 rows/sec
Batch ID: 137, Input rows: 11, Processed: 3.8 rows/sec


                                                                                

Batch ID: 138, Input rows: 11, Processed: 3.5 rows/sec
Batch ID: 138, Input rows: 11, Processed: 3.5 rows/sec


                                                                                

Batch ID: 139, Input rows: 9, Processed: 3.4 rows/sec
Batch ID: 139, Input rows: 9, Processed: 3.4 rows/sec


                                                                                

Batch ID: 140, Input rows: 11, Processed: 3.9 rows/sec
Batch ID: 140, Input rows: 11, Processed: 3.9 rows/sec


                                                                                

Batch ID: 141, Input rows: 11, Processed: 2.8 rows/sec
Batch ID: 141, Input rows: 11, Processed: 2.8 rows/sec
Batch ID: 142, Input rows: 11, Processed: 5.5 rows/sec
Batch ID: 142, Input rows: 11, Processed: 5.5 rows/sec


                                                                                

Batch ID: 142, Input rows: 11, Processed: 5.5 rows/sec
Batch ID: 143, Input rows: 11, Processed: 2.0 rows/sec
Batch ID: 144, Input rows: 11, Processed: 3.0 rows/sec
Batch ID: 144, Input rows: 11, Processed: 3.0 rows/sec


                                                                                

Batch ID: 145, Input rows: 11, Processed: 4.4 rows/sec
Batch ID: 145, Input rows: 11, Processed: 4.4 rows/sec
Batch ID: 146, Input rows: 11, Processed: 3.5 rows/sec
Batch ID: 146, Input rows: 11, Processed: 3.5 rows/sec
Batch ID: 147, Input rows: 11, Processed: 4.0 rows/sec
Batch ID: 147, Input rows: 11, Processed: 4.0 rows/sec
Batch ID: 148, Input rows: 11, Processed: 3.1 rows/sec
Batch ID: 148, Input rows: 11, Processed: 3.1 rows/sec


                                                                                

Batch ID: 149, Input rows: 11, Processed: 2.9 rows/sec
Batch ID: 149, Input rows: 11, Processed: 2.9 rows/sec


                                                                                

Batch ID: 150, Input rows: 9, Processed: 3.2 rows/sec
Batch ID: 150, Input rows: 9, Processed: 3.2 rows/sec


                                                                                

Batch ID: 151, Input rows: 11, Processed: 3.4 rows/sec
Batch ID: 151, Input rows: 11, Processed: 3.4 rows/sec


                                                                                

Batch ID: 152, Input rows: 11, Processed: 3.7 rows/sec
Batch ID: 152, Input rows: 11, Processed: 3.7 rows/sec


                                                                                

Batch ID: 153, Input rows: 11, Processed: 3.6 rows/sec
Batch ID: 153, Input rows: 11, Processed: 3.6 rows/sec


                                                                                

Batch ID: 154, Input rows: 11, Processed: 5.5 rows/sec
Batch ID: 154, Input rows: 11, Processed: 5.5 rows/sec


                                                                                

Batch ID: 155, Input rows: 11, Processed: 2.4 rows/sec
Batch ID: 155, Input rows: 11, Processed: 2.4 rows/sec


                                                                                

Batch ID: 156, Input rows: 11, Processed: 4.5 rows/sec
Batch ID: 156, Input rows: 11, Processed: 4.5 rows/sec
Batch ID: 157, Input rows: 11, Processed: 4.4 rows/sec
Batch ID: 157, Input rows: 11, Processed: 4.4 rows/sec
Batch ID: 158, Input rows: 11, Processed: 4.4 rows/sec
Batch ID: 158, Input rows: 11, Processed: 4.4 rows/sec
Batch ID: 159, Input rows: 9, Processed: 3.2 rows/sec
Batch ID: 159, Input rows: 9, Processed: 3.2 rows/sec
Batch ID: 160, Input rows: 11, Processed: 3.1 rows/sec
Batch ID: 160, Input rows: 11, Processed: 3.1 rows/sec


                                                                                

Batch ID: 161, Input rows: 11, Processed: 3.9 rows/sec
Batch ID: 161, Input rows: 11, Processed: 3.9 rows/sec


                                                                                

Batch ID: 162, Input rows: 11, Processed: 3.4 rows/sec
Batch ID: 162, Input rows: 11, Processed: 3.4 rows/sec


                                                                                

Batch ID: 163, Input rows: 11, Processed: 3.1 rows/sec
Batch ID: 163, Input rows: 11, Processed: 3.1 rows/sec
Batch ID: 164, Input rows: 11, Processed: 4.2 rows/sec
Batch ID: 164, Input rows: 11, Processed: 4.2 rows/sec


                                                                                

Batch ID: 165, Input rows: 11, Processed: 2.5 rows/sec
Batch ID: 165, Input rows: 11, Processed: 2.5 rows/sec


                                                                                

Batch ID: 166, Input rows: 11, Processed: 2.8 rows/sec
Batch ID: 166, Input rows: 11, Processed: 2.8 rows/sec


                                                                                

Batch ID: 167, Input rows: 11, Processed: 3.3 rows/sec
Batch ID: 167, Input rows: 11, Processed: 3.3 rows/sec


                                                                                

Batch ID: 168, Input rows: 11, Processed: 2.6 rows/sec
Batch ID: 168, Input rows: 11, Processed: 2.6 rows/sec


                                                                                

Batch ID: 169, Input rows: 9, Processed: 2.2 rows/sec
Batch ID: 169, Input rows: 9, Processed: 2.2 rows/sec


                                                                                

KeyboardInterrupt: 