In [1]:
import org.apache.spark.sql.SparkSession
import scala.util.Random

In [2]:
// Local-mode Spark session used by every cell below. Declared as `val` because it is never
// reassigned; the hard-coded master "local[*]" runs Spark in-process on all local cores.
val spark = SparkSession.builder()
  .appName("Enrich incomplete movie metadata using additional JSON files.")
  .master("local[*]")
  .getOrCreate()

spark = org.apache.spark.sql.SparkSession@1929384c


org.apache.spark.sql.SparkSession@1929384c

In [3]:
// Handle to the SparkContext behind the session (the RDD-level entry point).
// NOTE(review): not referenced by any of the cells visible here — confirm before removing.
val sprk_ctx: org.apache.spark.SparkContext = spark.sparkContext

sprk_ctx = org.apache.spark.SparkContext@295b2756


org.apache.spark.SparkContext@295b2756

In [4]:
// Source movie listing with a header row (movieId, title, genres per the printed schema).
val movie_gcs_path = "gs://artifacts_spark/movie.csv"

// inferSchema asks Spark to derive column types, which costs an extra pass over the file.
val movie_df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(movie_gcs_path)

movie_gcs_path = gs://artifacts_spark/movie.csv
movie_df = [movieId: int, title: string ... 1 more field]


[movieId: int, title: string ... 1 more field]

