##  PySpark setup

In [None]:
# Install a headless Java 8 runtime for Spark; apt's stdout is discarded.
# (The redirect target was misspelled as /dev/nul, which writes a stray file instead.)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Fetch the pinned Spark 3.1.1 / Hadoop 2.7 build from the permanent Apache archive
# rather than a regional mirror, then unpack it into the working directory.
!wget -q https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
# findspark is used in the next cell to locate the unpacked Spark distribution.
!pip install -q findspark


In [None]:
import os

# Tell Spark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop2.7",
})

import findspark

findspark.init()
# Left as the last expression so the cell displays the Spark home that was found.
findspark.find()

'/content/spark-3.1.1-bin-hadoop2.7'

## Download MovieLens data:  
https://www.kaggle.com/grouplens/movielens-20m-dataset?select=movie.csv

In [None]:
# Install the Kaggle command-line client used by the download cells below.
! pip install -q kaggle

In [None]:
from google.colab import files

# Prompt for the Kaggle API token file (kaggle.json).
# upload() returns a dict mapping each filename to the raw file bytes, so the
# result is bound to a name instead of being left as the cell's last
# expression: echoing it would write the API key into the saved notebook.
uploaded = files.upload()
print("Uploaded:", ", ".join(uploaded))

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"<REDACTED>","key":"<REDACTED>"}'}

In [None]:
# -p keeps this cell re-runnable: plain `mkdir` errors when ~/.kaggle already exists.
! mkdir -p ~/.kaggle
! cp kaggle.json ~/.kaggle/
# Restrict the token file to owner read/write only.
! chmod 600 ~/.kaggle/kaggle.json
# Smoke test: list datasets to confirm the CLI can authenticate.
! kaggle datasets list

mkdir: cannot create directory ‘/root/.kaggle’: File exists
ref                                                         title                                              size  lastUpdated          downloadCount  
----------------------------------------------------------  ------------------------------------------------  -----  -------------------  -------------  
gpreda/reddit-vaccine-myths                                 Reddit Vaccine Myths                              223KB  2021-04-07 08:06:45           2179  
crowww/a-large-scale-fish-dataset                           A Large Scale Fish Dataset                          3GB  2021-02-17 16:10:44           1274  
dhruvildave/wikibooks-dataset                               Wikibooks Dataset                                   1GB  2021-02-18 10:08:27            986  
imsparsh/musicnet-dataset                                   MusicNet Dataset                                   22GB  2021-02-18 14:12:19            461  
promptcloud/care

In [None]:
# Download the MovieLens 20M archive into the current working directory.
!kaggle datasets download -d grouplens/movielens-20m-dataset

Downloading movielens-20m-dataset.zip to /content
 99% 193M/195M [00:08<00:00, 19.1MB/s]
100% 195M/195M [00:08<00:00, 23.0MB/s]


In [None]:
# Extract the CSVs into the working directory.
# -o overwrites existing files without prompting, so re-running the cell
# does not stall on unzip's interactive "replace?" question.
!unzip -o /content/movielens-20m-dataset.zip

Archive:  /content/movielens-20m-dataset.zip
  inflating: genome_scores.csv       
  inflating: genome_tags.csv         
  inflating: link.csv                
  inflating: movie.csv               
  inflating: rating.csv              
  inflating: tag.csv                 


# Movie Recommendation using PySpark

In [None]:
import os
import sys

# Point both PySpark interpreter variables at the Python running this kernel.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

In [None]:
from pyspark.sql.functions import col

In [None]:
from pyspark.sql.functions import lit

In [None]:
def get_mat_sparsity(ratings):
    """Print how sparse the user x movie rating matrix is, as a percentage.

    Sparsity is ``(1 - observed / possible) * 100``, where ``observed`` is the
    number of rows in ``ratings`` and ``possible`` is the number of distinct
    ``userId`` values times the number of distinct ``movieId`` values.

    Args:
        ratings: DataFrame-like object supporting ``select(name)``,
            ``distinct()`` and ``count()``, with ``userId``, ``movieId`` and
            ``rating`` columns.

    Returns:
        None. The result is printed.
    """
    # Number of observed (user, movie) ratings.
    count_nonzero = ratings.select("rating").count()

    # Size of the full user x movie grid.
    n_users = ratings.select("userId").distinct().count()
    n_movies = ratings.select("movieId").distinct().count()
    total_elements = n_users * n_movies

    # An empty frame has no grid at all; report that instead of dividing by zero.
    if total_elements == 0:
        print("The ratings dataframe is empty; sparsity is undefined.")
        return

    sparsity = (1.0 - count_nonzero / total_elements) * 100
    print("The ratings dataframe is ", "%.2f" % sparsity + "% sparse.")

