# === MovieLens Data Pipeline Project ===

#### == Importing time module for printing execution time of the script

In [1]:
import time
start_time = time.time()

#### == Importing findspark module to integrate pyspark with jupyter notebook 

In [2]:
import findspark
findspark.init()

#### == Create SparkSession for spark application "MovieLens Data Pipeline Project" 

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName("MovieLens Data Pipeline Project").getOrCreate()

#### == Import all the necessary libraries

In [4]:

from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType

from pyspark.sql.functions import regexp_extract, explode, split, trim, broadcast, avg, count, col, desc

#### == Reading "movies.dat" in pyspark dataframe "movies_df"

In [5]:
# Create custom schema for "movies_df"
movies_schema = StructType([
    StructField("MovieID", IntegerType(), True),
    StructField("Title", StringType(), True),
    StructField("Genres", StringType(), True)
])

# Create "movies_df" dataframe by reading data from file "movies.dat"
movies_df = spark.read.format('csv') \
                    .option("sep", "::") \
                    .schema(movies_schema) \
                    .option("mode", "permissive") \
                    .load(r"..\..\staging\dimension\movies.dat")

In [6]:
# Extract "Release"
year_pattern = r"\((\d{4})\)"
movies_df = movies_df.withColumn("ReleaseYear", regexp_extract("Title", year_pattern, 1).cast(IntegerType())) \
                     .withColumn("Title", trim(col("Title")))

# As per the condition mentioned in the requirement we are just taking movies which are released after "1989"
final_movies_df = movies_df.filter(col("ReleaseYear") > 1989)

# movies_df.printSchema()
# movies_df.show(5,truncate=False)
movies_df.count()

3883

#### == Reading "users.dat" in pyspark dataframe "users_df"

In [7]:
# Create custom schema for users_df
users_schema = StructType([
    StructField("UserID", IntegerType(), True),
    StructField("Gender", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Occupation", IntegerType(), True),
    StructField("Zip-code", IntegerType(), True)
])

# Create users_df dataframe by reading data from file users.dat
users_df = spark.read.format('csv') \
                    .option("sep", "::") \
                    .schema(users_schema) \
                    .option("mode", "permissive") \
                    .load(r"..\..\staging\dimension\users.dat")

In [8]:
# As per the condition mentioned in the requirement we are taking users who falls under the Age range of (18 - 49)
users_df = users_df.filter((col("Age") >= 18) & (col("Age") <= 49 ) )

# As "Zip-code" column is not required in our further transformation but but we are filling the "nulls" with constant '1'
users_df = users_df.fillna({"Zip-code": 1}) 

# users_df.printSchema()
# users_df.show(5)
users_df.count()

4942

#### == Reading "ratings.dat" in pyspark dataframe "ratings_df"

In [9]:
# Create custom schema for ratings_df
ratings_schema = StructType([
    StructField("UserID", IntegerType(), True),
    StructField("MovieID", IntegerType(), True),
    StructField("Rating", IntegerType(), True),
    StructField("Timestamp", StringType(), True)
])

# Create ratings_df dataframe by reading data from file ratings.dat
ratings_df = spark.read.format('csv') \
                    .option("sep", "::") \
                    .schema(ratings_schema) \
                    .option("mode", "permissive") \
                    .load(r"..\..\staging\fact\ratings.dat")

In [10]:
# Lets change the dataType of Rating column from IntegerType to FloatType
ratings_df = ratings_df.withColumn("Rating", ratings_df.Rating.cast(FloatType()))

# ratings_df.printSchema()
# ratings_df.show(5)
ratings_df.count()

1000209

#### == Join all three "movie_df, users_df, ratings_df" DataFrames  

In [11]:
# Perform join operation, as movie_df and users_df are the small dataframes so we are using bradcast join
join_df = ratings_df.join(broadcast(movies_df), "MovieID").join(broadcast(users_df), "UserID")

# Genres column have multiple genres for one single movie and which is "|" separated so we need to explode it
join_df = join_df.withColumn("Genres", explode(split(col("Genres"), "\\|")))

# join_df.printSchema()
# join_df.show()
join_df.count()

1814382

#### == Calculating the average ratings, per genre and year.

In [12]:
# Calculating the average rating, per Genre and Year
rating_report = join_df.groupBy("Genres", "ReleaseYear") \
                  .agg(avg("Rating").alias("AverageRating"), count("Genres").alias("RatingCountPerGenre")) \
                  .orderBy(col("ReleaseYear").desc())

rating_report.printSchema()
rating_report.show()
rating_report.count()

root
 |-- Genres: string (nullable = false)
 |-- ReleaseYear: integer (nullable = true)
 |-- AverageRating: double (nullable = true)
 |-- RatingCountPerGenre: long (nullable = false)

+-----------+-----------+------------------+-------------------+
|     Genres|ReleaseYear|     AverageRating|RatingCountPerGenre|
+-----------+-----------+------------------+-------------------+
|     Action|       2000|3.4562582844158305|              10562|
|        War|       2000| 3.720183486238532|               1090|
|Documentary|       2000|              3.56|                375|
|    Fantasy|       2000|2.4508196721311477|                122|
|     Comedy|       2000| 3.393928035982009|              13340|
|     Horror|       2000|2.8681214421252372|               2108|
|  Adventure|       2000|  3.02740408570005|               2007|
| Children's|       2000| 3.279113625648279|               2121|
|    Musical|       2000|3.8636363636363638|                176|
|      Drama|       2000| 3.60761121

895

#### == Writing rating_report dataframe to output folder

In [13]:
# Writing rating_report dataframe to output folder. Here we have partitioned the output data with reference to "Genres" column
filePath = r"..\..\output\Jupyter_report"
format = "orc"
mode = "overwrite"
rating_report.write \
        .format(format) \
        .mode(mode) \
        .partitionBy("Genres") \
        .save(filePath)

# Validating the ouput files
import os
for file in os.listdir(filePath):
    print(file)

._SUCCESS.crc
Genres=Action
Genres=Adventure
Genres=Animation
Genres=Children%27s
Genres=Comedy
Genres=Crime
Genres=Documentary
Genres=Drama
Genres=Fantasy
Genres=Film-Noir
Genres=Horror
Genres=Musical
Genres=Mystery
Genres=Romance
Genres=Sci-Fi
Genres=Thriller
Genres=War
Genres=Western
_SUCCESS


In [17]:
spark.stop()

In [15]:
print("Execution time of 'Movielens_data_pipeline' script is \n --- :%s seconds ---" % (time.time() - start_time))

Execution time of 'Movielens_data_pipeline' script is 
 --- :103.99248504638672 seconds ---
