In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *
from pyspark.sql.window import Window

#### Global Variable Declaration

In [0]:
# Root of the shared-upload area on DBFS. Every input file and output location
# below lives under it, so relocating the project only means editing this line.
# NOTE(review): the path embeds a personal e-mail address; consider supplying it
# via a widget / job parameter before sharing the notebook.
baseDir = "dbfs:/FileStore/shared_uploads/karunaishwarya1996@gmail.com"
dataLakeDir = baseDir + "/dataLakeML"

# Raw MovieLens inputs: pipe-separated movie metadata and tab-separated ratings.
moviesFile = baseDir + "/movieLens/u-1.item"
ratingsFile = baseDir + "/movieLens/u-2.data"

# Outputs: Delta staging tables, the unpivoted movie/genre table (Parquet)
# and the top-10 result (CSV).
ratings_delta = dataLakeDir + "/ratings"
movies_delta = dataLakeDir + "/movies"
moviesNgenres = dataLakeDir + "/moviesNgenres"
top10movies = dataLakeDir + "/top10movies"

#### Data Ingestion
An explicit schema is specified to keep column types consistent across loads; the raw columns are then cast to the relevant data types.

In [0]:
# Explicit schema: column types stay identical across loads and Spark does not
# need to infer them. ratedDatetime is read as epoch seconds (LongType) and
# converted to a timestamp below.
dataSchema = StructType([
        StructField("userId", LongType(), True),
        StructField("movieId", LongType(), True),
        StructField("rating", IntegerType(), True),
        StructField("ratedDatetime", LongType(), True)])

# The ratings file is tab-separated with no header row.
ratingsData = spark.read.csv(ratingsFile, sep="\t", header=False, schema=dataSchema)

# Cast epoch seconds straight to a timestamp. Formatting to a string and
# re-parsing it (from_unixtime -> to_timestamp) goes through the session time
# zone twice, which is ambiguous during a DST "fall back" hour when the session
# zone is not UTC.
ratingsData = ratingsData.withColumn("ratedDatetime", col("ratedDatetime").cast("timestamp"))
display(ratingsData)

userId,movieId,rating,ratedDatetime
196,242,3,1997-12-04T15:55:49.000+0000
186,302,3,1998-04-04T19:22:22.000+0000
22,377,1,1997-11-07T07:18:36.000+0000
244,51,2,1997-11-27T05:02:03.000+0000
166,346,1,1998-02-02T05:33:16.000+0000
298,474,4,1998-01-07T14:20:06.000+0000
115,265,2,1997-12-03T17:51:28.000+0000
253,465,5,1998-04-03T18:34:27.000+0000
305,451,3,1998-02-01T09:20:17.000+0000
6,86,3,1997-12-31T21:16:53.000+0000


In [0]:
# Genre indicator columns, in the order they follow the five descriptive
# columns in the movies file. Each holds 1 when the movie has that genre.
movieGenres = ["unknown", "Action", "Adventure", "Animation", "Childrens",
               "Comedy", "Crime", "Documentary", "Drama", "Fantasy",
               "FilmNoir", "Horror", "Musical", "Mystery", "Romance",
               "SciFi", "Thriller", "War", "Western"]

# Explicit schema: descriptive columns first, then one IntegerType flag per genre.
movieSchema = StructType(
            [
                StructField("movieId", LongType(), True),
                StructField("movieTitle", StringType(), True),
                StructField("releaseDate", StringType(), True),
                StructField("videoReleaseDate", StringType(), True),
                StructField("IMDBURL", StringType(), True),
            ]
            + [StructField(genre, IntegerType(), True) for genre in movieGenres])

# The movies file is pipe-separated with no header row. The MovieLens 100K
# u.item file is Latin-1 encoded; reading it with the default UTF-8 decoding
# garbles accented titles.
# NOTE(review): if the uploaded copy was re-saved as UTF-8, drop the encoding option.
moviesData = spark.read.csv(moviesFile, sep="|", header=False, schema=movieSchema,
                            encoding="ISO-8859-1")

# Release dates use a day-abbreviatedMonth-year pattern (e.g. "01-Jan-1995");
# empty fields are read as null and stay null.
moviesData = (moviesData
              .withColumn("releaseDate", to_date(col("releaseDate"), "d-MMM-yyyy"))
              .withColumn("videoReleaseDate", to_date(col("videoReleaseDate"), "d-MMM-yyyy")))
display(moviesData)

movieId,movieTitle,releaseDate,videoReleaseDate,IMDBURL,unknown,Action,Adventure,Animation,Childrens,Comedy,Crime,Documentary,Drama,Fantasy,FilmNoir,Horror,Musical,Mystery,Romance,SciFi,Thriller,War,Western
1,Toy Story (1995),1995-01-01,,http://us.imdb.com/M/title-exact?Toy%20Story%20(1995),0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0
2,GoldenEye (1995),1995-01-01,,http://us.imdb.com/M/title-exact?GoldenEye%20(1995),0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0
3,Four Rooms (1995),1995-01-01,,http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995),0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0
4,Get Shorty (1995),1995-01-01,,http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995),0,1,0,0,0,1,0,0,1,0,0,0,0,0,0,0,0,0,0
5,Copycat (1995),1995-01-01,,http://us.imdb.com/M/title-exact?Copycat%20(1995),0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,1,0,0
6,Shanghai Triad (Yao a yao yao dao waipo qiao) (1995),1995-01-01,,http://us.imdb.com/Title?Yao+a+yao+yao+dao+waipo+qiao+(1995),0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0
7,Twelve Monkeys (1995),1995-01-01,,http://us.imdb.com/M/title-exact?Twelve%20Monkeys%20(1995),0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,1,0,0,0
8,Babe (1995),1995-01-01,,http://us.imdb.com/M/title-exact?Babe%20(1995),0,0,0,0,1,1,0,0,1,0,0,0,0,0,0,0,0,0,0
9,Dead Man Walking (1995),1995-01-01,,http://us.imdb.com/M/title-exact?Dead%20Man%20Walking%20(1995),0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0
10,Richard III (1995),1996-01-22,,http://us.imdb.com/M/title-exact?Richard%20III%20(1995),0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,0


#### Data Load to Staging

Create each Delta table if it does not exist yet; otherwise, upsert (merge) the new data into it on the key columns.

The ratings data is partitioned by movie ID, since the transformation layer uses it as the join key.

In [0]:
# Upsert the freshly ingested ratings into the Delta staging table.
# A rating is keyed by (userId, movieId): matching rows are updated, new pairs
# are inserted. On the first run the table is created, partitioned by movieId
# because the downstream aggregation and join key on it.
# NOTE(review): movieId is high-cardinality, so partitioning by it yields many
# small partitions/files; consider no partitioning (or Z-ordering) at this scale.
if DeltaTable.isDeltaTable(spark, ratings_delta):
  ratingsTable = DeltaTable.forPath(spark, ratings_delta)
  # The merge source is left at its natural parallelism; coalesce(1) here would
  # funnel the entire merge source through a single task.
  # Column names match the schema's casing so the condition also resolves when
  # spark.sql.caseSensitive is enabled.
  (ratingsTable.alias("oldData")
    .merge(ratingsData.alias("newData"),
           "(oldData.userId = newData.userId) AND (oldData.movieId = newData.movieId)")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())
  print("Merged into Ratings delta table.")
else:
  ratingsData.write.partitionBy("movieId").format("delta").save(ratings_delta)
  print("Ratings delta table created.")

Merged into Ratings delta table.


In [0]:
# Upsert the freshly ingested movie metadata into the Delta staging table,
# keyed on movieId: matching movies are updated, new ones inserted. On the
# first run the table is created instead.
if DeltaTable.isDeltaTable(spark, movies_delta):
  moviesTable = DeltaTable.forPath(spark, movies_delta)
  # Column names match the schema's casing so the condition also resolves when
  # spark.sql.caseSensitive is enabled.
  (moviesTable.alias("oldData")
    .merge(moviesData.alias("newData"), "(oldData.movieId = newData.movieId)")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())
  print("Merged into Movies delta table.")
else:
  # coalesce(1) writes the initial table as a single data file.
  moviesData.coalesce(1).write.format("delta").save(movies_delta)
  print("Movies delta table created.")

Merged into Movies delta table.


#### Unpivoting Data to Hold One Genre per Row

In [0]:
# Read the merged movies table back from Delta staging.
moviesDf = spark.read.format("delta").load(movies_delta)

# Every column other than the descriptive metadata is a genre flag. Deriving the
# list from the table keeps the stack() call in sync with the schema instead of
# hard-coding each genre name and a matching count.
movieInfoCols = ["movieId", "movieTitle", "releaseDate", "videoReleaseDate", "IMDBURL"]
genreCols = [c for c in moviesDf.columns if c not in movieInfoCols]

# stack(n, 'label1', col1, 'label2', col2, ...) turns the n flag columns into n
# (genre, flag) rows per movie; keeping flag == 1 leaves one row per genre the
# movie belongs to. Column references are back-quoted for safety.
stackArgs = ", ".join("'{0}', `{0}`".format(g) for g in genreCols)
stackExpr = "stack({0}, {1}) as (genre, flag)".format(len(genreCols), stackArgs)

moviesNgenresDf = (moviesDf
                   .selectExpr(*movieInfoCols, stackExpr)
                   .where("flag == 1")
                   .drop("flag"))
moviesNgenresDf.write.partitionBy("genre").mode("overwrite").parquet(moviesNgenres)

#### Calculating the top 10 movies based on the Average Ratings
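Only movies with at least 5 ratings are considered. The ratings DataFrame is aggregated to one row per movie before it is joined with the movie metadata, so the join processes far fewer rows.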

In [0]:
# A movie needs at least this many ratings for its average to count (so a
# handful of 5-star votes cannot top the chart); topN is the chart size.
minRatings = 5
topN = 10

# Aggregate to one row per movie *before* the join, so the join processes one
# row per movie instead of every individual rating.
ratingsDf = spark.read.format("delta").load(ratings_delta).select("movieId", "rating")
moviesVsRatings = (ratingsDf
                   .groupBy("movieId")
                   .agg(count("movieId").alias("count"), avg("rating").alias("avgRating"))
                   .where(col("count") >= minRatings))

# Highest average first; ties are broken by more ratings, then by lower movieId,
# so both the selected set and the output order are deterministic.
rankOrder = [desc("avgRating"), desc("count"), asc("movieId")]
topRated = moviesVsRatings.orderBy(*rankOrder).limit(topN)

# A join does not preserve row order, so re-sort after attaching the metadata.
# The helper "count" column is dropped only after it has been used for sorting.
top10Movies = (moviesDf
               .select("movieId", "movieTitle", "releaseDate", "videoReleaseDate", "IMDBURL")
               .join(topRated, ["movieId"], how="inner")
               .orderBy(*rankOrder)
               .drop("count"))

# coalesce(1) (no shuffle) writes the already-sorted rows as a single CSV part file.
# NOTE(review): no header row is written; add .option("header", True) if consumers need one.
top10Movies.coalesce(1).write.mode("overwrite").csv(top10movies)
display(top10Movies)

movieId,movieTitle,releaseDate,videoReleaseDate,IMDBURL,avgRating
1449,Pather Panchali (1955),1996-03-22,,http://us.imdb.com/M/title-exact?Pather%20Panchali%20(1955),4.625
408,"Close Shave, A (1995)",1996-04-28,,"http://us.imdb.com/M/title-exact?Close%20Shave,%20A%20(1995)",4.491071428571429
318,Schindler's List (1993),1993-01-01,,http://us.imdb.com/M/title-exact?Schindler's%20List%20(1993),4.466442953020135
169,"Wrong Trousers, The (1993)",1993-01-01,,"http://us.imdb.com/M/title-exact?Wrong%20Trousers,%20The%20(1993)",4.466101694915254
483,Casablanca (1942),1942-01-01,,http://us.imdb.com/M/title-exact?Casablanca%20(1942),4.45679012345679
114,Wallace & Gromit: The Best of Aardman Animation (1996),1996-04-05,,http://us.imdb.com/Title?Wallace+%26+Gromit%3A+The+Best+of+Aardman+Animation+(1996),4.447761194029851
64,"Shawshank Redemption, The (1994)",1994-01-01,,"http://us.imdb.com/M/title-exact?Shawshank%20Redemption,%20The%20(1994)",4.445229681978798
603,Rear Window (1954),1954-01-01,,http://us.imdb.com/M/title-exact?Rear%20Window%20(1954),4.3875598086124405
12,"Usual Suspects, The (1995)",1995-08-14,,"http://us.imdb.com/M/title-exact?Usual%20Suspects,%20The%20(1995)",4.385767790262173
50,Star Wars (1977),1977-01-01,,http://us.imdb.com/M/title-exact?Star%20Wars%20(1977),4.358490566037736
