In [77]:
import os
import feather
import pyspark
import logging
import json
from tqdm import tqdm

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.sql.functions import countDistinct,count,isnan,isnull
from pyspark.sql import functions as F

from pyspark.ml.recommendation import ALS,ALSModel
from pyspark.ml.evaluation import RegressionEvaluator

In [78]:
def spark_shape(self):
    """Return ``(row_count, column_count)`` for a PySpark DataFrame, like pandas' ``shape``.

    Unlike pandas, this is attached as a *method*, so call it as ``df.shape()``.
    ``count()`` runs a Spark job over the whole frame.
    """
    n_rows = self.count()
    n_cols = len(self.columns)
    return n_rows, n_cols


# Monkey-patch so every Spark DataFrame in this notebook gains ``.shape()``.
setattr(pyspark.sql.dataframe.DataFrame, "shape", spark_shape)

In [79]:
# Paths for the MovieLens 1M dataset and for everything this notebook writes.
# The defaults reproduce the original author's local Windows layout; set the
# environment variables below to run the notebook elsewhere without editing it.
_DEFAULT_ROOT = r"C:\Users\Kaumil-Trivedi\Documents\kaumil"
_DEFAULT_FRAMEWORK_DIR = _DEFAULT_ROOT + r"\spark_tutorials\spark_framework"

PATH = os.environ.get("ML1M_DATA_PATH", _DEFAULT_ROOT + r"\datasets\ml-1m\ml-1m")
MODEL_PATH = os.environ.get("ALS_MODEL_PATH", _DEFAULT_FRAMEWORK_DIR + r"\saved_models")
HYPERPARAMETERS_PATH = os.environ.get("ALS_HYPERPARAMETERS_PATH", _DEFAULT_FRAMEWORK_DIR + r"\hyperparameters")
FINAL_RESULTS_PATH = os.environ.get("ALS_RESULTS_PATH", _DEFAULT_FRAMEWORK_DIR)

# Seed shared by the train/test splits and ALS so runs are reproducible.
random_seed = 42

In [80]:
# Local Spark session on all cores. Arrow speeds up spark.createDataFrame(<pandas frame>),
# which is how the feather files are turned into Spark DataFrames below.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Recommender")
    # Spark >= 3.0 reads this key; the legacy key below is deprecated there
    # but is the one Spark 2.x understands.
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.execution.arrow.enabled", "true")
    .getOrCreate()
)

# Reading Data

In [93]:
# Load the three MovieLens tables from their feather files (pandas DataFrames at this point).
df_ratings, df_movies, df_users = (
    feather.read_dataframe(os.path.join(PATH, table + ".feather"))
    for table in ("ratings", "movies", "users")
)

In [94]:
# Convert the pandas frames into Spark DataFrames.
# NOTE(review): the same names are rebound from pandas to Spark frames, so re-running this
# cell without re-running the loading cell feeds Spark DataFrames back into createDataFrame.
df_ratings, df_movies, df_users = map(spark.createDataFrame, (df_ratings, df_movies, df_users))

In [95]:
# 80/20 split of the raw ratings table.
# NOTE(review): training_data/testing_data are not used anywhere visible in this notebook;
# the rank search uses ratings_train/ratings_test instead.
training_data, testing_data = df_ratings.randomSplit(weights=[0.8, 0.2], seed=random_seed)

### Ratings & Movies

In [96]:
# Keep only the (user, item, rating) columns that ALS is trained on.
df_final_ratings = df_ratings.select('UserID', 'MovieID', 'Rating')

In [97]:
# Movie lookup table: ID and title only.
df_final_movies = df_movies.select('MovieID', 'Title')

##### Preprocessing Movie Titles (currently disabled: the cells below are commented out)

In [11]:
# movie_title_indexer = StringIndexer(inputCol="Title",outputCol="TitleIndex")
# movie_indexer_model = movie_title_indexer.fit(df_final_movies)
# df_final_movies = movie_indexer_model.transform(df_final_movies)

In [64]:
# ratings_assembler = VectorAssembler(inputCols=df_final_ratings.columns,outputCol="rating_features")
# movies_assembler = VectorAssembler(inputCols=['MovieID','TitleIndex'],outputCol="movies_features")

In [65]:
# df_final_ratings = ratings_assembler.transform(df_final_ratings).select("rating_features")
# df_final_movies = movies_assembler.transform(df_final_movies).select("movies_features")

