In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession,SQLContext
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import *
import numpy as np
from IPython.display import Image
from IPython.display import display
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder



In [3]:
spark = SparkSession.builder.appName("Recommendation_System").getOrCreate()
sc=spark.sparkContext
sqlContext = SQLContext(sc)



In [4]:
ratings_df=spark.read.csv('ratings.csv',header=True,inferSchema=True)
movies_df=spark.read.csv('movies.csv',header=True,inferSchema=True)
links_df=spark.read.csv('links.csv',header=True,inferSchema=True)

In [5]:
ratings_df.show(5)
movies_df.show(5)
links_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5

In [6]:
## Ratings split

train,test = ratings_df.randomSplit([0.85,0.15])

In [7]:
iterations = 10
reg_param = 0.1
rank = 4
errors=[]
err = 0

Using for loop to find the model with lowest RMSE

In [8]:
for rank in range(4,7):
    als = ALS(maxIter = iterations,regParam = reg_param,rank = 5,userCol = "userId",itemCol = "movieId",ratingCol="rating")
    model = als.fit(train)
    predictions = model.transform(test)
    new_predictions = predictions.filter(col('prediction')!=np.nan)
    eval = RegressionEvaluator(metricName = "rmse",labelCol="rating",predictionCol="prediction")
    rmse = eval.evaluate(new_predictions)
    print("RMSE = "+str(rmse))
        

RMSE = 0.8735744463773722
RMSE = 0.8735744463773722
RMSE = 0.8735744463773722


Using Cross Validator we can find the best model

In [9]:
ls = ALS(maxIter = iterations,regParam = reg_param,rank = rank,userCol = "userId",itemCol = "movieId",ratingCol="rating")
paramGrid = ParamGridBuilder() \
            .addGrid(als.regParam,[0.1,0.01,0.15]) \
            .addGrid(als.rank,[4,5,6]) \
            .build()
eval = RegressionEvaluator(metricName = "rmse",labelCol="rating",predictionCol="prediction")
crossval = CrossValidator(estimator = als,estimatorParamMaps = paramGrid,evaluator=eval,numFolds=5)
cvModel = crossval.fit(train)

In [10]:
predictions.show(n=10)

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|     1|    362|   5.0|964982588| 4.2322702|
|     1|    101|   5.0|964980868|   4.32193|
|     1|   1208|   4.0|964983250|  4.916703|
|     1|    333|   5.0|964981179| 4.2505684|
|     1|      1|   4.0|964982703|  4.796311|
|     1|    736|   3.0|964982653| 3.6984885|
|     1|   1049|   5.0|964982400|  3.483241|
|     1|     70|   3.0|964982400|  3.849427|
|     1|    592|   4.0|964982271|  4.111782|
|     1|    110|   4.0|964982176|  4.837343|
+------+-------+------+---------+----------+
only showing top 10 rows



In [11]:
predictions.join(movies_df,"movieId").show(5)

