In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=0b9adbd6ebc477da7aed570a55edf0911fce5146bcf7ee51b4a25d6fd4ff73ac
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [None]:
import pandas as pd
from pyspark.sql import SparkSession
# Single-JVM local session used by every cell below. getOrCreate() returns the
# already-running session if this cell is executed again.
spark = (
    SparkSession.builder
    .master("local")
    .appName("movie_recommendation")
    .getOrCreate()
)

In [None]:
# Ratings: one row per (userId, movieId, rating); the timestamp column is dropped
# because nothing downstream reads it.
rating = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/content/ratings_small.csv")
    .drop("timestamp")
)

# Movie metadata has quoted free-text columns (overview, tagline, ...).
# multiLine lets a quoted value span physical lines, and escape='"' makes the
# reader accept doubled quotes inside a quoted value. Without these options a
# record presumably gets split into bogus rows, shifting text into the wrong
# columns.
movies = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("escape", '"')
    .load("/content/movies_metadata.csv")
)

# NOTE(review): this joins ratings.movieId directly to metadata.id. Confirm both
# columns use the same identifier space; if the ratings file uses a different
# catalogue ID, a mapping table is needed between the two.
movie_data = rating.join(movies, on=rating.movieId == movies.id)

columns = len(movie_data.columns)
Rows = movie_data.count()
print("Number of Columns: {}\nNumber of Rows: {}".format(columns, Rows))
movie_data.columns

Number of Columns: 27
Number of Rows: 44925


['userId',
 'movieId',
 'rating',
 'adult',
 'belongs_to_collection',
 'budget',
 'genres',
 'homepage',
 'id',
 'imdb_id',
 'original_language',
 'original_title',
 'overview',
 'popularity',
 'poster_path',
 'production_companies',
 'production_countries',
 'release_date',
 'revenue',
 'runtime',
 'spoken_languages',
 'status',
 'tagline',
 'title',
 'video',
 'vote_average',
 'vote_count']

In [None]:
import numpy as np
from pyspark.sql.functions import when

from pyspark.sql.types import IntegerType

# Treat a literal 0 in these columns as "missing". SQL null is used as the
# marker rather than a float NaN: NaN is an ordinary double value, so it would
# silently widen integer columns to double and would not be caught by
# null-aware operations such as na.drop().
for column_name in ("userId", "id", "rating", "title"):
    movie_data = movie_data.withColumn(
        column_name,
        when(movie_data[column_name] == 0, None).otherwise(movie_data[column_name]),
    )

# Rows without a user, an item or a rating cannot serve as rating triples for
# the recommender, so they are removed here.
movie_data = movie_data.na.drop(subset=["userId", "id", "rating"])

# Values that cannot be parsed as an integer become null after the cast.
movie_data = movie_data.withColumn("budget", movie_data["budget"].cast(IntegerType()))
movie_data.show()

+------+-------+------+-----+---------------------+--------+--------------------+--------------------+----+---------+-----------------+----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+---------+-------+--------------------+--------+--------------------+----------------+-----+------------+----------+
|userId|movieId|rating|adult|belongs_to_collection|  budget|              genres|            homepage|  id|  imdb_id|original_language|  original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|  revenue|runtime|    spoken_languages|  status|             tagline|           title|video|vote_average|vote_count|
+------+-------+------+-----+---------------------+--------+--------------------+--------------------+----+---------+-----------------+----------------+--------------------+----------+--------------------+--------------------+--------------------

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import IntegerType

# ALS requires a numeric item column, so the movie 'id' is cast to IntegerType.
movie_data = movie_data.withColumn("id", movie_data["id"].cast(IntegerType()))

# Fixed seeds make the 80/20 split and the ALS factor initialisation repeatable,
# so the reported RMSE can be reproduced across runs.
(training, test) = movie_data.randomSplit([0.8, 0.2], seed=42)

# coldStartStrategy="drop" removes predictions for users/items unseen during
# training (they would otherwise be NaN and poison the RMSE).
# nonnegative=True constrains the learned factors to be >= 0.
als = ALS(
    maxIter=5,
    regParam=0.09,
    rank=25,
    userCol="userId",
    itemCol="id",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
    seed=42,
)
model = als.fit(training)

In [None]:
# Score the held-out split with the fitted model, then measure how far the
# predicted ratings are from the true ones (root-mean-square error).
predictions = model.transform(test)

evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)

print("RMSE=" + str(rmse))
predictions.show()

RMSE=0.9268909817870974
+------+-------+------+-----+---------------------+---------+--------------------+--------------------+----+---------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------+--------------------+--------------------+------------+--------------------+----------+----------+
|userId|movieId|rating|adult|belongs_to_collection|   budget|              genres|            homepage|  id|  imdb_id|original_language|      original_title|            overview|          popularity|         poster_path|production_companies|production_countries|        release_date|             revenue|   runtime|    spoken_languages|  status|             tagline|               title|       video|        vote_average|vote_count|prediction|
+------+-------+------+-----+---------------------+---------+--------------------+------

In [None]:
# Held-out rows belonging to user 25, reduced to the columns worth displaying.
is_user_25 = test['userId'] == 25
single_user = test.where(is_user_25).select('id', 'userId', 'title', 'genres')
single_user.show(truncate=False)

+---+------+----------------+-----------------------------------------------------------------------------------------------+
|id |userId|title           |genres                                                                                         |
+---+------+----------------+-----------------------------------------------------------------------------------------------+
|707|25.0  |A View to a Kill|[{'id': 12, 'name': 'Adventure'}, {'id': 28, 'name': 'Action'}, {'id': 53, 'name': 'Thriller'}]|
|802|25.0  |Lolita          |[{'id': 18, 'name': 'Drama'}, {'id': 10749, 'name': 'Romance'}]                                |
+---+------+----------------+-----------------------------------------------------------------------------------------------+



In [None]:
# Predict ratings for the selected user's held-out titles and list them from
# highest to lowest predicted score. These are titles taken from the test
# split, i.e. ones the user has already rated.
recomendations = model.transform(single_user)
ranked = recomendations.sort(recomendations['prediction'].desc())
ranked.show(truncate=False)

+---+------+----------------+-----------------------------------------------------------------------------------------------+----------+
|id |userId|title           |genres                                                                                         |prediction|
+---+------+----------------+-----------------------------------------------------------------------------------------------+----------+
|802|25.0  |Lolita          |[{'id': 18, 'name': 'Drama'}, {'id': 10749, 'name': 'Romance'}]                                |3.2104833 |
|707|25.0  |A View to a Kill|[{'id': 12, 'name': 'Adventure'}, {'id': 28, 'name': 'Action'}, {'id': 53, 'name': 'Thriller'}]|2.9992335 |
+---+------+----------------+-----------------------------------------------------------------------------------------------+----------+

