In [6]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.rdd.RDD

// Initialize the Spark session
val spark = SparkSession.builder()
  .appName("Genre-Specific Data Aggregation Pipeline")
  .getOrCreate()


val ratingsPath = "gs://spark_learning_1/notebooks/ratings.csv"  // Ratings data
val moviesPath = "gs://spark_learning_1/notebooks/movies.csv"    // Movies data (title and genres)


// Cast the 'rating' column to Double to avoid ClassCastException
val ratingsDF = spark.read.option("header", "true").csv(ratingsPath)
  .withColumn("rating", col("rating").cast("double"))  // Cast 'rating' to Double

// Load the movie details data (movieId, title, genres)
val moviesDF = spark.read.option("header", "true").csv(moviesPath)


val explodedGenresDF = moviesDF
  .withColumn("genre", explode(split(col("genres"), "\\|")))  // Split and explode the genres

// Show the exploded DataFrame
explodedGenresDF.show(5)

// Step 2: Custom Transformation to handle inconsistent genre names
val genreMapping = Map(
  "Sci-Fi" -> "Science Fiction",
  "Action|Adventure" -> "Action",
  "Romance|Drama" -> "Drama"  
)

// Function to map genres to their standardized names
val standardizeGenre = udf((genre: String) => {
  genreMapping.getOrElse(genre, genre)  // Return the mapped value if exists, else return the genre as is
})

// Apply the transformation to standardize genres
val standardizedGenresDF = explodedGenresDF
  .withColumn("standardized_genre", standardizeGenre(col("genre")))

// Show the standardized genres DataFrame
standardizedGenresDF.show(5)

// Step 3: Join the ratings data with the standardized genres DataFrame
val joinedDF = ratingsDF
  .join(standardizedGenresDF, "movieId")  // Join by movieId
  .select("userId", "rating", "standardized_genre")  // Select relevant columns

joinedDF.show(5)

// Step 4: Convert to RDD and calculate the average rating by genre using reduceByKey
// Map each row to (genre, (rating, count)) for aggregation
val ratingsRDD: RDD[(String, (Double, Int))] = joinedDF.rdd.map(row => {
  val genre = row.getAs[String]("standardized_genre")
  val rating = row.getAs[Double]("rating")  // Ensure 'rating' is cast to Double
  (genre, (rating, 1))  // (genre, (rating, count))
})

// Perform aggregation by genre using reduceByKey
val aggregatedRDD = ratingsRDD.reduceByKey((a, b) => {
  val totalRating = a._1 + b._1
  val count = a._2 + b._2
  (totalRating, count)
})

// Calculate the average rating for each genre
val avgRatingsRDD = aggregatedRDD.mapValues { case (totalRating, count) =>
  totalRating / count  // Calculate average
}

avgRatingsRDD.collect().foreach(println)

// Step 5: Convert the RDD back to DataFrame
val avgRatingsDF = avgRatingsRDD.toDF("genre", "avg_rating")

avgRatingsDF.show()

// Step 6: Save the aggregated results in Parquet format
val outputParquetPath = "gs://spark_learning_1/notebooks/aggregated_movie_ratings_by_genre/"
avgRatingsDF.write.parquet(outputParquetPath)

println(s"Aggregated results saved to Parquet at: $outputParquetPath")

// Stop the Spark session when done
spark.stop()


+-------+----------------+--------------------+---------+
|movieId|           title|              genres|    genre|
+-------+----------------+--------------------+---------+
|      1|Toy Story (1995)|Adventure|Animati...|Adventure|
|      1|Toy Story (1995)|Adventure|Animati...|Animation|
|      1|Toy Story (1995)|Adventure|Animati...| Children|
|      1|Toy Story (1995)|Adventure|Animati...|   Comedy|
|      1|Toy Story (1995)|Adventure|Animati...|  Fantasy|
+-------+----------------+--------------------+---------+
only showing top 5 rows

+-------+----------------+--------------------+---------+------------------+
|movieId|           title|              genres|    genre|standardized_genre|
+-------+----------------+--------------------+---------+------------------+
|      1|Toy Story (1995)|Adventure|Animati...|Adventure|         Adventure|
|      1|Toy Story (1995)|Adventure|Animati...|Animation|         Animation|
|      1|Toy Story (1995)|Adventure|Animati...| Children|          C

spark = org.apache.spark.sql.SparkSession@49600373
ratingsPath = gs://spark_learning_1/notebooks/ratings.csv
moviesPath = gs://spark_learning_1/notebooks/movies.csv
ratingsDF = [userId: string, movieId: string ... 2 more fields]
moviesDF = [movieId: string, title: string ... 1 more field]
explodedGenresDF = [movieId: string, title: string ... 2 more fields]
genreMapping = Map(Sci-Fi -> Science Fiction, Action|Adventure -> Action, Romance|Drama -> Drama)


standardizeGenre: org.apache.spa...


Map(Sci-Fi -> Science Fiction, Action|Adventure -> Action, Romance|Drama -> Drama)