<a href="https://colab.research.google.com/github/jhihan/Hybrid-Recommendation-System/blob/master/Recommendation_system_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.0-preview2/spark-3.0.0-preview2-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-preview2-bin-hadoop3.2.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-preview2-bin-hadoop3.2"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [5]:
from google.colab import files
import pandas as pd
files.upload()
# upload movies.csv ratings.csv

Saving ratings.csv to ratings.csv


{'ratings.csv': b'userId,movieId,rating,timestamp\r\n1,1,4.0,964982703\r\n1,3,4.0,964981247\r\n1,6,4.0,964982224\r\n1,47,5.0,964983815\r\n1,50,5.0,964982931\r\n1,70,3.0,964982400\r\n1,101,5.0,964980868\r\n1,110,4.0,964982176\r\n1,151,5.0,964984041\r\n1,157,5.0,964984100\r\n1,163,5.0,964983650\r\n1,216,5.0,964981208\r\n1,223,3.0,964980985\r\n1,231,5.0,964981179\r\n1,235,4.0,964980908\r\n1,260,5.0,964981680\r\n1,296,3.0,964982967\r\n1,316,3.0,964982310\r\n1,333,5.0,964981179\r\n1,349,4.0,964982563\r\n1,356,4.0,964980962\r\n1,362,5.0,964982588\r\n1,367,4.0,964981710\r\n1,423,3.0,964982363\r\n1,441,4.0,964980868\r\n1,457,5.0,964981909\r\n1,480,4.0,964982346\r\n1,500,3.0,964981208\r\n1,527,5.0,964984002\r\n1,543,4.0,964981179\r\n1,552,4.0,964982653\r\n1,553,5.0,964984153\r\n1,590,4.0,964982546\r\n1,592,4.0,964982271\r\n1,593,4.0,964983793\r\n1,596,5.0,964982838\r\n1,608,5.0,964982931\r\n1,648,3.0,964982563\r\n1,661,5.0,964982838\r\n1,673,3.0,964981775\r\n1,733,4.0,964982400\r\n1,736,3.0,964

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import lit

In [0]:
movies = spark.read.csv('movies.csv',inferSchema=True, header =True)
ratings = spark.read.csv('ratings.csv',inferSchema=True, header =True)

Build up the Alternating Least Square (ALS) matrix factorization model in collaborative filtering algorithm

In [0]:
#Split training and testing data
train_data,test_data = ratings.randomSplit([0.8,0.2])

als = ALS(userCol='userId',itemCol='movieId',ratingCol='rating',coldStartStrategy="drop")


paramGrid = ParamGridBuilder() \
    .addGrid(als.regParam, [1, 0.1, 0.01]) \
    .addGrid(als.rank, [10, 20]) \
    .build()


In [0]:
crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="mae"),
                          numFolds=3)

In [0]:
cvModel = crossval.fit(train_data)

In [0]:
best_rank = cvModel.bestModel._java_obj.parent().getRank()
best_regParam = cvModel.bestModel._java_obj.parent().getRegParam()
best_model_params = {'rank': best_rank, 'regParam': best_regParam}

In [0]:
pred = cvModel.transform(test_data)

In [0]:
def get_movieId( movie_name, movies_data ):
    """
    return the movieId which is corresponding to the movie name

    Parameters
    ----------
    movie_name: string, the name of the movie w/ or w/o the year

    movies_data: spark Dataframe, movies data with columns 'movieId','title'

    Return
    ------
    the movieId
    """


    movieIds = []
    for movie in movie_name:
      Ids = movies_data.filter(movies_data.title.like('%{}%'.format(movie)) ).select('movieId').collect()
      movieIds = list(set(movieIds + [ row.movieId for row in Ids ]  ))
    return movieIds

In [0]:
def make_recommendation_user_based(best_model_params, ratings_data, movies_data,
                        fav_movie, n_recommendations, userId=-99 ):
    """
    make top n movie recommendations
    Parameters
    ----------

    best_model_params: dict, the best parameters of the model from the CrossValidator

    ratings_data: spark Dataframe, ratings data with columns 'userId', 'movieId', 'rating' 

    movies_data: spark Dataframe, movies data with columns 'movieId','title'

    fav_movie: str, name of user input movie

    n_recommendations: int, top n recommendations

    userId: int optional (default=-99), the user Id
            if userId = -99, the new user will be created
            if userId = -1, the latest inserted user is chosen

    """

    movieIds = get_movieId(fav_movie, movies_data )

    if (userId == -99):
      userId = ratings_data.agg({"userId": "max"}).collect()[0][0] + 1
    elif (userId == -1):
      userId = ratings_data.agg({"userId": "max"}).collect()[0][0]

    max_rating = ratings_data.agg({"rating": "max"}).collect()[0][0]

    # build up the train data, which is the original data + the new inserted data.
    # We assume that the inserted favorate movie has the highest rating.
    train_data = ratings_data
    for movieId in movieIds:
      new_rows = spark.createDataFrame([(userId,movieId,max_rating,0)], ['userId', 'movieId', 'rating', 'timestamp'])
      train_data = ratings_data.union(new_rows)

    # train best ALS
    als = ALS(userCol='userId',itemCol='movieId',ratingCol='rating', \
              rank=best_model_params.get('rank'), \
              regParam=best_model_params.get('regParam'))

    model = als.fit( train_data )
    df_newuser = movies.filter(movies.movieId != movieId).select('movieId').withColumn("userId", lit(userId))

    predictions = model.transform(df_newuser)

    topn_predictions = predictions.orderBy('prediction', ascending=False).limit(n_recommendations)
    topn_ids = topn_predictions.select('userId')
    topn_movies = movies.join( topn_predictions, topn_predictions.movieId == movies.movieId ).orderBy( 'prediction' ).select( 'title' )

    return [row.title for row in topn_movies.collect()]
    

In [87]:
my_favorite_movies = ['Iron Man']

# get recommends
n_recommendations = 10
recommends_user_based = make_recommendation_user_based(best_model_params = best_model_params, ratings_data = ratings, movies_data = movies,
                        fav_movie = my_favorite_movies, n_recommendations = n_recommendations )

print("--------------Search based on similarity between user's preference--------------------------------------")
print('The users like' , my_favorite_movies , 'also like:')
for i, title in enumerate(recommends_user_based):
    print(i+1, title)
if( len(recommends_user_based) < n_recommendations ):
  print("Sadly, we couldn't offer so many recommendations :(")

begin fun
After max_rating 5.0
Before ALS 20 0.1
--------------Search based on similarity between user's preference--------------------------------------
The users like ['Iron Man'] also like:
1 Innocents, The (1961)
2 Niagara (1953)
3 Color of Paradise, The (Rang-e khoda) (1999)
4 Scrooge (1970)
5 Proof (1991)
6 Parallax View, The (1974)
7 Mutiny on the Bounty (1962)
8 In the Realms of the Unreal (2004)
9 Twentieth Century (1934)
10 Browning Version, The (1951)
