In [1]:
import os

# Export JAVA_HOME for this process before the Spark session is created in a
# later cell (presumably so PySpark launches this JDK -- confirm on your host).
os.environ.update({"JAVA_HOME": "/usr/lib/jvm/java-8-openjdk"})

In [2]:
from pyspark.sql import SparkSession
# Notebook-wide Spark session; getOrCreate() reuses an already-running session
# if one exists, so re-running this cell is safe.
_session_builder = SparkSession.builder.appName('Recommendations')
spark = _session_builder.getOrCreate()

22/05/30 00:31:05 WARN Utils: Your hostname, kembot resolves to a loopback address: 127.0.1.1; using 10.87.2.208 instead (on interface wlan0)
22/05/30 00:31:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/30 00:31:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
import pandas as pd
from pyspark.sql.functions import col, explode, split
from pyspark import SparkContext

In [4]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit

In [5]:
def col_filtering(ratings_loc, seed=None):
    """Tune an ALS collaborative-filtering model with 5-fold cross-validation.

    Reads the ratings CSV at ``ratings_loc`` (header row, inferred schema),
    drops the ``timestamp`` column and grid-searches ALS over
    ``rank`` in {5, 10, 15, 20} and ``regParam`` in {0.001, 0.01, 0.05, 0.1}.
    Each combination is scored by RMSE between ``rating`` and ``prediction``.
    No separate hold-out test set is carved out: model selection relies on the
    cross-validated metric only.

    Uses the notebook-level ``spark`` session.

    Args:
        ratings_loc: Path of the ratings CSV. ALS is configured to read the
            ``userId``, ``movieId`` and ``rating`` columns from it.
        seed: Optional integer seed forwarded to both ALS and the
            cross-validator so that runs are repeatable. When ``None`` (the
            default) no seed is passed and Spark's own defaults apply.

    Returns:
        The best fitted ALS model (``CrossValidatorModel.bestModel``).
    """
    ratings = (
        spark.read.option("inferSchema", "true")
        .csv(ratings_loc, header=True)
        .drop('timestamp')
    )

    # Forward the seed only when the caller supplied one, so the default call
    # behaves exactly as if no seed argument existed.
    seed_kwargs = {} if seed is None else {"seed": seed}

    als = ALS(
        userCol="userId",
        itemCol="movieId",
        ratingCol="rating",
        nonnegative=True,
        implicitPrefs=False,
        # Drop rows with NaN predictions (users/items unseen in a training
        # fold) so the RMSE of each fold stays a finite number.
        coldStartStrategy="drop",
        **seed_kwargs
    )

    param_grid = (
        ParamGridBuilder()
        .addGrid(als.rank, list(range(5, 25, 5)))
        .addGrid(als.regParam, [0.001, 0.01, 0.05, 0.1])
        .build()
    )

    evaluator = RegressionEvaluator(
        metricName="rmse",
        labelCol="rating",
        predictionCol="prediction",
    )
    print ("Num models to be tested: ", len(param_grid))

    cv = CrossValidator(
        estimator=als,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=5,
        **seed_kwargs
    )

    model = cv.fit(ratings)
    best_model = model.bestModel

    # avgMetrics is aligned with param_grid, so the winning combination can be
    # looked up through public API instead of the private `_java_obj` handle.
    metrics = model.avgMetrics
    pick = max if evaluator.isLargerBetter() else min
    best_index = pick(range(len(metrics)), key=metrics.__getitem__)
    best_params = param_grid[best_index]

    print("**Best Model**")
    print("  Rank:", best_params[als.rank])
    # maxIter is not part of the grid, so every candidate shares the estimator's value.
    print("  MaxIter:", als.getMaxIter())
    print("  RegParam:", best_params[als.regParam])

    return best_model

In [8]:
import time

# perf_counter() is a monotonic clock intended for measuring durations, so the
# reported time cannot be skewed by system clock adjustments during the
# multi-minute grid search.
t1 = time.perf_counter()
col_filtering_model = col_filtering("dataset/ratings.csv")
t2 = time.perf_counter()
print("Time =", t2 - t1)
col_filtering_model

Num models to be tested:  16


22/05/30 00:33:03 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/05/30 00:33:03 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


**Best Model**
  Rank: 20
  MaxIter: 10
  RegParam: 0.1
Time = 201.1196472644806


ALSModel: uid=ALS_845353b640a8, rank=20