# Machine Learning: ALS Collaborative Filtering

In [98]:
# Hold out 20% of the ratings (same seed as elsewhere) to score each ALS rank.
ratings_train, ratings_test = df_final_ratings.randomSplit(weights=[0.8, 0.2], seed=random_seed)

### Training

In [99]:
# Hyperparameters for the ALS rank search.
iterations = 10                  # ALS maxIter
regularization_parameter = 0.1   # ALS regParam
ranks = [4, 8, 12]               # latent-factor ranks to compare
errors = [0] * len(ranks)        # one slot per entry of `ranks`
err_index = 0
coldStartStrategy = "drop"       # ALS drops rows whose prediction is NaN (user/item unseen in training)

# Best-model bookkeeping; sentinel values until a rank has been evaluated.
min_error = float('inf')
best_rank = -1
best_iteration = -1

# RMSE of the `prediction` column against the true `Rating`.
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="Rating", metricName="rmse")

# Per-rank hyperparameters and evaluation results, keyed by "rank<N>".
results = {}

In [100]:
def create_json(**kwargs):
    """Serialize the given keyword arguments into a JSON object string.

    Returns ``json.dumps(kwargs)`` (keys in call order), or ``None`` when a value is not
    JSON-serializable; in that case the traceback is logged.
    """
    try:
        payload = json.dumps(kwargs)
    except Exception:
        logging.exception("Exception in creating the json")
        return None
    return payload

In [82]:
def _is_spark_dataframe(obj):
    """Return True when ``obj`` is (a subclass of) a PySpark SQL ``DataFrame``.

    Class names along the MRO are inspected instead of comparing ``str(type(obj))`` with a
    single hard-coded module path, because the concrete DataFrame class lives in different
    ``pyspark.sql.*`` modules depending on the Spark version and on classic vs Connect mode.
    """
    return any(
        klass.__name__ == "DataFrame" and klass.__module__.startswith("pyspark.sql")
        for klass in type(obj).__mro__
    )


def evaluators(predictions, **kwargs):
    """Run every evaluator passed as a keyword argument against ``predictions``.

    Parameters
    ----------
    predictions : pyspark.sql.DataFrame
        Output of ``model.transform(...)``; must contain a ``prediction`` column.
    **kwargs
        ``name=evaluator`` pairs; each evaluator must provide ``evaluate(df)`` and
        ``getMetricName()`` (e.g. ``RegressionEvaluator``).

    Returns
    -------
    dict or None
        ``{name: {"metric_value": ..., "metric_name": ...}}`` for every evaluator, or
        ``None`` if an evaluator raised (the traceback is logged).

    Raises
    ------
    Exception
        If ``predictions`` is not a Spark DataFrame or lacks a ``prediction`` column.
    """
    if not _is_spark_dataframe(predictions) or "prediction" not in predictions.columns:
        raise Exception("Inconsistent dataframe passed for predictions")

    try:
        metrics = {}
        for name, evaluator in kwargs.items():
            metrics[name] = {
                "metric_value": evaluator.evaluate(predictions),
                "metric_name": evaluator.getMetricName(),
            }
        return metrics
    except Exception:
        # Best-effort: keep the notebook running, but record the full traceback.
        logging.exception("Exception in calculating the evaluations")
        return None
        

In [83]:
def create_and_save_json(data, path, file_name=None, file_path_name_inclusive=True):
    """Serialize ``data`` to JSON and write it to a file.

    Parameters
    ----------
    data : object
        Any JSON-serializable value. It is encoded exactly once, so a dict is stored
        as a JSON object.
    path : str
        Full file path when ``file_path_name_inclusive`` is true, otherwise the
        directory that will contain ``file_name``.
    file_name : str, optional
        File name joined onto ``path`` when ``file_path_name_inclusive`` is false.
    file_path_name_inclusive : bool, default True
        Whether ``path`` already includes the file name.

    Returns
    -------
    str or None
        The path written to, or ``None`` if serializing or writing failed (the
        traceback is logged). A serialization failure leaves no file behind.

    Raises
    ------
    ValueError
        If ``file_path_name_inclusive`` is false and no ``file_name`` is given.
    """
    if file_path_name_inclusive:
        file_path = path
    else:
        if file_name is None:
            raise ValueError("Please provide a file name in the file path or as a parameter")
        file_path = os.path.join(path, file_name)

    try:
        # Encode once: json.dumps followed by json.dump of the result would store a
        # quoted string instead of a JSON object. Encoding before open() also avoids
        # leaving a truncated file when the data is not serializable.
        payload = json.dumps(data)

        directory = os.path.dirname(file_path)
        if directory:
            # Create missing parent folders instead of failing in open().
            os.makedirs(directory, exist_ok=True)

        with open(file_path, "w", encoding="utf-8") as json_file:
            json_file.write(payload)
    except Exception:
        # logging.exception (lower-case) logs the message plus the active traceback.
        logging.exception("Exception in creating and saving JSON file")
        return None
    return file_path

