In [2]:
pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
[K     |████████████████████████████████| 217.8MB 61kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 49.0MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=8633c45a4f8fd6ae7fc11ffa1b4d6a6e6b40fa177e015f1269d10472a226a011
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5


In [0]:

import numpy as np
import pyspark as ps
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf,col,when,isnan

In [0]:
# Folder that holds the MovieLens CSV files (ratings.csv, movies.csv and links.csv live under this path).
directory = '/content/drive/My Drive/datasets/movie_lens'

In [0]:
# Start a Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('MovieRecommendor')
    .getOrCreate()
)

In [0]:
# Keep handles on the lower-level contexts that sit behind the session.
sc = spark.sparkContext
sqlContext = SQLContext(sparkContext=sc)

In [5]:
# Load the ratings file from the shared data folder; the first row is the header
# and column types are inferred from the data.
rating_df = spark.read.csv(directory + '/ratings.csv',inferSchema=True,header=True)
rating_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [6]:
# Print the column names and the types Spark inferred for the ratings file.
rating_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [7]:
# Load the movie catalogue (movieId, title, genres) from the shared data folder.
movie_df = spark.read.csv(directory + '/movies.csv',inferSchema=True,header=True)
movie_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [8]:
# Load the movieId -> imdbId / tmdbId mapping from the shared data folder.
link_df = spark.read.csv(directory + '/links.csv',inferSchema=True,header=True)
link_df.show(5)

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5|113041| 11862|
+-------+------+------+
only showing top 5 rows



In [0]:
# 80/20 train/validation split. The seed is fixed so the split is requested
# identically on every run instead of being re-drawn at random.
train_df, val_df = rating_df.randomSplit([0.8,0.2], seed=42)

In [0]:
# ALS hyperparameters
rank = 5            # passed to ALS as rank
reg_param = 0.1     # regularization strength (lambda), passed to ALS as regParam
iterations = 13     # passed to ALS as maxIter
# Scratch values; nothing else in the visible notebook reads them.
errors, err = [], 0

In [19]:
# Model build: fit ALS on the training split and score the validation split.
# The seed is fixed so the fit does not depend on a per-run default seed.
als = ALS(rank=rank,maxIter=iterations,regParam=reg_param,userCol='userId',itemCol='movieId',ratingCol='rating',seed=42)
model = als.fit(train_df)
pred = model.transform(val_df)
# Drop NaN predictions before scoring, otherwise the RMSE itself would be NaN.
# NOTE(review): NaNs are presumably ALS's default cold-start output for users or
# movies that are absent from the training split - confirm against the ALS docs.
# isnan() states the intent explicitly instead of comparing against np.nan.
new_pred = pred.filter(~isnan(col('prediction')))
evaluator = RegressionEvaluator(metricName='rmse',labelCol='rating',predictionCol='prediction')
rmse = evaluator.evaluate(new_pred)
print(rmse)

0.8847065675874617


In [27]:
  ### In case you get a Java error, here is the code to install Java 8
import os #importing os to set environment variable
def install_java():
  # !apt-get install -y openjdk-8-jdk-headless -qq > /dev/null #install openjdk
  # os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" #set environment variable
  # !java -version #check java version
  !apt-get install openjdk-8-jdk-headless -qq > /dev/null
  import os
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
  !update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
  !java -version
install_java()

update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java to provide /usr/bin/java (java) in manual mode
openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-8u252-b09-1~18.04-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)


In [20]:
# Score the validation split with the fitted model and preview a few rows.
prediction = model.transform(dataset=val_df)
prediction.show(n=5)

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|    91|    471|   1.0|1112713817| 2.6397994|
|   409|    471|   3.0| 967912821| 4.4016886|
|    57|    471|   3.0| 969753604| 3.6384692|
|   387|    471|   3.0|1139047519| 3.1156638|
|   610|    471|   4.0|1479544381| 3.4603376|
+------+-------+------+----------+----------+
only showing top 5 rows



The model is not that good, but we can proceed with further implementation.

In [22]:
# Attach title and genres to each prediction via movieId and preview ten rows.
titled_prediction = prediction.join(movie_df,'movieId')
titled_prediction.select('userId','title','genres','prediction').show(10)

+------+--------------------+------+----------+
|userId|               title|genres|prediction|
+------+--------------------+------+----------+
|    91|Hudsucker Proxy, ...|Comedy| 2.6397994|
|   409|Hudsucker Proxy, ...|Comedy| 4.4016886|
|    57|Hudsucker Proxy, ...|Comedy| 3.6384692|
|   387|Hudsucker Proxy, ...|Comedy| 3.1156638|
|   610|Hudsucker Proxy, ...|Comedy| 3.4603376|
|   217|Hudsucker Proxy, ...|Comedy| 2.9657764|
|   176|Hudsucker Proxy, ...|Comedy|  3.712875|
|   448|Hudsucker Proxy, ...|Comedy| 3.5359695|
|   411|Hudsucker Proxy, ...|Comedy| 3.2569504|
|   357|Hudsucker Proxy, ...|Comedy| 3.8796024|
+------+--------------------+------+----------+
only showing top 10 rows