In [28]:
def recommend(model, movies_loc, num_recs=10):
    """Build a flat table of top-N movie recommendations for every user.

    Asks ``model`` for ``num_recs`` recommendations per user, explodes the
    per-user recommendation array into one row per (user, movie) pair and
    joins the rows with the movie metadata read from ``movies_loc``.

    Uses the notebook-level ``spark`` session.

    Args:
        model: Fitted model exposing ``recommendForAllUsers`` (e.g. the
            ALS model returned by ``col_filtering``).
        movies_loc: Path of the movies CSV (with a header row); it must
            contain a ``movieId`` column to join on.
        num_recs: Number of recommendations to generate per user.
            Defaults to 10.

    Returns:
        A pandas DataFrame holding ``movieId``, ``userId``, ``rating`` (the
        score taken from the model's recommendation entries) and the
        remaining columns of the movies file. The whole result is collected
        to the driver by ``toPandas()``.
    """
    per_user = model.recommendForAllUsers(num_recs)

    # One output row per recommended movie instead of one array per user.
    flat = (
        per_user
        .withColumn("rec_exp", explode("recommendations"))
        .select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))
    )

    movies = spark.read.csv(movies_loc, header=True)

    # Default (inner) join: recommendations without a matching movie row are dropped.
    return flat.join(movies, on='movieId').toPandas()

In [19]:
# Materialise the recommendations for all users as a pandas DataFrame
# and preview the first rows.
nrc = recommend(
    model=col_filtering_model,
    movies_loc="dataset/movies.csv",
)
nrc.head()

Unnamed: 0,movieId,userId,rating,title,genres
0,78836,1,5.572335,Enter the Void (2009),Drama
1,171495,1,5.553287,Cosmos,(no genres listed)
2,720,1,5.511459,Wallace & Gromit: The Best of Aardman Animatio...,Adventure|Animation|Comedy
3,33649,1,5.499642,Saving Face (2004),Comedy|Drama|Romance
4,8477,1,5.494569,"Jetée, La (1962)",Romance|Sci-Fi


In [25]:
# Persist the recommendations without the pandas index so that later cells can
# reload them from disk instead of recomputing them.
nrc.to_csv(path_or_buf='recs.csv', index=False)

In [34]:
# Reload the recommendations that were saved to disk.
rec = pd.read_csv(filepath_or_buffer='recs.csv')

In [44]:
def recommendForUser(nrc, userNo):
    """Return the recommended titles and genres for a single user.

    Args:
        nrc: DataFrame of recommendations with at least the ``userId``,
            ``title`` and ``genres`` columns.
        userNo: Value matched against the ``userId`` column.

    Returns:
        A DataFrame with the ``title`` and ``genres`` columns of the rows
        whose ``userId`` equals ``userNo`` (original index labels are kept).
    """
    belongs_to_user = nrc['userId'] == userNo
    return nrc.loc[belongs_to_user, ['title', 'genres']]

In [47]:
# Recommendations for user 100, looked up in the reloaded CSV data.
r100 = recommendForUser(nrc=rec, userNo=100)
r100

Unnamed: 0,title,genres
3440,Adam's Rib (1949),Comedy|Romance
3441,Anne of Green Gables: The Sequel (a.k.a. Anne ...,Children|Drama|Romance
3442,Anne of Green Gables (1985),Children|Drama
3443,"Very Potter Sequel, A (2010)",Comedy|Musical
3444,Mr. Skeffington (1944),Drama|Romance
3445,12 Angry Men (1997),Crime|Drama
3446,Beautiful Thing (1996),Drama|Romance
3447,Thief (1981),Crime|Drama|Thriller
3448,Strictly Sexual (2008),Comedy|Drama|Romance
3449,Dear Zachary: A Letter to a Son About His Fath...,Documentary


In [1]:
import pandas as pd
# Load the previously exported recommendations from disk.
recs = pd.read_csv(filepath_or_buffer="recs.csv")

In [10]:
# Titles recommended to user 100 as a dict keyed by 0-based position
# (the index is reset so the keys start at 0 rather than at the file row labels).
rec100 = (
    recs.loc[recs['userId'] == 100, 'title']
    .reset_index(drop=True)
    .to_dict()
)
rec100

{0: "Adam's Rib (1949)",
 1: 'Anne of Green Gables: The Sequel (a.k.a. Anne of Avonlea) (1987)',
 2: 'Anne of Green Gables (1985)',
 3: 'Very Potter Sequel, A (2010)',
 4: 'Mr. Skeffington (1944)',
 5: '12 Angry Men (1997)',
 6: 'Beautiful Thing (1996)',
 7: 'Thief (1981)',
 8: 'Strictly Sexual (2008)',
 9: 'Dear Zachary: A Letter to a Son About His Father (2008)'}