In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName('Recommendations')
    .getOrCreate()
)

# Read both CSVs using their header row for column names. No schema is
# supplied here; the ratings columns are cast to numeric types in a later cell.
movies = spark.read.options(header=True).csv("movie.csv")
ratings = spark.read.options(header=True).csv("rating.csv")

ratings.show()

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
|     1|     29|   3.5|2005-04-02 23:31:16|
|     1|     32|   3.5|2005-04-02 23:33:39|
|     1|     47|   3.5|2005-04-02 23:32:07|
|     1|     50|   3.5|2005-04-02 23:29:40|
|     1|    112|   3.5|2004-09-10 03:09:00|
|     1|    151|     4|2004-09-10 03:08:54|
|     1|    223|     4|2005-04-02 23:46:13|
|     1|    253|     4|2005-04-02 23:35:40|
|     1|    260|     4|2005-04-02 23:33:46|
|     1|    293|     4|2005-04-02 23:31:43|
|     1|    296|     4|2005-04-02 23:32:47|
|     1|    318|     4|2005-04-02 23:33:18|
|     1|    337|   3.5|2004-09-10 03:08:29|
|     1|    367|   3.5|2005-04-02 23:53:00|
|     1|    541|     4|2005-04-02 23:30:03|
|     1|    589|   3.5|2005-04-02 23:45:57|
|     1|    593|   3.5|2005-04-02 23:31:01|
|     1|    653|     3|2004-09-10 03:08:11|
|     1|    919|   3.5|2004-09-1

In [None]:
# Preview the movie catalogue (movieId, title, genres).
movies.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [None]:
# Cast the id and rating columns to numeric types and drop the timestamp,
# leaving only (userId, movieId, rating).
# The source column is spelled 'movieId' exactly as in the CSV header; the
# previous 'MovieId' only resolved because Spark's column lookup is
# case-insensitive by default (spark.sql.caseSensitive=false).
ratings = (
    ratings
    .withColumn('userId', col('userId').cast('integer'))
    .withColumn('movieId', col('movieId').cast('integer'))
    .withColumn('rating', col('rating').cast('float'))
    .drop('timestamp')
)

ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      2|   3.5|
|     1|     29|   3.5|
|     1|     32|   3.5|
|     1|     47|   3.5|
|     1|     50|   3.5|
|     1|    112|   3.5|
|     1|    151|   4.0|
|     1|    223|   4.0|
|     1|    253|   4.0|
|     1|    260|   4.0|
|     1|    293|   4.0|
|     1|    296|   4.0|
|     1|    318|   4.0|
|     1|    337|   3.5|
|     1|    367|   3.5|
|     1|    541|   4.0|
|     1|    589|   3.5|
|     1|    593|   3.5|
|     1|    653|   3.0|
|     1|    919|   3.5|
+------+-------+------+
only showing top 20 rows



In [None]:
# Attach title/genres to every rating; the left join keeps ratings whose movieId has no catalogue row.
movie_ratings = ratings.join(movies, on='movieId', how='left')

In [None]:
# Report what percentage of the user x movie grid has no rating.
get_mat_sparsity(ratings)

The ratings dataframe is  99.46% sparse.


In [None]:
# 80/20 train/test split; the fixed seed keeps the partition repeatable.
train, test = ratings.randomSplit(weights=[0.8, 0.2], seed=1234)

In [None]:
# def get_binary_data(ratings):
#   ratings = ratings.withColumn('binary', lit(1))
#   userIds = ratings.select('userId').distinct()
#   movieIds = ratings.select('movieId').distinct()

#   user_movie = userIds.crossJoin(movieIds).join(ratings, ['userId', 'movieId'], "left")
#   user_movie = user_movie.select(['userId', 'movieid', 'binary']).fillna(0)
#   return user_movie

# user_movie = get_binary_data(ratings)
# user_movie

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [None]:
# ALS estimator over the (userId, movieId, rating) triples.
als = ALS(
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    nonnegative=True,          # constrain the learned factors to be non-negative
    implicitPrefs=False,       # ratings are explicit scores, not implicit feedback
    coldStartStrategy='drop',  # drop rows whose prediction would be NaN
    # Fix the factor initialisation so repeated runs give the same model.
    # Without an explicit seed PySpark derives the default from a Python
    # hash(), which may vary between sessions.
    seed=1234,
)

In [None]:
# Sanity check: show the class of the estimator built above.
type(als)

pyspark.ml.recommendation.ALS