In [27]:
# Predictions for a single user (599), enriched with title, genres and tmdbId.
# The DataFrame is bound to one_user before calling show(): show() returns None,
# so chaining it onto the assignment would leave one_user empty.
one_user = (
    prediction.filter(col('userId')==599)
    .join(movie_df,'movieId')
    .join(link_df,'movieId')
    .select('userId','title','tmdbId','genres','prediction')
)
one_user.show(5)

+------+--------------------+------+--------------------+----------+
|userId|               title|tmdbId|              genres|prediction|
+------+--------------------+------+--------------------+----------+
|   599|High School High ...|  9308|              Comedy| 1.5417631|
|   599|Land Before Time,...| 12144|Adventure|Animati...| 2.6928566|
|   599|Just One of the G...| 24548|              Comedy| 2.5960145|
|   599|Before Sunset (2004)|    80|       Drama|Romance| 2.8471794|
|   599|           Go (1999)|  9430|        Comedy|Crime|  3.274642|
+------+--------------------+------+--------------------+----------+
only showing top 5 rows



In [49]:
import webbrowser
link = 'https://www.themoviedb.org/movie/'
# Print the title and open the TMDB page for two of user 599's predicted movies.
user_movies = (
    prediction.filter(col('userId')==599)
    .join(movie_df,'movieId')
    .join(link_df,'movieId')
    .select('userId','title','tmdbId','genres','prediction')
)
for movie in user_movies.take(2):
  print(movie.title)
  # A row with a null tmdbId would otherwise produce the URL ".../movie/None".
  if movie.tmdbId is None:
    continue
  movieURL = link+str(movie.tmdbId)
  webbrowser.open(movieURL)

High School High (1996)
Land Before Time, The (1988)


In [0]:
# Top 4 recommended movies for every user.
user_recom = model.recommendForAllUsers(numItems=4)
# Top 4 recommended users for every movie.
movie_recom = model.recommendForAllItems(numUsers=4)

In [74]:
# Show the nested structure of the per-user recommendations column.
user_recom.printSchema()

root
 |-- userId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movieId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [75]:
# For each user, list only the recommended movie ids (untruncated output).
user_movie_ids = user_recom.select('userId','recommendations.movieId')
user_movie_ids.show(4, truncate=False)

+------+----------------------------+
|userId|movieId                     |
+------+----------------------------+
|471   |[87234, 89904, 96004, 68945]|
|463   |[7842, 33649, 5075, 59018]  |
|496   |[213, 3567, 51931, 4495]    |
|148   |[87234, 67695, 42730, 6732] |
+------+----------------------------+
only showing top 4 rows



In [55]:
# For each movie, list only the recommended user ids (untruncated output).
movie_user_ids = movie_recom.select('movieId','recommendations.userId')
movie_user_ids.show(4, truncate=False)

+-------+--------------------+
|movieId|userId              |
+-------+--------------------+
|1580   |[53, 543, 276, 43]  |
|4900   |[43, 53, 543, 267]  |
|5300   |[191, 296, 154, 418]|
|6620   |[236, 295, 191, 494]|
+-------+--------------------+
only showing top 4 rows



In [0]:
# Pick two distinct user ids from the ratings to request recommendations for.
distinct_users = rating_df.select('userId').distinct()
users = distinct_users.limit(2)

In [63]:
# Top 5 recommendations, restricted to the users selected above.
userSubsetRecs = model.recommendForUserSubset(dataset=users, numItems=5)
userSubsetRecs.show(5, truncate=False)

+------+--------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                   |
+------+--------------------------------------------------------------------------------------------------+
|463   |[[7842, 5.4397016], [33649, 5.1350226], [5075, 5.116223], [59018, 5.0812526], [183897, 5.066984]] |
|148   |[[87234, 5.1113057], [67695, 4.930197], [42730, 4.9271736], [6732, 4.7600923], [33649, 4.6359415]]|
+------+--------------------------------------------------------------------------------------------------+



In [76]:
# Keep only the recommended movie ids for the selected users. The field is
# requested as 'MovieId' although the schema spells it 'movieId'; the lookup
# still resolves and the output column takes the requested spelling.
subset_movie_ids = userSubsetRecs.select('userId','recommendations.MovieId')
subset_movie_ids.show(5, truncate=False)

+------+----------------------------------+
|userId|MovieId                           |
+------+----------------------------------+
|463   |[7842, 33649, 5075, 59018, 183897]|
|148   |[87234, 67695, 42730, 6732, 33649]|
+------+----------------------------------+



So this is another example of a recommendation implementation.