In [84]:
# Rank search: fit ALS once per entry of `ranks`, save each model, score it on the
# held-out split and remember the rank with the lowest test RMSE.
# Bookkeeping is reset here so re-running this cell starts from a clean slate.
results = {}
errors = [0] * len(ranks)
min_error, best_rank, best_iteration = float('inf'), -1, -1

for rank_position, rank in enumerate(tqdm(ranks)):
    als = ALS(rank=rank,
              seed=random_seed,
              regParam=regularization_parameter,
              maxIter=iterations,
              coldStartStrategy=coldStartStrategy,
              userCol='UserID',
              itemCol='MovieID',
              ratingCol="Rating")
    model = als.fit(ratings_train)

    rank_string = "rank" + str(rank)
    model_dir = os.path.join(MODEL_PATH, rank_string)

    # Save the model so it does not need retraining later; overwrite() lets the cell be
    # re-run instead of failing because the directory already exists.
    model.write().overwrite().save(model_dir)

    # Plain dict rather than a JSON string, so the saved files contain JSON objects.
    hyperparameters = {
        "rank": rank,
        "seed": random_seed,
        "regularization_parameter": regularization_parameter,
        "iterations": iterations,
        "coldStartStrategy": coldStartStrategy,
    }

    # Predictions must be computed for *this* model before they are evaluated.
    test_prediction = model.transform(ratings_test)
    evaluation = evaluators(test_prediction, regression=reg_eval)

    results[rank_string] = {
        "Hyperparameters": hyperparameters,
        "Errors": evaluation,
    }

    # evaluators() returns None when evaluation failed; such a rank cannot win.
    if evaluation is not None:
        rmse = evaluation["regression"]["metric_value"]
        errors[rank_position] = rmse
        if rmse < min_error:
            min_error = rmse
            best_rank = rank
            best_iteration = iterations

    create_and_save_json(data=hyperparameters, path=os.path.join(model_dir, "hyperparameters.json"), file_path_name_inclusive=True)
    create_and_save_json(data=evaluation, path=os.path.join(model_dir, "evaluators.json"), file_path_name_inclusive=True)

create_and_save_json(data=results, path=FINAL_RESULTS_PATH, file_name="results.json", file_path_name_inclusive=False)
print("Best rank: {} (test RMSE {:.4f})".format(best_rank, min_error))



  0%|                                                                                 | 0/3 [00:00<?, ?it/s]

 33%|████████████████████████▎                                                | 1/3 [00:57<01:54, 57.15s/it]

 67%|████████████████████████████████████████████████▋                        | 2/3 [01:57<00:58, 58.11s/it]

100%|█████████████████████████████████████████████████████████████████████████| 3/3 [03:05<00:00, 61.20s/it]

### Computing the Average Rating for Each Movie

In [102]:
def return_avg(dataframe, groupBy_column: str, target_column: str, alias_name=None):
    """Group ``dataframe`` by ``groupBy_column`` and average ``target_column``.

    Returns a new PySpark DataFrame with one row per group: the group column plus the
    mean, named ``alias_name`` or ``"avg_<target_column>"`` when no alias is given.
    """
    mean_column_name = "avg_" + str(target_column) if alias_name is None else alias_name
    mean_expr = F.mean(target_column).alias(mean_column_name)
    return dataframe.groupBy([groupBy_column]).agg(mean_expr)

In [103]:
# Mean rating per MovieID, in a column named "avg_Rating".
df_avg_movie_ratings = return_avg(df_final_ratings, groupBy_column="MovieID", target_column="Rating")