+-------+------+------+----------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|prediction|               title|              genres|
+-------+------+------+----------+----------+--------------------+--------------------+
|   1959|   108|   5.0|1042840682| 4.1751685|Out of Africa (1985)|       Drama|Romance|
|   1580|    34|   2.5|1162048827| 3.3884459|Men in Black (a.k...|Action|Comedy|Sci-Fi|
|   1088|   159|   4.0|1508641161|  3.044857|Dirty Dancing (1987)|Drama|Musical|Rom...|
|   1580|   606|   2.5|1171310310| 3.2915862|Men in Black (a.k...|Action|Comedy|Sci-Fi|
|   1829|   606|   3.5|1171737562|  2.202891|  Chinese Box (1997)|       Drama|Romance|
+-------+------+------+----------+----------+--------------------+--------------------+
only showing top 5 rows



In [12]:
user = int(input("Enter User ID: "))

Enter User ID: 24


In [13]:
for_one = predictions.filter(col("userId")==(user)).join(movies_df,"movieId").join(links_df,"movieId")

In [14]:
import webbrowser
link = "https://www.themoviedb.org/movie/"
for m in for_one.take(2):
    URL = link+str(m.tmdbId)
    print(m.title)
    webbrowser.open(URL)

Interview with the Vampire: The Vampire Chronicles (1994)
Mr. & Mrs. Smith (2005)


##### Finding 5 recommended movies for a set of users

In [15]:
users = [100,200,300,400,500,450,350,250,150,50]  #You can input or take any userId

In [16]:
predictions = predictions.na.drop()

In [17]:
predictions.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   108|   1959|   5.0|1042840682| 4.1751685|
|    34|   1580|   2.5|1162048827| 3.3884459|
|   159|   1088|   4.0|1508641161|  3.044857|
|   606|   1580|   2.5|1171310310| 3.2915862|
|   606|   1829|   3.5|1171737562|  2.202891|
|    91|   1580|   3.5|1112711168| 3.2713466|
|    91|   1645|   3.0|1112712216|  3.397581|
|   409|   3175|   4.0| 968978236| 3.6668818|
|   230|   1580|   3.5|1196304359| 3.0543618|
|   233|   1580|   3.0|1529334057| 2.9665904|
|   367|   3175|   4.0| 997812532| 3.7701318|
|   599|    471|   2.5|1498518822| 2.6938756|
|   599|   3175|   3.0|1498522138| 2.8536515|
|   599|   6620|   2.5|1498517548|  3.343285|
|   111|   1088|   3.0|1516153967| 2.9092774|
|   111|  44022|   3.0|1516143982| 3.1531286|
|   140|   1580|   3.0|1024051464| 3.5475097|
|   177|  44022|   2.5|1435525303| 3.2574139|
|   416|   1580|   2.0|1187496482|

In [18]:
df2 = predictions.join(movies_df,"movieId")

In [19]:
df2.show(7)

+-------+------+------+----------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|prediction|               title|              genres|
+-------+------+------+----------+----------+--------------------+--------------------+
|   1959|   108|   5.0|1042840682| 4.1751685|Out of Africa (1985)|       Drama|Romance|
|   1580|    34|   2.5|1162048827| 3.3884459|Men in Black (a.k...|Action|Comedy|Sci-Fi|
|   1088|   159|   4.0|1508641161|  3.044857|Dirty Dancing (1987)|Drama|Musical|Rom...|
|   1580|   606|   2.5|1171310310| 3.2915862|Men in Black (a.k...|Action|Comedy|Sci-Fi|
|   1829|   606|   3.5|1171737562|  2.202891|  Chinese Box (1997)|       Drama|Romance|
|   1580|    91|   3.5|1112711168| 3.2713466|Men in Black (a.k...|Action|Comedy|Sci-Fi|
|   1645|    91|   3.0|1112712216|  3.397581|The Devil's Advoc...|Drama|Mystery|Thr...|
+-------+------+------+----------+----------+--------------------+--------------------+
only showing top 7 rows



In [20]:
from pyspark.sql.window import Window
df3 = df2.filter(df2.userId.isin(users))
df3 = df3.orderBy("prediction",ascending=False)
window = Window.partitionBy(df3['userId']).orderBy(df3['prediction'].desc())

df3 = df3.withColumn("row",row_number().over(window)).filter(col("row")<=5).drop("row")

In [21]:
df4 = df3.groupBy('userId').agg(collect_list('title').alias('predicted_movies'))

In [22]:
df4.show()

+------+--------------------+
|userId|    predicted_movies|
+------+--------------------+
|    50|[Godfather, The (...|
|   100|[Say Anything... ...|
|   150|[Leaving Las Vega...|
|   200|[Dark Knight, The...|
|   250|[Sound of Music, ...|
|   300|[City of God (Cid...|
|   350|[Star Wars: Episo...|
|   400|[Silence of the L...|
|   450|[Jaws (1975), Nor...|
|   500|[Clockwatchers (1...|
+------+--------------------+

