In [0]:
%scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.ml.recommendation.ALSModel

// Schema of a single impression event in the incoming JSON.
val schema = StructType(Array(
  StructField("user_id", IntegerType),
  StructField("user_name", StringType),
  StructField("track_id", IntegerType),
  StructField("like", IntegerType),
  StructField("timestamp", DoubleType)
))

// Fetch AWS credentials from the Databricks secret scope "aws" instead of hard-coding them.
val aws_access_key = dbutils.secrets.get(scope="aws", key="aws-access-key-id")
val aws_secret_key = dbutils.secrets.get(scope="aws", key="aws-secret-access-key")

// Configure the S3A filesystem used by the s3a:// paths in this notebook.
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", aws_access_key)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", aws_secret_key)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")

// Load the pre-trained ALS model.
val alsModel = ALSModel.load("s3a://souvik-dev-stage/ALSModel")

// Stream raw impression files from S3. With wholeText=true each file becomes a single row,
// with the whole file contents in the `value` column.
// `checkpointLocation` is deliberately not set here: it is a sink (writeStream) option and
// has no effect on a reader. Each writeStream query configures its own checkpoint.
val rawImpressions = spark.readStream
  .format("text")
  .option("wholeText", "true")
  .load("s3a://souvik-dev-stage/music-user-impressions/2025/05/08/20/")

// Files hold JSON objects written back to back. Split between a closing `}` and the next
// opening `{`, also consuming any whitespace/newlines between the two objects.
// NOTE(review): this is a purely textual split — a string field containing "}{" would be
// split as well.
val splitJsons = rawImpressions
  .select(explode(split(col("value"), """(?<=\})\s*(?=\{)""")).alias("json_str"))
  // Drop blank fragments (e.g. from an empty file); they would otherwise parse to an
  // all-null row and flow into the downstream sinks.
  .filter(trim(col("json_str")) =!= "")

// Parse each JSON object into typed columns and derive a calendar date from the epoch timestamp.
// NOTE(review): from_unixtime interprets its input as seconds; if `timestamp` is in
// milliseconds the derived event_date will be wrong — confirm against the producer.
val parsedImpressions = splitJsons
  .select(from_json(col("json_str"), schema).alias("data"))
  .select("data.*")
  .withColumn("event_date", to_date(from_unixtime(col("timestamp"))))


In [0]:
%scala
import org.apache.spark.ml.recommendation.ALSModel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.streaming.Trigger

// Generate top-5 collaborative-filtering recommendations for every user seen in the
// impression stream and write them to S3 as Parquet, partitioned by event date.
// Uses `alsModel` and `parsedImpressions` defined in the ingestion cell; the model is not
// reloaded here.

// Distinct (user, day) pairs observed in the stream.
// NOTE(review): streaming deduplication without a watermark keeps its state in the
// checkpoint indefinitely — consider withWatermark + dropDuplicates if volume grows.
val users = parsedImpressions.select("user_id", "event_date").distinct()

// Top-5 recommendations for every user known to the model (a static, batch DataFrame).
val recommendations = alsModel.recommendForAllUsers(5)

// One row per (user, recommended track).
// Assumes the model was trained with userCol="user_id" and itemCol="track_id"; the output
// column and struct field names follow those settings — TODO confirm against training.
val flattened = recommendations
  .withColumn("rec", explode(col("recommendations")))
  .select(
    col("user_id"),
    col("rec.track_id").alias("track_id"),
    col("rec.rating").alias("rating")
  )

// Rank each user's recommendations by predicted rating, highest first (rank 1 = best).
val windowSpec = Window.partitionBy("user_id").orderBy(col("rating").desc)
val ranked = flattened.withColumn("rank", row_number().over(windowSpec))

// Attach each streamed user's event_date to that user's ranked recommendations.
// This must be an inner (stream-static) join: a left-semi join returns only the left side's
// columns (user_id, event_date), so track_id/rating/rank in the select would not resolve.
val enriched = users
  .join(ranked, Seq("user_id"), "inner")
  .select("user_id", "track_id", "rating", "rank", "event_date")

// Process the currently available input once, append Parquet partitioned by event_date,
// then block until the query stops.
// NOTE(review): Trigger.Once is deprecated since Spark 3.4 in favour of Trigger.AvailableNow.
enriched.writeStream
  .format("parquet")
  .outputMode("append")
  .partitionBy("event_date")
  .option("path", "s3a://souvik-dev-stage/user_recommendations/collaborative/")
  .option("checkpointLocation", "s3a://souvik-dev-stage/checkpoint_recommendation/collaborative/")
  .trigger(Trigger.Once())
  .start()
  .awaitTermination()


In [0]:
%scala
// Persist the raw (user, track, like) interactions from the impression stream as CSV.
// Presumably consumed later to retrain the ALS model — verify against the training job.
val new_data = parsedImpressions.select("user_id", "track_id", "like")

// Sink settings: output directory, query checkpoint, and a header row in each CSV file.
val newDataSinkOptions = Map(
  "path"               -> "s3a://souvik-dev-stage/new_user_data/",
  "checkpointLocation" -> "s3a://souvik-dev-stage/checkpoint_new_data/",
  "header"             -> "true"
)

val newDataQuery = new_data.writeStream
  .format("csv")
  .outputMode("append")
  .options(newDataSinkOptions)
  .trigger(Trigger.Once())
  .start()

// Block this cell until the single triggered batch has been written.
newDataQuery.awaitTermination()

In [0]:
# out = spark.read.parquet('s3a://souvik-dev-stage/user_recommendations/')
# out.display()

## Content based recommendation