In [104]:
# Attach each movie's average rating. The full outer join keeps movies without any rating
# (avg_Rating null) and rated MovieIDs missing from the movies table (Title null);
# coalesce() folds the two MovieID columns into one.
movie_id_match = df_final_movies['MovieID'] == df_avg_movie_ratings['MovieID']
df_final_movies = (
    df_final_movies
    .join(df_avg_movie_ratings, movie_id_match, how='full')
    .select(
        F.coalesce(df_final_movies['MovieID'], df_avg_movie_ratings['MovieID']).alias('MovieID'),
        df_final_movies['Title'],
        df_avg_movie_ratings['avg_Rating'],
    )
)

##### Sample User

In [105]:
# A hypothetical new user and the movies they have rated.
new_user_ID = 0  # presumably not used by MovieLens 1M, whose user IDs start at 1 — TODO confirm

# (MovieID, rating) pairs for the new user.
# NOTE(review): the MovieLens 1M README describes a 5-star scale, but several ratings here
# exceed 5 — confirm the intended scale before mixing them into the training data.
new_user_movie_ratings = [
    (260, 9),
    (1, 8),
    (16, 7),
    (25, 8),
    (32, 9),
    (335, 4),
    (379, 3),
    (296, 7),
    (858, 10),
    (50, 8),
]

# The format of each row is (userID, movieID, rating); the user ID comes from new_user_ID
# so it is defined in exactly one place.
new_user_ratings = [(new_user_ID, movie_id, rating) for movie_id, rating in new_user_movie_ratings]
df_new_user_ratings = spark.createDataFrame(new_user_ratings, ['UserID', 'MovieID', 'Rating'])

# DataFrame.show() prints the table itself and returns None, so it must not be
# interpolated into a string (that printed "New user ratings: None").
print('New user ratings:')
df_new_user_ratings.show(10)

+------+-------+------+
|UserID|MovieID|Rating|
+------+-------+------+
|     0|    260|     9|
|     0|      1|     8|
|     0|     16|     7|
|     0|     25|     8|
|     0|     32|     9|
|     0|    335|     4|
|     0|    379|     3|
|     0|    296|     7|
|     0|    858|    10|
|     0|     50|     8|
+------+-------+------+

New user ratings: None


##### Adding the ratings given by the user to the ratings dataframe

In [106]:
# Add the new user's ratings to the training data.
# Rows already carrying new_user_ID are dropped first so that re-running this cell does
# not append the same ratings twice (assumes the real data has no ratings for that ID).
# unionByName matches columns by name rather than by position.
df_final_ratings = (
    df_final_ratings
    .filter(F.col('UserID') != new_user_ID)
    .unionByName(df_new_user_ratings)
)

##### Training the best model on the new dataset

In [111]:
# Retrain with the rank that scored best on the held-out split, now including the new user.
# best_rank keeps its sentinel value (-1) until the rank search records a winner; stop here
# with a clear message rather than handing an invalid rank to ALS.
assert best_rank > 0, "best_rank is not set - run the rank search before training the final model"
als = ALS(rank=best_rank,
          seed=random_seed,
          regParam=regularization_parameter,
          maxIter=iterations,
          userCol='UserID',
          itemCol='MovieID',
          ratingCol="Rating",
          coldStartStrategy=coldStartStrategy)

In [112]:
# Fit the final model on all ratings, including the new user's.
model = als.fit(dataset=df_final_ratings)

In [113]:
# Display the fitted model's repr.
model

ALS_d9a6330ce9ae

# Fetching the Movie IDs

In [125]:
# Single-column frame of every MovieID (after the full join with average ratings).
movie_ids = df_final_movies.select('MovieID')

In [128]:
# Print the new user's ratings (show() prints and returns None).
df_new_user_ratings.show(10)

+------+-------+------+
|UserID|MovieID|Rating|
+------+-------+------+
|     0|    260|     9|
|     0|      1|     8|
|     0|     16|     7|
|     0|     25|     8|
|     0|     32|     9|
|     0|    335|     4|
|     0|    379|     3|
|     0|    296|     7|
|     0|    858|    10|
|     0|     50|     8|
+------+-------+------+



In [136]:
# (rows, columns) of the movie IDs other than 1, 2 and 3, via the patched shape() helper.
movie_ids.filter(~movie_ids['MovieID'].isin([1, 2, 3])).shape()

(3880, 1)

In [140]:
# Inspect the MovieID column object (displays a Column repr, not the data).
df_new_user_ratings["MovieID"]

Column<b'MovieID'>

In [141]:
# Inspect the schema of the combined ratings frame.
df_final_ratings

DataFrame[UserID: bigint, MovieID: bigint, Rating: bigint]