In [1]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
import org.apache.spark.sql.Row



val spark = SparkSession.builder
  .appName("Genre-Specific Data Aggregation Pipeline")
  .master("yarn")
  .getOrCreate()

val sc = spark.sparkContext
val movies = "gs://srinija/archive/movie.csv"
val ratings = "gs://srinija/archive/rating.csv"

val moviesDf = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(movies)

val ratingsDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(ratings)

moviesDf.printSchema()
ratingsDF.printSchema()


root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



spark = org.apache.spark.sql.SparkSession@74a7262d
sc = org.apache.spark.SparkContext@22ac6f57
movies = gs://srinija/archive/movie.csv
ratings = gs://srinija/archive/rating.csv
moviesDf = [movieId: int, title: string ... 1 more field]
ratingsDF = [userId: int, movieId: int ... 2 more fields]


[userId: int, movieId: int ... 2 more fields]

In [2]:
val normalizeGenre = udf((genre: String) => genre match {
  case "Sci-Fi" => "Science Fiction"
  case "Film-Noir" => "Film Noir"
  case other => other
})


spark.udf.register("normalizeGenre", normalizeGenre)

normalizeGenre = SparkUserDefinedFunction($Lambda$5339/0x0000000801fa2840@75e221b6,StringType,List(Some(class[value[0]: string])),Some(class[value[0]: string]),None,true,true)


SparkUserDefinedFunction($Lambda$5339/0x0000000801fa2840@75e221b6,StringType,List(Some(class[value[0]: string])),Some(class[value[0]: string]),Some(normalizeGenre),true,true)

In [3]:
val splitMoviesDF = moviesDf.withColumn("split_genres", split(col("genres"), "\\|"))
val normalizedMoviesDF = splitMoviesDF.withColumn(
  "normalized_genres",
  expr("transform(split_genres, g -> normalizeGenre(g))")
)


splitMoviesDF = [movieId: int, title: string ... 2 more fields]
normalizedMoviesDF = [movieId: int, title: string ... 3 more fields]


[movieId: int, title: string ... 3 more fields]

In [4]:
val maxGenres = 5
val genreColumns = (0 until maxGenres).map(i => col("normalized_genres").getItem(i).alias(s"genre_$i"))
val moviesWithGenresDF = normalizedMoviesDF.select((Seq(col("movieId"), col("title")) ++ genreColumns): _*)

val joinedDF = moviesWithGenresDF.join(ratingsDF, "movieId")

val genresRatingsRDD = joinedDF.rdd.flatMap(row => {
  val rating = row.getAs[Double]("rating")
  (0 until maxGenres).flatMap(i => Option(row.getAs[String](s"genre_$i")).map(genre => (genre, rating)))
})

maxGenres = 5
genreColumns = Vector(normalized_genres[0] AS genre_0, normalized_genres[1] AS genre_1, normalized_genres[2] AS genre_2, normalized_genres[3] AS genre_3, normalized_genres[4] AS genre_4)
moviesWithGenresDF = [movieId: int, title: string ... 5 more fields]
joinedDF = [movieId: int, title: string ... 8 more fields]
genresRatingsRDD = MapPartitionsRDD[32] at flatMap at <console>:37


MapPartitionsRDD[32] at flatMap at <console>:37

In [5]:
val genreRatingsCount = genresRatingsRDD.mapValues(rating => (rating, 1))
val genreRatingsSum = genreRatingsCount.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
val genreAverageRatings = genreRatingsSum.mapValues { case (sum, count) => sum / count }

val genreRatingsSchema = StructType(Seq(
  StructField("genre", StringType, nullable = false),
  StructField("average_rating", DoubleType, nullable = false)
))

val genreRatingsDF = spark.createDataFrame(
  genreAverageRatings.map { case (genre, avgRating) => Row(genre, avgRating) },
  genreRatingsSchema
)


genreRatingsCount = MapPartitionsRDD[33] at mapValues at <console>:31
genreRatingsSum = ShuffledRDD[34] at reduceByKey at <console>:32
genreAverageRatings = MapPartitionsRDD[35] at mapValues at <console>:33
genreRatingsSchema = StructType(StructField(genre,StringType,false),StructField(average_rating,DoubleType,false))
genreRatingsDF = [genre: string, average_rating: double]


[genre: string, average_rating: double]

In [6]:
val tempLocalPath = "/tmp/average_genre_ratings_preview"

genreRatingsDF.write
  .format("parquet")
  .mode("overwrite")
  .save(tempLocalPath)

val previewDF = spark.read.parquet(tempLocalPath)

tempLocalPath = /tmp/average_genre_ratings_preview
previewDF = [genre: string, average_rating: double]


[genre: string, average_rating: double]

In [7]:
println("Preview of output in Parquet format:")
previewDF.show(truncate = false)


Preview of output in Parquet format:
+------------------+------------------+
|genre             |average_rating    |
+------------------+------------------+
|Western           |3.570915695918952 |
|Musical           |3.557119424554034 |
|Horror            |3.277253954753292 |
|Thriller          |3.5070349367995215|
|Adventure         |3.5018926565473865|
|Documentary       |3.7397176834178865|
|(no genres listed)|3.0069252077562325|
|Drama             |3.6742955093068264|
|War               |3.80934610398963  |
|Science Fiction   |3.441057955871034 |
|Crime             |3.6745276025631113|
|Animation         |3.6174939235897994|
|Fantasy           |3.50568599276768  |
|Film Noir         |3.9660770776087007|
|Action            |3.44386376493354  |
|Children          |3.4081137685270444|
|Mystery           |3.6654721937927395|
|Romance           |3.540029584886227 |
|IMAX              |3.617922713208124 |
|Comedy            |3.4260113054324886|
+------------------+------------------+



In [8]:

val outputPath = "hdfs:///user/day_16_17/case_study_1/"

genreRatingsDF.write
  .format("parquet")
  .mode("overwrite")
  .save(outputPath)

println(s"Aggregated genre ratings saved to: $outputPath")

spark.stop()


Aggregated genre ratings saved to: hdfs:///user/day_16_17/case_study_1/


outputPath = hdfs:///user/day_16_17/case_study_1/


hdfs:///user/day_16_17/case_study_1/