In [1]:
// Import statements
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row

In [2]:
// Create SparkSession
val spark = SparkSession.builder()
      .appName("CaseStudy1 - Genre-Specific Data Aggregation Pipeline")
      .getOrCreate()

spark = org.apache.spark.sql.SparkSession@75f04fe1


org.apache.spark.sql.SparkSession@75f04fe1

In [3]:
// Read movies data from GCP bucket

val moviesDataPath = "gs://spark-tasks-bucket/day_16_17/movie.csv"
val moviesDF = spark.read.option("header", "true").option("inferSchema", "true").csv(moviesDataPath)

moviesDataPath = gs://spark-tasks-bucket/day_16_17/movie.csv
moviesDF = [movieId: int, title: string ... 1 more field]


[movieId: int, title: string ... 1 more field]

In [4]:
// Read ratings data from GCP bucket

val ratingsDataPath = "gs://spark-tasks-bucket/day_16_17/rating.csv"
val ratingsDF = spark.read.option("header", "true").option("inferSchema", "true").csv(ratingsDataPath)

[userId: int, movieId: int ... 2 more fields]

ratingsDataPath = gs://spark-tasks-bucket/day_16_17/rating.csv
ratingsDF = [userId: int, movieId: int ... 2 more fields]


In [5]:
val movieGenreDF = moviesDF.withColumn("genre", explode(split(col("genres"), "\\|")))
                            .select("movieId", "title", "genre")

movieGenreDF = [movieId: int, title: string ... 1 more field]


[movieId: int, title: string ... 1 more field]

In [6]:
val genres = movieGenreDF.select("genre").distinct().show()

+------------------+
|             genre|
+------------------+
|           Mystery|
|           Musical|
|            Action|
|           Romance|
|          Thriller|
|           Fantasy|
|         Animation|
|         Film-Noir|
|              IMAX|
|            Sci-Fi|
|             Drama|
|       Documentary|
|(no genres listed)|
|           Western|
|            Comedy|
|             Crime|
|               War|
|          Children|
|         Adventure|
|            Horror|
+------------------+



genres = ()


()

In [7]:
// Here the genres [(no genres listed), IMAX] are not the actual genres and Sci-Fi converted to Science Fiction

val updatedMovieGenreDF = movieGenreDF.withColumn("genre", trim($"genre"))
                                .filter(!$"genre".isin("IMAX", "(no genres listed)"))
                                .withColumn(
                                    "genre", 
                                    when($"genre" === "Sci-Fi", "Science Fiction").otherwise($"genre")
                                 ) 

updatedMovieGenreDF = [movieId: int, title: string ... 1 more field]


[movieId: int, title: string ... 1 more field]

In [8]:
val updatedGenres = updatedMovieGenreDF.select("genre").distinct().show()

+---------------+
|          genre|
+---------------+
|        Mystery|
|        Musical|
|         Action|
|        Romance|
|       Thriller|
|        Fantasy|
|      Animation|
|      Film-Noir|
|          Drama|
|    Documentary|
|Science Fiction|
|        Western|
|         Comedy|
|          Crime|
|            War|
|       Children|
|      Adventure|
|         Horror|
+---------------+



updatedGenres = ()


()

In [9]:
val joinedRDD = ratingsDF.join(
                            updatedMovieGenreDF, 
                            ratingsDF("movieId") === updatedMovieGenreDF("movieId"), 
                            "inner")
                         .select("genre", "rating")

joinedRDD = [genre: string, rating: double]


[genre: string, rating: double]

In [10]:
val genreRatingsRDD = joinedRDD.rdd.map { case Row(genre: String, rating: Double) => (genre, (rating, 1)) }

genreRatingsRDD = MapPartitionsRDD[44] at map at <console>:28


MapPartitionsRDD[44] at map at <console>:28

In [11]:
val reducedRDD = genreRatingsRDD.reduceByKey { case ((r1, c1), (r2, c2)) => (r1 + r2, c1 + c2) }

reducedRDD = ShuffledRDD[45] at reduceByKey at <console>:28


ShuffledRDD[45] at reduceByKey at <console>:28

In [12]:
val averageRDD = reducedRDD.mapValues{ case (sum, count) => sum / count }

averageRDD = MapPartitionsRDD[46] at mapValues at <console>:28


MapPartitionsRDD[46] at mapValues at <console>:28

In [13]:
val averageDF = averageRDD.toDF("genre", "average_rating")

averageDF = [genre: string, average_rating: double]


[genre: string, average_rating: double]

In [14]:
averageDF.show(10)

+---------+------------------+
|    genre|    average_rating|
+---------+------------------+
|  Fantasy|3.5059453358738244|
|   Action|  3.44386376493354|
| Children|3.4081137685270444|
|  Mystery| 3.663508921312903|
|  Romance| 3.541802581902903|
|Film-Noir|  3.96538126070082|
|  Western|3.5704980246109406|
|  Musical| 3.558090628821412|
|   Horror|3.2772238097518307|
| Thriller|  3.50711121809216|
+---------+------------------+
only showing top 10 rows



In [15]:
val output_path = "hdfs:///user/day_16_17/case_study_1"
averageDF.write.mode("overwrite").parquet(output_path)

hdfs:///user/day_16_17/case_study_1

output_path = hdfs:///user/day_16_17/case_study_1


In [16]:
// Try reading the data
spark.read.parquet(output_path).show()

+---------------+------------------+
|          genre|    average_rating|
+---------------+------------------+
|        Western|3.5704980246109406|
|        Musical| 3.558090628821412|
|         Horror|3.2772238097518307|
|       Thriller|  3.50711121809216|
|      Adventure|3.5018926565473865|
|       Children|3.4081137685270444|
|        Mystery| 3.663508921312903|
|        Romance| 3.541802581902903|
|      Film-Noir|  3.96538126070082|
|            War|3.8095307347384844|
|Science Fiction|3.4367726714455005|
|          Crime|3.6745276025631113|
|      Animation|3.6174939235897994|
|    Documentary|3.7397176834178865|
|          Drama|3.6742955093068264|
|        Fantasy|3.5059453358738244|
|         Action|  3.44386376493354|
|         Comedy|3.4260113054324886|
+---------------+------------------+



In [17]:
spark.stop()