In [5]:
// Preview the first five rows; cell contents longer than 20 characters are truncated.
movie_df.show(numRows = 5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



#### Metadata Generation

In [6]:
// Project each DataFrame row into a plain (movieId, title, genres) tuple for RDD processing.
val moviesRdd = movie_df.rdd.map { r =>
  (r.getAs[Int]("movieId"), r.getAs[String]("title"), r.getAs[String]("genres"))
}

moviesRdd = MapPartitionsRDD[19] at map at <console>:26


MapPartitionsRDD[19] at map at <console>:26

In [7]:
/**
 * Extracts a release year from a title such as "Toy Story (1995)".
 *
 * The candidate is the text between the last "(" and the last ")" of the title; it is used
 * only when that ")" comes after the "(" and the text is exactly four ASCII digits. In every
 * other case (no parentheses, malformed parentheses, non-numeric content, null title) a
 * random placeholder year in [minYear, maxYear] is returned instead.
 *
 * Note: `Random` is unseeded, so recomputing an RDD that calls this function can produce
 * different placeholder years for the same movie.
 *
 * @param movieId passed through unchanged as the first element of the result
 * @param title   movie title, possibly ending with "(YYYY)"; may be null
 * @param minYear inclusive lower bound of the random fallback year
 * @param maxYear inclusive upper bound of the random fallback year
 * @return (movieId, parsed or placeholder year)
 * @throws IllegalArgumentException if minYear > maxYear
 */
def extractYear(movieId: Int, title: String, minYear: Int = 1950, maxYear: Int = 2030): (Int, Int) = {
  require(minYear <= maxYear, s"minYear ($minYear) must not exceed maxYear ($maxYear)")

  val parsedYear: Option[Int] = Option(title).flatMap { t =>
    val open = t.lastIndexOf("(")
    val close = t.lastIndexOf(")")
    // `close > open` prevents substring from throwing when ")" precedes the last "(".
    if (open != -1 && close > open) {
      val candidate = t.substring(open + 1, close)
      // Require digits so that toInt cannot throw NumberFormatException.
      if (candidate.length == 4 && candidate.forall(c => c >= '0' && c <= '9')) Some(candidate.toInt)
      else None
    } else None
  }

  // The random fallback is only drawn when no year could be parsed.
  (movieId, parsedYear.getOrElse(minYear + Random.nextInt(maxYear - minYear + 1)))
}

extractYear: (movieId: Int, title: String)(Int, Int)


In [8]:
// Derive (movieId, releaseYear) pairs from the (movieId, title, genres) tuples;
// genres is not needed for year extraction, so it is ignored in the pattern.
val dataRdd = moviesRdd.map { case (movieId, title, _) => extractYear(movieId, title) }

dataRdd = MapPartitionsRDD[20] at map at <console>:27


MapPartitionsRDD[20] at map at <console>:27

In [9]:
// Spot-check the first five (movieId, title, genres) tuples on the driver.
// NOTE(review): this prints moviesRdd rather than the dataRdd built in the previous cell —
// confirm which one was meant to be inspected.
moviesRdd.take(5).foreach(t => println(t))

(1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
(2,Jumanji (1995),Adventure|Children|Fantasy)
(3,Grumpier Old Men (1995),Comedy|Romance)
(4,Waiting to Exhale (1995),Comedy|Drama|Romance)
(5,Father of the Bride Part II (1995),Comedy)


In [10]:
// Name the tuple fields of dataRdd. toDF on an RDD needs spark.implicits in scope
// (presumably auto-imported by the notebook kernel — confirm if run elsewhere).
val movieMetadataDF: org.apache.spark.sql.DataFrame = dataRdd.toDF("movieId", "releaseYear")

movieMetadataDF = [movieId: int, releaseYear: int]


[movieId: int, releaseYear: int]

In [11]:
val target_storage_path = "gs://artifacts_spark/movie_metadata"

// Collapse to one partition so the metadata lands in a single JSON part file,
// replacing whatever was previously written to the target directory.
movieMetadataDF
  .coalesce(1)
  .write
  .format("json")
  .mode("overwrite")
  .save(target_storage_path)

target_storage_path = gs://artifacts_spark/movie_metadata


gs://artifacts_spark/movie_metadata

#### Load the JSON metadata and add the missing release year to each movie title

In [12]:
// Read the generated metadata back. JSON is self-describing, so the CSV-only "header" and
// "inferSchema" options previously passed here had no effect and are dropped. Integral JSON
// numbers come back as bigint (movieId, releaseYear).
val movieMetadataJson = spark.read.json(target_storage_path)

movieMetadataJson = [movieId: bigint, releaseYear: bigint]


[movieId: bigint, releaseYear: bigint]

In [13]:
// Inner equi-join on movieId: attaches releaseYear to every movie that has a metadata row,
// keeping a single movieId column in the result.
val joinedDf = movie_df.join(movieMetadataJson, Seq("movieId"))

joinedDf = [movieId: int, title: string ... 2 more fields]


[movieId: int, title: string ... 2 more fields]

In [14]:
/**
 * Reports whether a title already carries a year, e.g. "Toy Story (1995)".
 *
 * A year is considered present when the text between the last "(" and the last ")" exists,
 * the ")" comes after the "(", and that text is exactly four ASCII digits.
 *
 * @param title movie title; null is treated as having no year
 * @return true if a four-digit parenthesised year is found, false otherwise
 */
def checkYearExists(title: String): Boolean =
  Option(title).exists { t =>
    val open = t.lastIndexOf("(")
    val close = t.lastIndexOf(")")
    // `close > open` prevents substring from throwing when ")" precedes the last "(".
    open != -1 && close > open && {
      val candidate = t.substring(open + 1, close)
      candidate.length == 4 && candidate.forall(c => c >= '0' && c <= '9')
    }
  }

checkYearExists: (title: String)Boolean


In [15]:
// Rebuild each joined row as (movieId, title, genre, releaseYear), appending " (releaseYear)"
// to titles for which checkYearExists finds no year.
// Columns are read by name so the mapping does not depend on the join's column order.
// Numeric columns are read as java.lang.Number because the JSON side is inferred as bigint
// while movie_df's movieId is int; intValue() accepts either.
val structuredData = joinedDf.rdd.map { row =>
  val movieId = row.getAs[Number]("movieId").intValue()
  val title = row.getAs[String]("title")
  val genre = row.getAs[String]("genres")
  val releaseYear = row.getAs[Number]("releaseYear").intValue()
  val titleWithYear = if (checkYearExists(title)) title else s"${title} (${releaseYear})"
  (movieId, titleWithYear, genre, releaseYear)
}

structuredData = MapPartitionsRDD[39] at map at <console>:27


MapPartitionsRDD[39] at map at <console>:27

In [16]:
// Name the enriched tuple fields. Note the source column "genres" is exposed here as "genre".
val structuredDf: org.apache.spark.sql.DataFrame =
  structuredData.toDF("movieId", "title", "genre", "releaseYear")

structuredDf = [movieId: int, title: string ... 2 more fields]


[movieId: int, title: string ... 2 more fields]

In [17]:
// Total number of enriched rows (projecting a single column first does not change the count).
structuredDf.count()

27278

In [20]:
// Number of rows with a populated releaseYear. A plain select("releaseYear").count() counts
// every row, nulls included, so it could never reveal missing years; filtering on isNotNull
// does. Comparing this with the total row count confirms every movie received a year.
structuredDf.filter(structuredDf("releaseYear").isNotNull).count()

27278

In [18]:
// Persist the enriched dataset as Parquet on HDFS, replacing any previous output.
structuredDf.write.mode("overwrite").parquet("hdfs:///user/mano/enriched_movie_data.parquet")

In [21]:
// Release the session and its underlying SparkContext (close() is a synonym for stop()).
spark.close()