In [1]:
println("Welcome to the spark application")

Welcome to the spark application


# Case Study 1: Genre-Specific Data Aggregation Pipeline

Objective: Aggregate movie ratings by genre and store the results in Parquet format for analytics.

Scenario: The Movielens dataset is stored in GCP Cloud Storage as CSV files. You need to calculate the average ratings per genre for analytics. Some genre information requires custom transformations due to inconsistent formats.

Steps:

Ingestion: Load the movies.csv and ratings.csv files as DataFrames from GCP Cloud Storage.

movies.csv contains columns: movieId, title, genres. ratings.csv contains columns: userId, movieId, rating, timestamp.

Transformation: Use DataFrames to parse and explode the genres column into individual genre rows (e.g., split Action|Comedy into two rows: Action and Comedy).
Convert to an RDD for custom transformations to handle inconsistent genre names (e.g., mapping Sci-Fi to Science Fiction).
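The explode-and-normalize step can be tried without a cluster. The following is a minimal local model, assuming each movie is a plain `(movieId, title, genres)` tuple rather than a Spark `Row`. `GenreExplode` and its alias table are hypothetical names, and the only alias taken from the notebook is `Sci-Fi`. It also shows why the pipe is escaped: both Spark's `split` and Java's `String.split` treat the delimiter as a regular expression, in which a bare `|` means alternation.

```scala
object GenreExplode {
  // Hypothetical alias table; the notebook itself only maps "Sci-Fi"
  val genreAliases: Map[String, String] = Map("Sci-Fi" -> "Science Fiction")

  // Replace a known alias with its canonical name, leave every other genre unchanged
  def normalize(genre: String): String = genreAliases.getOrElse(genre, genre)

  // One output row per genre, like explode(split(col("genres"), "\\|")).
  // String.split also takes a regex, so the pipe has to be escaped here too.
  def explodeGenres(movies: Seq[(Int, String, String)]): Seq[(Int, String, String)] =
    movies.flatMap { case (movieId, title, genres) =>
      genres.split("\\|").toSeq.map(genre => (movieId, title, normalize(genre)))
    }

  def main(args: Array[String]): Unit =
    explodeGenres(Seq((1, "Example Movie (2000)", "Action|Sci-Fi"))).foreach(println)
}
```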


Aggregation: Perform the join between movies and ratings on movieId using a DataFrame.
Use RDD transformations to calculate the average rating for each genre using a combination of reduceByKey and custom key-value mapping.
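Averaging with `reduceByKey` works because the value carried per key is a `(sum, count)` pair, and adding such pairs is associative, so Spark can combine values inside each partition before the shuffle. A minimal local sketch of the same arithmetic on Scala collections, assuming the input is already a sequence of `(genre, rating)` pairs (`GenreAverage` is a hypothetical name):

```scala
object GenreAverage {
  // Combine two partial (sum, count) results; associative, so it can run per partition first
  def combine(a: (Double, Int), b: (Double, Int)): (Double, Int) = (a._1 + b._1, a._2 + b._2)

  // Local stand-in for: map to (genre, (rating, 1)) -> reduceByKey(combine) -> mapValues(sum / count)
  def averageByGenre(pairs: Seq[(String, Double)]): Map[String, Double] =
    pairs
      .map { case (genre, rating) => (genre, (rating, 1)) }
      .groupBy(_._1)
      .map { case (genre, rows) =>
        val (sum, count) = rows.map(_._2).reduce(combine)
        genre -> sum / count
      }

  def main(args: Array[String]): Unit =
    println(averageByGenre(Seq(("Comedy", 4.0), ("Comedy", 3.0), ("Drama", 5.0))))
}
```

Carrying the counts matters: if one partition held the ratings 4.0, 4.0 and 4.0 (average 4.0) and another held a single 1.0, averaging the two partition averages would give 2.5, while combining the pairs gives (13.0, 4) and the correct 3.25.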

Storage: Convert the RDD back to a DataFrame and save the aggregated results in Parquet format in HDFS.

In [6]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, split}

val basePath   = "gs://scala-spark-temp/movies/"
val movieFile  = "movie.csv"
val ratingFile = "rating.csv"

// Ingestion: read both CSV files from Cloud Storage and let Spark infer the column types
val movieDf  = spark.read.option("header", "true").option("inferSchema", "true").csv(basePath + movieFile)
val ratingDf = spark.read.option("header", "true").option("inferSchema", "true").csv(basePath + ratingFile)

// Transformation: split "Action|Comedy" on the (regex-escaped) pipe and emit one row per genre
val moviesWithGenresDf =
  movieDf.withColumn("genre", explode(split(col("genres"), "\\|"))).select("movieId", "title", "genre")

// Aggregation, DataFrame part: attach each rating to every genre row of its movie
val ratingWithMovies =
  ratingDf
    .join(moviesWithGenresDf, ratingDf("movieId") === moviesWithGenresDf("movieId"), "inner")
    .select(
      ratingDf("userId"),
      ratingDf("movieId"),
      ratingDf("rating"),
      ratingDf("timestamp"),
      moviesWithGenresDf("title"),
      moviesWithGenresDf("genre")
    )

val ratingWithMoviesRdd = ratingWithMovies.rdd

// RDD part: normalize inconsistent genre names and emit (genre, (rating, 1)) pairs
val transformedRdd =
  ratingWithMoviesRdd.map { row =>
    val genre =
      row.getAs[String]("genre") match {
        case "Sci-Fi" => "Science Fiction"
        case other    => other
      }
    (genre, (row.getAs[Double]("rating"), 1))
  }

// Sum ratings and counts per genre, then divide to get the average rating
val reducedRdd =
  transformedRdd
    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
    .mapValues { case (sum, count) => sum / count }

// Storage: convert back to a DataFrame and write Parquet to HDFS
val genreRatingsDf = spark.createDataFrame(reducedRdd).toDF("genre", "average_rating")

genreRatingsDf.write.mode("overwrite").parquet("hdfs://10.128.0.5:8020/user/suraj/rating")

// ratingWithMovies.write.mode("overwrite").parquet("gs://scala-spark-temp/movies/output")

// ratingWithMovies.write.mode("overwrite").parquet("hdfs://10.128.0.5:8020/user/suraj/output")
println("program ended")

program ended


basePath = gs://scala-spark-temp/movies/
movieFile = movie.csv
ratingFile = rating.csv
movieDf = [movieId: int, title: string ... 1 more field]
ratingDf = [userId: int, movieId: int ... 2 more fields]
moviesWithGenresDf = [movieId: int, title: string ... 1 more field]
ratingWithMovies = [userId: int, movieId: int ... 4 more fields]
ratingWithMoviesRdd = MapPartitionsRDD[109] at rdd at <console>:60
transformedRdd = MapPartitionsRDD[110] a...



# Case Study 2: User Rating History Partitioning

Objective: Partition the Movielens dataset by user for faster query processing.

Scenario: Movielens user ratings data (CSV format) needs to be partitioned into separate folders for each user in HDFS.

Steps:

Ingestion: Load the ratings.csv file as a DataFrame from GCP Cloud Storage.

Transformation: Use a DataFrame to filter out invalid or incomplete records.
Convert the DataFrame into an RDD to dynamically create key-value pairs of userId and their corresponding ratings.
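The filter-then-key step can be modelled locally. This is a sketch, assuming a hypothetical `RawRating` record in which `None` stands for a value the DataFrame would hold as null. Matching `Some` on every field plays the role of the chained `isNotNull` checks, and rendering the record as a CSV line mirrors the commented-out `row.mkString(",")` mapping in the notebook.

```scala
object RatingCleaning {
  // Hypothetical record type: None stands for a missing CSV field (a null column in the DataFrame)
  final case class RawRating(
      userId: Option[Int],
      movieId: Option[Int],
      rating: Option[Double],
      timestamp: Option[String])

  // Keep only complete records and key each one by userId, with the record rendered as a CSV line
  def toUserPairs(rows: Seq[RawRating]): Seq[(Int, String)] =
    rows.collect { case RawRating(Some(u), Some(m), Some(r), Some(t)) => (u, s"$u,$m,$r,$t") }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      RawRating(Some(1), Some(2), Some(3.5), Some("2005-04-02 23:53:47")),
      RawRating(Some(1), None, Some(4.0), Some("2005-04-02 23:31:16")))
    toUserPairs(rows).foreach(println)
  }
}
```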

Partitioning: Use RDD transformations like groupByKey to partition ratings data by userId.
Write each user's data to a separate folder in HDFS using the saveAsTextFile method.

Verification: Validate that the HDFS structure follows the format /user-data/{userId}/ratings.csv.
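The grouping and verification steps can be sketched on local collections, assuming `(userId, csvLine)` pairs like those described in the Transformation step and a hypothetical root prefix; `UserPartitionLayout`, `targetPath`, `layout` and `verify` are illustrative names. When checking the notebook's actual output, note that `DataFrameWriter.partitionBy("userId")` creates Hive-style directories named `userId=<value>`, so the on-disk layout differs from the `/user-data/{userId}/ratings.csv` format stated in the steps.

```scala
object UserPartitionLayout {
  // Target path from the Verification step; root is a hypothetical prefix
  def targetPath(root: String, userId: Int): String = s"$root/user-data/$userId/ratings.csv"

  // Local stand-in for groupByKey: all lines of a user end up under that user's path
  def layout(root: String, pairs: Seq[(Int, String)]): Map[String, Seq[String]] =
    pairs.groupBy(_._1).map { case (userId, rows) => targetPath(root, userId) -> rows.map(_._2) }

  // Verification: the userId at the start of each CSV line must match the folder it sits in
  def verify(root: String, files: Map[String, Seq[String]]): Boolean =
    files.forall { case (path, lines) =>
      lines.forall(line => targetPath(root, line.takeWhile(_ != ',').toInt) == path)
    }

  def main(args: Array[String]): Unit = {
    val files = layout("/data", Seq((1, "1,2,3.5,t1"), (2, "2,5,4.0,t2"), (1, "1,7,3.0,t3")))
    files.foreach(println)
    println(verify("/data", files))
  }
}
```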

In [9]:
println("application started .....")

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, split, col}

val basePath   = "gs://scala-spark-temp/movies/"
val ratingFile = "rating.csv"
val hdfsPath  = "hdfs://10.128.0.5:8020/user/suraj/movies/"

val ratingDf = spark.read.option("header", "true").option("inferSchema", "true").csv(basePath + ratingFile)

println("read the data")


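// Keep only records in which all four rating columns are present
val validRatingDf =
    ratingDf.filter(
        col("userId").isNotNull && col("movieId").isNotNull && col("rating").isNotNull && col("timestamp").isNotNull
    )

// RDD-based alternative (not used): key each row by userId and group per user
// val ratingRdd = validRatingDf.rdd.map(row => (row.getAs[Int]("userId"), row.mkString(",")))

// val partitionedRdd = ratingRdd.groupByKey()

// DataFrame route actually used: one userId=<value> subdirectory of Parquet files per user
validRatingDf.write.partitionBy("userId").mode("overwrite").parquet(hdfsPath + "ratings.parquet")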

// partitionedRdd.collect().foreach { case (userId, ratings) =>
//     try {
//         val userRatingsRdd = spark.sparkContext.parallelize(ratings.toSeq)
//         val outputPath = s"hdfs://10.128.0.5:8020/user/suraj/user-data/$userId/ratings"
//         userRatingsRdd.saveAsTextFile(outputPath)
//     } catch {
//         case e: Exception =>
//           println(s"Error processing user $userId: ${e.getMessage}")
//     }
// }

println("program ended .....")

application started .....
read the data
program ended .....


basePath = gs://scala-spark-temp/movies/
ratingFile = rating.csv
ratingDf = [userId: int, movieId: int ... 2 more fields]
validRatingDf = [userId: int, movieId: int ... 2 more fields]



![Case Study 2 result](task_2.png)

# Case Study 3: Handling Incomplete Metadata

Objective: Enrich incomplete movie metadata using additional JSON files.

Scenario: Movielens metadata (e.g., movies.csv) is missing releaseYear for some movies. Supplementary metadata in JSON format is available for enrichment.

Steps:

Ingestion: Load movies.csv from GCP Cloud Storage as a DataFrame.
Load metadata.json from GCP Cloud Storage into an RDD for custom parsing.

Transformation: Use RDD operations to parse the JSON file and extract movieId and releaseYear.
Perform an RDD join with the movies DataFrame to fill in missing releaseYear.

Validation: Convert the enriched RDD back into a DataFrame.
Validate that all movies have a releaseYear field.
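The enrichment rule and the validation check can be tried locally before running them on Spark. The sketch assumes the metadata has already been parsed into a `movieId -> year` map (standing in for `metadataParsedDf`) and that, as in the notebook, a release year is recorded by appending `(YYYY)` to the title; `TitleEnrichment` and `missingYear` are hypothetical names. A movie without a metadata match keeps its original title, which is what a left join should preserve, and `missingYear` lists the movies that would fail validation.

```scala
object TitleEnrichment {
  // Same pattern as the notebook's rlike: a four-digit year in parentheses anywhere in the title
  private val YearInTitle = """\(\d{4}\)""".r

  // releaseYears (movieId -> year) stands in for the parsed metadata JSON
  def enrich(movies: Seq[(Int, String)], releaseYears: Map[Int, Long]): Seq[(Int, String)] =
    movies.map { case (movieId, title) =>
      val newTitle =
        if (YearInTitle.findFirstIn(title).isDefined) title
        // Left-join semantics: no metadata match leaves the title untouched
        else releaseYears.get(movieId).map(year => s"$title ($year)").getOrElse(title)
      (movieId, newTitle)
    }

  // Validation: ids of movies whose title still carries no year
  def missingYear(movies: Seq[(Int, String)]): Seq[Int] =
    movies.collect { case (id, title) if YearInTitle.findFirstIn(title).isEmpty => id }

  def main(args: Array[String]): Unit = {
    val enriched = enrich(Seq((125571, "The Court-Martial of Jackie Robinson"), (99, "Unknown Film")), Map(125571 -> 1964L))
    enriched.foreach(println)
    println(missingYear(enriched))
  }
}
```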

Storage: Save the enriched DataFrame in Parquet format in HDFS.

In [6]:
import org.apache.spark.sql.{Row, SparkSession, functions => F}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

println("application started ...")

val basePath   = "gs://scala-spark-temp/movies/"
val movieFile  = "movie.csv"

val movieDf = spark.read.option("header", "true").option("inferSchema", "true").csv(basePath + movieFile)
val metadataParsedDf =
      spark.read.option("inferSchema", "true").json(basePath + "movie_date.json").drop("title").drop("genres")

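// Left join keeps every movie; release_date is null when the metadata has no match.
// Append the year only when the title lacks one and a release_date exists: concat returns
// null if any input is null, which would otherwise wipe out the title.
val updatedMovieDf =
      movieDf
        .join(metadataParsedDf, Seq("movieId"), "left")
        .withColumn(
          "title",
          F.when(F.col("title").rlike("\\(\\d{4}\\)") || F.col("release_date").isNull, F.col("title"))
            .otherwise(F.concat(F.col("title"), F.lit(" ("), F.col("release_date"), F.lit(")")))
        )
        .drop("release_date")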


updatedMovieDf.write.mode("overwrite").parquet("hdfs://10.128.0.5:8020/user/suraj/updatedMovies")

movieDf.filter(col("movieId") === 125571).show(truncate = false)
    
updatedMovieDf.filter(col("movieId") === 125571).show(truncate = false)

println("closing application")

application started ...


Waiting for a Spark session to start...

+-------+------------------------------------+------------------+
|movieId|title                               |genres            |
+-------+------------------------------------+------------------+
|125571 |The Court-Martial of Jackie Robinson|(no genres listed)|
+-------+------------------------------------+------------------+

+-------+-------------------------------------------+------------------+
|movieId|title                                      |genres            |
+-------+-------------------------------------------+------------------+
|125571 |The Court-Martial of Jackie Robinson (1964)|(no genres listed)|
+-------+-------------------------------------------+------------------+

closing application


basePath = gs://scala-spark-temp/movies/
movieFile = movie.csv
movieDf = [movieId: int, title: string ... 1 more field]
metadataParsedDf = [movieId: bigint, release_date: bigint]
updatedMovieDf = [movieId: int, title: string ... 1 more field]



# Case Study 4: Duplicate Record Removal Pipeline

Objective: Identify and remove duplicate movie records based on movieId and title, saving clean data in Avro format.

Scenario: The movies.csv file in HDFS contains duplicate records that need to be cleaned.

Steps:

Ingestion: Load movies.csv into a Spark DataFrame from HDFS.

Transformation: Use DataFrames to identify duplicates based on movieId and title.
Convert the DataFrame to an RDD to perform custom filtering operations using distinct() on a composite key (movieId, title).

Validation: Count the number of duplicates removed by comparing the record counts before and after transformation.
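Deduplicating on a composite key and deduplicating whole rows are not the same operation. The local sketch below, using a hypothetical `Movie` case class and made-up example rows, contrasts the two and computes the removed-record count used in the Validation step. In Spark, the key-based version would be `rdd.map(m => ((m.movieId, m.title), m)).reduceByKey((a, _) => a).values`, which keeps one row per key without guaranteeing which one.

```scala
object MovieDedup {
  final case class Movie(movieId: Int, title: String, genres: String)

  // Whole-row dedup: what distinct() on ((movieId, title), row) pairs amounts to,
  // because two pairs are equal only when the full rows are equal
  def distinctRows(movies: Seq[Movie]): Seq[Movie] = movies.distinct

  // Key-based dedup: one row per (movieId, title); locally the first row in input order wins
  def distinctByKey(movies: Seq[Movie]): Seq[Movie] =
    movies.groupBy(m => (m.movieId, m.title)).values.map(_.head).toSeq

  // Validation: records dropped = count before minus count after
  def duplicatesRemoved(before: Seq[Movie], after: Seq[Movie]): Int = before.size - after.size

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Movie(1, "Example Movie (2000)", "Comedy"),
      Movie(1, "Example Movie (2000)", "Comedy"),
      Movie(1, "Example Movie (2000)", "Animation|Comedy"),
      Movie(2, "Other Movie (2001)", "Drama"))
    println(s"whole-row: ${duplicatesRemoved(rows, distinctRows(rows))}, by key: ${duplicatesRemoved(rows, distinctByKey(rows))}")
  }
}
```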

Storage: Save the cleaned data as Avro files in GCP Cloud Storage.

In [6]:
import org.apache.spark.sql.functions.col

println("starting application ...")

val hdfsPath  = "hdfs://10.128.0.5:8020/user/suraj/movies/"
val gcpPath = "gs://scala-spark-temp/movies/"
val fileName = "movie_duplicate_record.csv"

val gcpMovieDf = spark.read.option("header", "true").option("inferSchema", "true").csv(gcpPath + fileName)

println("writing movies to hdfs")
gcpMovieDf.write.option("header", "true").mode("overwrite").csv(hdfsPath + fileName)

println("reading from the hdfs file")
val movieDf = spark.read.option("header", "true").option("inferSchema", "true").csv(hdfsPath + fileName)

println("creating rdd from the dataframe")
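// Key every row by the composite (movieId, title) key.
// Note: distinct() compares whole (key, row) pairs, so it drops only rows that are identical in
// every column; rows sharing movieId and title but differing elsewhere would both survive.
val movieRdd         = movieDf.rdd.map(row => ((row.getAs[Int]("movieId"), row.getAs[String]("title")), row))
val distinctMovieRdd = movieRdd.distinct().map(_._2)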

println("clean dataframe from the distinct rdd")
val cleanedMovieDf = spark.createDataFrame(distinctMovieRdd, movieDf.schema)

val originalCount     = movieDf.count()
val cleanedCount      = cleanedMovieDf.count()
val duplicatesRemoved = originalCount - cleanedCount
println(s"Number of duplicates removed: $duplicatesRemoved")

println("before removing duplicates")
val duplicateMovieDf = movieDf.groupBy("movieId", "title").count().filter(col("count") > 1)
duplicateMovieDf.show()

println("after removing duplicates")
cleanedMovieDf.groupBy("movieId", "title").count().filter(col("count") > 1).show()

println("writing to gcp in avro format")
// Write to a separate path: saving to gcpPath + fileName in overwrite mode would replace the
// source CSV with the Avro output. The "movie_cleaned_avro" folder name is an assumption.
// format("avro") needs the external spark-avro module on the classpath.
cleanedMovieDf.write.format("avro").mode("overwrite").save(gcpPath + "movie_cleaned_avro")

println("closing application ...")

starting application ...
writing movies to hdfs
reading from the hdfs file
creating rdd from the dataframe
clean dataframe from the distinct rdd
Number of duplicates removed: 10
before removing duplicates
+-------+--------------------+-----+
|movieId|               title|count|
+-------+--------------------+-----+
|   1267|Manchurian Candid...|    2|
|   1807|Cool, Dry Place, ...|    2|
|   2045|Far Off Place, A ...|    2|
|    441|Dazed and Confuse...|    2|
|      1|    Toy Story (1995)|    2|
|     19|Ace Ventura: When...|    2|
|    175|         Kids (1995)|    2|
|   1489|Cats Don't Dance ...|    2|
|    765|         Jack (1996)|    2|
|   1014|    Pollyanna (1960)|    2|
+-------+--------------------+-----+

after removing duplicates
+-------+-----+-----+
|movieId|title|count|
+-------+-----+-----+
+-------+-----+-----+

writing to gcp in avro format
closing application ...


hdfsPath = hdfs://10.128.0.5:8020/user/suraj/movies/
gcpPath = gs://scala-spark-temp/movies/
fileName = movie_duplicate_record.csv
gcpMovieDf = [movieId: int, title: string ... 1 more field]
movieDf = [movieId: int, title: string ... 1 more field]
movieRdd = MapPartitionsRDD[120] at map at <console>:41
distinctMovieRdd = MapPartitionsRDD[124] at map at <console>:42
cleanedMovieDf = [movieId: int, title: string ... 1 more field]
originalCount = 27288
cleanedCount = 27278
duplicatesRemoved = 1...



# Case Study 5: Time-Based Data Partitioning for Ratings

Objective: Partition user ratings data by year and save in Parquet format.

Scenario: The ratings.csv file includes a timestamp field that needs to be converted into human-readable years, and the data needs to be stored year-wise.

Steps:

Ingestion: Load ratings.csv as a DataFrame from GCP Cloud Storage.

Transformation: Use DataFrames to convert the timestamp field into a year column.
Convert the DataFrame to an RDD to partition records by year using a key-value pair transformation.
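Converting the timestamp to a year can be checked with `java.time` before involving Spark. The sketch assumes the `yyyy-MM-dd HH:mm:ss` text format visible in this notebook's sample of `rating.csv`, and also covers Unix epoch seconds, which the original GroupLens MovieLens files use; reading epoch values in UTC is an assumption. `RatingYear` and `keyByYear` are hypothetical names, and `keyByYear` produces the `(year, record)` pairs the Transformation step describes.

```scala
import java.time.{Instant, LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object RatingYear {
  private val Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Timestamp stored as text, e.g. "2005-04-02 23:53:47"
  def yearFromText(ts: String): Int = LocalDateTime.parse(ts, Formatter).getYear

  // Timestamp stored as Unix epoch seconds, interpreted in UTC (an assumption)
  def yearFromEpochSeconds(seconds: Long): Int =
    Instant.ofEpochSecond(seconds).atZone(ZoneOffset.UTC).getYear

  // Key-value pairs (year, (userId, movieId, rating)) ready for partitioning by year
  def keyByYear(rows: Seq[(Int, Int, Double, String)]): Seq[(Int, (Int, Int, Double))] =
    rows.map { case (userId, movieId, rating, ts) => (yearFromText(ts), (userId, movieId, rating)) }

  def main(args: Array[String]): Unit =
    keyByYear(Seq((1, 2, 3.5, "2005-04-02 23:53:47"), (1, 112, 3.5, "2004-09-10 03:09:00"))).foreach(println)
}
```

In Spark itself, the epoch case would typically be written as `year(from_unixtime(col("timestamp")))`, which formats in the session time zone rather than UTC.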

Partitioning: Save RDD partitions as separate Parquet files in HDFS, with the structure /ratings/{year}/ratings.parquet.

Verification: Ensure that each year folder in HDFS contains only the records for that year.
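A verification pass can be sketched as a check that every record in a folder belongs to that folder's year. With `partitionBy("year")`, Spark names the folders `year=<value>` and keeps the partition value in the directory name rather than inside the Parquet files, so a record-level check needs some other field carrying the date. The sketch assumes the `yyyy-MM-dd HH:mm:ss` timestamp text was kept for that purpose, which the notebook's `drop("timestamp")` does not do; `YearPartitionCheck` is a hypothetical name.

```scala
import scala.util.Try

object YearPartitionCheck {
  // Read the value out of a Hive-style directory name such as "year=2005"
  def partitionValue(dirName: String, column: String): Option[Int] = {
    val prefix = column + "="
    if (dirName.startsWith(prefix)) Try(dirName.drop(prefix.length).toInt).toOption else None
  }

  // A folder is clean when every record's timestamp text starts with the folder's year
  def folderIsClean(dirName: String, timestamps: Seq[String]): Boolean =
    partitionValue(dirName, "year").exists(y => timestamps.forall(_.take(4) == y.toString))

  def main(args: Array[String]): Unit = {
    println(folderIsClean("year=2005", Seq("2005-04-02 23:53:47", "2005-04-02 23:31:16")))
    println(folderIsClean("year=2005", Seq("2004-09-10 03:09:00")))
  }
}
```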

In [7]:
import org.apache.spark.sql.functions.{col, from_unixtime, to_timestamp, year}

val gcpPath = "gs://scala-spark-temp/movies/"
val ratingFile = "rating.csv"
val hdfsPath  = "hdfs://10.128.0.5:8020/user/suraj/movies/rating_years/"

println("starting application ...")


println("reading the rating csv from gcp")
val ratingDf =
    spark.read.option("header", "true").option("inferSchema", "true").csv(gcpPath + ratingFile)

ratingDf.show(20)

println("converting the timestamp format to the year")

// Convert the "yyyy-MM-dd HH:mm:ss" timestamp column, keep only the calendar year, and drop the raw column
val ratingWithYearDf =
    ratingDf.withColumn("year", year(to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))).drop("timestamp")

ratingWithYearDf.show(20)

println("writing the files to the hdfs")
// partitionBy("year") writes one year=<value> subdirectory per distinct year
ratingWithYearDf.write.partitionBy("year").mode("overwrite").parquet(hdfsPath + "ratings.parquet")

println("closing the application")

starting application ...
reading the rating csv from gcp
+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
|     1|     29|   3.5|2005-04-02 23:31:16|
|     1|     32|   3.5|2005-04-02 23:33:39|
|     1|     47|   3.5|2005-04-02 23:32:07|
|     1|     50|   3.5|2005-04-02 23:29:40|
|     1|    112|   3.5|2004-09-10 03:09:00|
|     1|    151|   4.0|2004-09-10 03:08:54|
|     1|    223|   4.0|2005-04-02 23:46:13|
|     1|    253|   4.0|2005-04-02 23:35:40|
|     1|    260|   4.0|2005-04-02 23:33:46|
|     1|    293|   4.0|2005-04-02 23:31:43|
|     1|    296|   4.0|2005-04-02 23:32:47|
|     1|    318|   4.0|2005-04-02 23:33:18|
|     1|    337|   3.5|2004-09-10 03:08:29|
|     1|    367|   3.5|2005-04-02 23:53:00|
|     1|    541|   4.0|2005-04-02 23:30:03|
|     1|    589|   3.5|2005-04-02 23:45:57|
|     1|    593|   3.5|2005-04-02 23:31:01|
|     1|    653|  

gcpPath = gs://scala-spark-temp/movies/
ratingFile = rating.csv
hdfsPath = hdfs://10.128.0.5:8020/user/suraj/movies/rating_years/
ratingDf = [userId: int, movieId: int ... 2 more fields]
ratingWithYearDf = [userId: int, movieId: int ... 2 more fields]

