## ALGO DEEP DIVE:
Alternating Least Squares (ALS) is a matrix factorisation algorithm implemented in Apache Spark ML and built for large-scale collaborative filtering problems.

In [0]:
%pip install recommenders
%pip install datetime

In [0]:
# Imports: Spark ML / Spark SQL helpers and the Recommenders utilities used below
import sys
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

from recommenders.utils.timer import Timer
from recommenders.datasets import movielens
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.spark_utils import start_or_get_spark

# Record the interpreter and Spark versions so a run can be reproduced later.
print(f"System version: {sys.version}")
print(f"Spark version: {pyspark.__version__}")

In [0]:
# Obtain the Spark session before its first use so this cell also works on a
# fresh kernel where no `spark` variable has been pre-injected by the platform.
# start_or_get_spark returns the already-running session when one exists.
spark = start_or_get_spark("ALS PySpark", memory="16g")

# Load the ratings CSV (first row is the header). No schema is supplied, so the
# columns are cast to their proper types in the next cell.
df = spark.read.format("csv").options(header='true', delimiter = ',').load("abfss://.../mldata/MoviesDataRecommendation/ratings.csv")
df.show()
# Alternative for small files: read "/dbfs/FileStore/df_ratings.csv" with
# pandas.read_csv(...) and convert it with spark.createDataFrame(...).


# top k items to recommend
TOP_K = 10
# Column names shared by the splitter, the ALS model and the evaluators.
COL_USER = "userId"
COL_ITEM = "movieId"
COL_RATING = "rating"
# ALS writes its scores to a column named "prediction"; this must differ from
# COL_RATING, otherwise predictions and ground truth would be indistinguishable.
COL_PREDICTION = "prediction"
COL_TIMESTAMP = "timestamp"

In [0]:
# Spark SQL type -> name accepted by Column.cast(), for reference:
#   BinaryType -> binary      BooleanType -> boolean    ByteType -> tinyint
#   DateType -> date          DecimalType -> decimal(10,0)
#   DoubleType -> double      FloatType -> float        IntegerType -> int
#   LongType -> bigint        ShortType -> smallint     StringType -> string
#   TimestampType -> timestamp
from pyspark.sql.types import DoubleType, IntegerType, TimestampType

# Give every column the dtype the rest of the notebook expects.
# NOTE(review): if "timestamp" holds epoch seconds as text, a direct
# string -> timestamp cast may produce nulls -- confirm against the data.
df = (
    df.withColumn("userId", F.col("userId").cast("int"))
    .withColumn("movieId", F.col("movieId").cast("int"))
    .withColumn("rating", F.col("rating").cast("double"))
    .withColumn("timestamp", F.col("timestamp").cast("timestamp"))
)

In [0]:
# 1. Random split
# spark_random_split takes a data set and returns its splits according to the
# given ratio; here 70% goes to `train` and the remainder to `test`.
train, test = spark_random_split(df, ratio=0.70, seed=123)

# cache() marks each split for reuse; count() is an action, so it both fills
# the cache and gives the size that is printed.
for label, split in (("Size of Train: ", train), ("Size of Validate: ", test)):
    print(label, split.cache().count())

In [0]:
#Modelling
# Hyper-parameters of pyspark.ml.recommendation.ALS:
# - numUserBlocks / numItemBlocks: number of blocks the users and items are
#   partitioned into to parallelise computation (default 10 each).
# - rank: number of latent factors (default 10).
# - maxIter: maximum number of iterations (default 10).
# - regParam: regularisation parameter (default 0.1).
# - implicitPrefs: False (default) uses the explicit-feedback variant, True the
#   variant adapted to implicit feedback.
# - alpha: baseline confidence in observations, implicit variant only (default 1.0).
# - nonnegative: whether to constrain the least-squares solution to be
#   non-negative (default False).
# - coldStartStrategy: 'drop' removes rows whose prediction would be NaN.

# Column mapping shared with the rest of the notebook.
header = dict(userCol=COL_USER, itemCol=COL_ITEM, ratingCol=COL_RATING)

als = ALS(
    **header,
    rank=10,
    maxIter=15,
    regParam=0.05,
    implicitPrefs=False,
    nonnegative=False,
    coldStartStrategy='drop',
    seed=42,
)
model = als.fit(train)


#Scoring
# Recommending movies a user has already rated makes no sense for this use case.
# We therefore score every (user, movie) pair and then remove the pairs that
# already exist in the training set.

# Score the cross join of all distinct training users and training items.
users = train.select(COL_USER).distinct()
items = train.select(COL_ITEM).distinct()
user_item = users.crossJoin(items)
dfs_pred = model.transform(user_item)  # adds the "prediction" column

# Remove seen items.
spark = start_or_get_spark("ALS PySpark", memory="16g")
# Kept so the session configuration seen by later cells is unchanged; the join
# below no longer depends on it because it only uses alias-qualified columns.
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")

# A left anti join keeps exactly the scored pairs that have no matching
# (user, item) row in `train`. Unlike a full outer join followed by a null
# filter on the train rating, it cannot let train-only rows or train rows with
# a null rating through, and it avoids materialising the joined columns.
# The columns are referenced through the "pred"/"train" aliases because
# `dfs_pred` is derived from `train` and the two share column lineage.
dfs_pred_exclude_train = dfs_pred.alias("pred").join(
    train.alias("train"),
    (F.col("pred." + COL_USER) == F.col("train." + COL_USER))
    & (F.col("pred." + COL_ITEM) == F.col("train." + COL_ITEM)),
    how="left_anti",
)

top_all = dfs_pred_exclude_train.select(COL_USER, COL_ITEM, "prediction")

# count() materialises the cached result before it is shown and evaluated.
top_all.cache().count()
top_all.show()

In [0]:
#train.where((train.userId==1) & (train.movieId==1348)).show()
# Pairs like this are already rated in the training data, so they are excluded from top_all; top_all holds predicted ratings only for user/movie pairs not present in the training set.

In [0]:
# Ranking evaluation: compare the recommendations in `top_all` with the
# held-out ratings in `test`, considering TOP_K items.
rank_eval = SparkRankingEvaluation(
    test,
    top_all,
    k=TOP_K,
    col_user=COL_USER,
    col_item=COL_ITEM,
    col_rating=COL_RATING,
    col_prediction="prediction",
    relevancy_method="top_k",
)

# One metric per line, evaluated in the order shown.
ranking_report = [
    "Model:\tALS",
    "Top K:\t%d" % rank_eval.k,
    "MAP:\t%f" % rank_eval.map_at_k(),
    "NDCG:\t%f" % rank_eval.ndcg_at_k(),
    "Precision@K:\t%f" % rank_eval.precision_at_k(),
    "Recall@K:\t%f" % rank_eval.recall_at_k(),
]
print(*ranking_report, sep='\n')

In [0]:
# Rating evaluation: score the held-out pairs so each row carries both the
# original rating and the model's prediction.
prediction = model.transform(test)
prediction.cache().show()

rating_eval = SparkRatingEvaluation(
    test,
    prediction,
    col_user=COL_USER,
    col_item=COL_ITEM,
    col_rating=COL_RATING,
    col_prediction="prediction",
)

# One metric per line, evaluated in the order shown.
rating_report = [
    "Model:\tALS rating prediction",
    "RMSE:\t%f" % rating_eval.rmse(),
    "MAE:\t%f" % rating_eval.mae(),
    "Explained variance:\t%f" % rating_eval.exp_var(),
    "R squared:\t%f" % rating_eval.rsquared(),
]
print(*rating_report, sep='\n')