In [None]:
# Hyper-parameter grid for the cross-validated search: one rank x one regParam.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [50])
    .addGrid(als.regParam, [0.15])
    .build()
)

# Candidates are scored by RMSE between the 'rating' label and the 'prediction' column.
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)

print('Total models =', len(param_grid))

Total models = 1


In [None]:
# 5-fold cross-validation of the ALS estimator over the parameter grid,
# scored with the RMSE evaluator.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    # Fix the fold assignment so the validation metrics are repeatable,
    # matching the seeded train/test split.
    seed=1234,
)

In [None]:
# Confirm the training split carries the numeric column types set by the cast cell.
train.dtypes

[('userId', 'int'), ('movieId', 'int'), ('rating', 'float')]

In [None]:
# Run the cross-validated search on the training split.
model = cv.fit(train)
# Keep the model trained with the best-scoring parameter combination.
best_model = model.bestModel

In [None]:
# Show which model class the search produced.
print(type(best_model))

print("**Best Model**")

# The hyper-parameters are read from the parent estimator on the JVM side,
# reached through the model's underlying Java object.
best_estimator = best_model._java_obj.parent()
print("  Rank:", best_estimator.getRank())

print("  MaxIter:", best_estimator.getMaxIter())
print("  RegParam:", best_estimator.getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 50
  MaxIter: 10
  RegParam: 0.15


In [None]:
# Score the held-out split with the selected model.
test_predictions = best_model.transform(test)
# RMSE between the true 'rating' and the model's 'prediction' on the test rows.
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.8415741092144088


In [None]:
# Compare actual ratings with predictions for a sample of test rows.
test_predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|136222|    148|   2.0| 2.4936218|
|137949|    148|   4.0| 3.0597515|
| 88527|    148|   2.0| 2.3947408|
| 96427|    148|   3.0| 2.9511008|
| 36821|    148|   4.0|  2.921009|
| 32882|    148|   3.0| 2.8030317|
| 46146|    148|   2.0| 1.9115702|
| 54726|    148|   5.0| 3.2103803|
|  3990|    148|   4.0| 2.3966415|
| 80693|    148|   1.0| 2.8209581|
| 64843|    148|   3.5| 2.6968012|
| 81300|    148|   1.0| 2.8303776|
| 48644|    148|   3.0| 2.9738898|
| 62028|    148|   3.0| 2.7537916|
|128653|    148|   2.0|  2.457549|
|  9084|    148|   2.0|  2.934641|
| 70446|    148|   2.0| 2.9676156|
| 75359|    148|   3.0| 3.0531363|
|110991|    148|   2.5| 2.7478993|
| 10434|    148|   3.0|   2.59038|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
# Top-5 movie recommendations for every user; each row holds a 'recommendations'
# array of (movieId, predicted rating) entries.
recommendations = best_model.recommendForAllUsers(5)
recommendations.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   148|[{126219, 8.54026...|
|   463|[{126219, 8.24055...|
|   471|[{126219, 7.53186...|
|   496|[{124273, 8.23773...|
|   833|[{126219, 6.81538...|
|  1088|[{126219, 6.46111...|
|  1238|[{126219, 7.31340...|
|  1342|[{124273, 8.37669...|
|  1580|[{126219, 5.45706...|
|  1591|[{126219, 8.18329...|
|  1645|[{126219, 7.63135...|
|  1829|[{126219, 8.19517...|
|  1959|[{124273, 5.75314...|
|  2122|[{124273, 5.68377...|
|  2142|[{124273, 7.63380...|
|  2366|[{126219, 8.07422...|
|  2659|[{124273, 7.80590...|
|  2866|[{124273, 7.42256...|
|  3175|[{126219, 8.46423...|
|  3749|[{124273, 7.67161...|
+------+--------------------+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import col, explode

# Flatten the per-user recommendation arrays into one row per
# (userId, movieId, predicted rating).
# The chain starts from `recommendations`: the previous version referenced
# `nrecommendations` on its own right-hand side (NameError on a fresh kernel)
# and used `explode` without importing it.
nrecommendations = (
    recommendations
    .withColumn("rec_exp", explode("recommendations"))
    .select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))
)
nrecommendations.limit(10).show()

NameError: ignored

In [None]:
# Recommended movies, with titles, for user 100.
(
    nrecommendations
    .join(movies, on='movieId')
    .where('userId = 100')
    .show()
)

In [None]:
# User 100's ten highest actual ratings, for comparison with the recommendations.
(
    ratings
    .join(movies, on='movieId')
    .where('userId = 100')
    .orderBy('rating', ascending=False)
    .limit(10)
    .show()
)