In [5]:
from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

spark = SparkSession. \
builder. \
config('spark.ui.port','0'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [6]:
spark

### Import Libraries

In [7]:
from time import time
import csv
import os

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.ml.recommendation import ALS,ALSModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.sql.window import Window

### Schema Writing & Data Loading

In [14]:
schema_ratings = StructType([
    StructField("item_id", IntegerType(), False),
    StructField("user_id", IntegerType(), False),
    StructField("rating", FloatType(), False)])


schema_reviews = StructType([
    StructField("item_id", IntegerType(), False),
    StructField("txt", StringType(), False)])


ratings_df = spark.read.json("/user/itv009301/movieDataset/ratings.json", schema=schema_ratings)
metadata_df = spark.read.option('inferschema', 'true').json("/user/itv009301/movieDataset/metadata.json")
reviews_df = spark.read.json("/user/itv009301/movieDataset/reviews.json", schema=schema_reviews)
metadata_updated_df = spark.read.option('inferschema', 'true').json("/user/itv009301/movieDataset/metadata_updated.json")

--------

## Analysing Dataset

* Filter ratings dataframe
>Filter records which are present in both ratings.json and metadata.json

In [15]:
filtered_combined_df = metadata_df.alias("m").join(
    ratings_df.alias("r"),
    metadata_df["item_id"] == ratings_df["item_id"],
    "inner")
filtered_ratings_df = filtered_combined_df.select("r.*")
print("Number of records before filtering: ",ratings_df.count())
print("\nNumber of records after filtering: ",filtered_ratings_df.count())

#Load the model
modelPath = 
model = ALSModel.load(modelPath)

>Filtering values, keeping movies for which we have metadata and ratings both.
>Save filtered metadata for Front End, saved with name `metadata_w_ratings_present.json`

In [7]:
joined_df = metadata_df.alias("m").join(
    ratings_df.alias("r"),
    metadata_df["item_id"] == ratings_df["item_id"],
    "inner").select("m.*")

joined_df_distinct = joined_df.dropDuplicates()

In [8]:
joined_df_distinct.coalesce(1).write.json('/user/itv009301/movieDataset/metadata_w_ratings_present.json')

In [9]:
metadata_w_ratings_present_df = spark.read.\
    option('inferschema', 'true').json("/user/itv009301/movieDataset/metadata_w_ratings_present.json")

print('metadata_df Number of rows before filtering: ',metadata_df.count(),'\nmetadata_df Number of rows after filtering:',\
        metadata_w_ratings_present_df.count())

In [10]:
ratings_df.where(ratings_df.item_id==41052)

In [11]:
metadata_df.where(metadata_df.item_id==41052)

* It Seems evident that there is mismatch between movies ratings present in metadata.json and ratings.json because (66,959 < 84,661), so we will keep those movies which are present in both ratings.json and metadata.json
>* For example, item_id = 41052 is present in ratings.json file but not in metadata.json


#### Rating Scale

In [11]:
min_max_ratings = ratings_df.agg(min("rating").alias("min_rating"), max("rating").alias("max_rating")).collect()

# Extract the results
min_rating = min_max_ratings[0]["min_rating"]
max_rating = min_max_ratings[0]["max_rating"]

print(f"Minimum Rating: {min_rating}")
print(f"Maximum Rating: {max_rating}")

In [12]:
unique_ratings = ratings_df.select("rating").distinct().collect()

# Extract the unique ratings
unique_ratings_values = [row["rating"] for row in unique_ratings]

print("Unique Ratings:", unique_ratings_values)

#### User_id Range

In [13]:
min_max_user_id = ratings_df.agg(min("user_id").alias("lowest_user_id"), max("user_id").alias("highest_user_id"))

min_max_user_id.show()

# Extract the results
min_user_id = min_max_user_id[0]["lowest_user_id"]
max_user_id = min_max_user_id[0]["highest_user_id"]

print(f"Minimum User ID: {min_user_id}")
print(f"Maximum User ID: {max_user_id}")

> Range of user_id is between 0 and 999,999

#### Number of Movies in the dataset

In [14]:
n_movies_df = ratings_df.select('item_id').distinct()
n_movies_df.count()

> There are __67,873__ different movies

#### Number of Users 

In [15]:
n_users_df = ratings_df.select('user_id').distinct()
n_users_df.count()

>Number of unique users in the dataset are __247,383__

> Different user_ids

|user_id|
| --- | 
| 139379|
| 440722|
| 761354|
| 882672|
| 493421|
| 992789|
| 552403|
| 254145|
| 645332|
| 538708|
| 891619|
| 700445|
| 958971|
| 133577|
| 520983|
| 994660|
| 396707|
| 146411|
| 960415|
| 110682|

### Get popular movie names

In [20]:
top_most_voted_movies = filtered_ratings_df.groupBy('item_id').agg({'rating': 'count'})
# Join with metadata_df and select the necessary columns
top_most_voted_movies = metadata_df.alias("m").join(
    top_most_voted_movies.alias("r"),
    col("m.item_id") == col("r.item_id"),
    "inner"
).select("m.*", col("r.count(rating)").alias("votes"))

# Create a Window specification to define the ranking order
windowSpec = Window.orderBy(col("votes").desc())

# Add a new column "popularity_rank" with the popularity rank
top_most_voted_movies = top_most_voted_movies.withColumn("popularity_rank", rank().over(windowSpec))
top_most_voted_movies = top_most_voted_movies.drop('dateAdded')

# # Order by the count of ratings in descending order
# top_most_voted_movies = top_most_voted_movies = top_most_voted_movies.orderBy(col("votes").desc()).limit(1000)
top_most_voted_movies.show()
top_most_voted_movies.coalesce(1).write.json('/user/itv009301/movieDataset/metadata_with_popularity_rank.json')

+---------+--------------------+-------+-------+--------------------+--------------------+-----+---------------+
|avgRating|          directedBy| imdbId|item_id|            starring|               title|votes|popularity_rank|
+---------+--------------------+-------+-------+--------------------+--------------------+-----+---------------+
|  4.41985|      Frank Darabont|0111161|    318|Tim Robbins, Morg...|Shawshank Redempt...|98967|              1|
|  4.06633|     Robert Zemeckis|0109830|    356|Tom Hanks, Gary S...| Forrest Gump (1994)|97772|              2|
|   4.1876|   Quentin Tarantino|0110912|    296|John Travolta, Sa...| Pulp Fiction (1994)|93156|              3|
|  4.15014|      Jonathan Demme|0102926|    593|Jodie Foster, Ant...|Silence of the La...|88573|              4|
|  4.15952|Andy Wachowski, L...|0133093|   2571|Laurence Fishburn...|  Matrix, The (1999)|85431|              5|
|  4.10145|        George Lucas|0076759|    260|Mark Hamill, Harr...|Star Wars: Episod...|82450|


|item_id|count(rating)|title_count     |
|------|------|---------|
|318    |98967        |Shawshank Redemption, The (1994)                                              |
|356    |97772        |Forrest Gump (1994)                                                           |
|296    |93156        |Pulp Fiction (1994)                                                           |
|593    |88573        |Silence of the Lambs, The (1991)                                              |
|2571   |85431        |Matrix, The (1999)                                                            |
|260    |82450        |Star Wars: Episode IV - A New Hope (1977)                                     |
|480    |76792        |Jurassic Park (1993)                                                          |
|527    |72143        |Schindler's List (1993)                                                       |
|110    |69190        |Braveheart (1995)                                                             |
|1      |68884        |Toy Story (1995)                                                              |
|2959   |66538        |Fight Club (1999)                                                             |
|1210   |66516        |Star Wars: Episode VI - Return of the Jedi (1983)                             |
|1196   |66449        |Star Wars: Episode V - The Empire Strikes Back (1980)                         |
|589    |64661        |Terminator 2: Judgment Day (1991)                                             |
|1198   |64069        |Raiders of the Lost Ark (Indiana Jones and the Raiders of the Lost Ark) (1981)|
|50     |62749        |Usual Suspects, The (1995)                                                    |
|4993   |62701        |Lord of the Rings: The Fellowship of the Ring, The (2001)                     |
|858    |61565        |Godfather, The (1972)                                                         |
|2858   |61367        |American Beauty (1999)                                                        |
|780    |59271        |Independence Day (a.k.a. ID4) (1996)                                          |

#### Find movie names from item_id

In [18]:
item = 5349
metadata_df.where(metadata_df.item_id==item)

---------

### Model Training using Parameter Grid Search

In [20]:
<!-- # time  -->
t0 = time()
print("Model training using Grid Search in process.....")

<!-- # Initialize the ALS model -->
als = ALS(userCol="user_id", itemCol="item_id", ratingCol="rating", \
          coldStartStrategy="drop", nonnegative=True)

<!-- # Tune model using ParamGridBuilder -->
param_grid = ParamGridBuilder()\
                .addGrid(als.rank, [12,13,14])\
                .addGrid(als.maxIter, [18,19,20])\
                .addGrid(als.regParam, [.17,.18,.19])\
                .build()
#Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", \
                                predictionCol="prediction")

#Build cross validation usin TrainValidationSplit
tvs = TrainValidationSplit(estimator=als,\
                          estimatorParamMaps=param_grid,\
                          evaluator=evaluator)

#Fit ALS model to training data
model = tvs.fit(training_df)

#Extract best model from the tuning exercise using ParamGridBuilder
best_model = model.bestModel

#Generate predictions and evaluate using RMSE
predictions = best_model.transform(testing_df)
rmse = evaluator.evaluate(predictions)

#print evaluation metrics and model parameters
print("RMSE = " + str(rmse))
print("**Best Model**")
print(" Rank:", best_model.rank)
print(" MaxIter:", best_model.__java_obj.parent().getMaxIter())
print(" RegParam:", best_model.__java_obj.parent().getRegParam())

print("Grid Search model trained in seconds", time()-t0)

In [22]:
spark.stop()