In [3]:
from google.colab import drive
# Mount Google Drive at /content/drive/ so the dataset CSVs stored under
# /content/drive/MyDrive/... can be read by the cells below.
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [24]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col,explode
# Create (or reuse, if one already exists in this kernel) the Spark session
# that every later cell reads data and trains models through.
spark = (
    SparkSession.builder
    .appName("Colab")
    .getOrCreate()
)

In [5]:
# Load the movies and ratings CSVs from the mounted Drive folder.
# Both files carry a header row; column types are inferred by Spark.
DATASET_DIR = "/content/drive/MyDrive/ColabNotebooks/MovieRecommendationPyspark/Datasets"
CSV_READ_OPTIONS = {"inferSchema": True, "header": True}

movie = spark.read.csv(DATASET_DIR + "/movies.csv", **CSV_READ_OPTIONS)
ratings = spark.read.csv(DATASET_DIR + "/ratings.csv", **CSV_READ_OPTIONS)

# Preview five rows of each frame first, then print both inferred schemas
# (movies before ratings in each pass).
for frame in (movie, ratings):
    frame.show(5)
for frame in (movie, ratings):
    frame.printSchema()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

root
 |-- userId: integer (nullable = true)
 |-- movieId:

In [6]:
# Attach title/genres to every rating. A left join keeps each rating row even
# if its movieId has no match in the movies table.
df = ratings.join(movie, on="movieId", how="left")

In [7]:
# Split the joined ratings 80/20 into train and test sets.
# The seed is fixed so that a fresh "Restart & Run All" reproduces the same
# partition; without it randomSplit draws a new random seed on every call and
# the row counts and downstream RMSE change from run to run.
SPLIT_SEED = 42
(train, test) = df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# Sanity output: total rows, then size and a preview of each split.
print(df.count())
print(train.count())
train.show()
print(test.count())
test.show(1)

100836
80496
+-------+------+------+----------+----------------+--------------------+
|movieId|userId|rating| timestamp|           title|              genres|
+-------+------+------+----------+----------------+--------------------+
|      1|     1|   4.0| 964982703|Toy Story (1995)|Adventure|Animati...|
|      1|     5|   4.0| 847434962|Toy Story (1995)|Adventure|Animati...|
|      1|     7|   4.5|1106635946|Toy Story (1995)|Adventure|Animati...|
|      1|    15|   2.5|1510577970|Toy Story (1995)|Adventure|Animati...|
|      1|    17|   4.5|1305696483|Toy Story (1995)|Adventure|Animati...|
|      1|    18|   3.5|1455209816|Toy Story (1995)|Adventure|Animati...|
|      1|    27|   3.0| 962685262|Toy Story (1995)|Adventure|Animati...|
|      1|    31|   5.0| 850466616|Toy Story (1995)|Adventure|Animati...|
|      1|    43|   5.0| 848993983|Toy Story (1995)|Adventure|Animati...|
|      1|    44|   3.0| 869251860|Toy Story (1995)|Adventure|Animati...|
|      1|    45|   4.0| 951170182|Toy 

In [8]:
# Explicit-feedback ALS recommender over (userId, movieId, rating).
# - rank / regParam here are only starting values; the parameter grid built
#   from als.rank and als.regParam supplies the values actually searched.
# - nonnegative=True constrains the learned factors to be non-negative.
# - coldStartStrategy="drop" removes rows whose prediction is NaN (users or
#   movies unseen during fitting) so the RMSE evaluation is not poisoned.
# - seed is pinned so factor initialisation, and therefore the fitted model
#   and its metrics, is repeatable across kernel restarts.
ALS_SEED = 42
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    maxIter=5,
    rank=10,
    regParam=0.1,
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
    seed=ALS_SEED,
)

In [9]:
# Hyper-parameter grid: 2 ranks x 2 regularisation strengths = 4 candidates.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50])
    .addGrid(als.regParam, [0.01, 0.1])
    .build()
)

In [12]:
# Scores predictions by root-mean-squared error between the "prediction"
# column and the true "rating" column.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)

In [13]:
from pyspark.ml.tuning import TrainValidationSplit

# Single train/validation split tuner: fits `als` once per grid candidate on
# 80% of the data passed to fit() and scores it on the remaining 20% with
# `evaluator`.
# The seed is fixed so the internal 80/20 partition, and hence which
# candidate wins, is the same on every run.
TVS_SEED = 42
tvs = TrainValidationSplit(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    trainRatio=0.8,
    seed=TVS_SEED,
)


In [14]:
# 5-fold cross-validation alternative to the train/validation split tuner,
# built over the same estimator, grid and evaluator.
# NOTE(review): `cv` is not fitted in any cell visible here; confirm whether
# it is still needed.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
)


In [15]:
# Mark both splits for caching; they are reused by the tuning and evaluation
# cells that follow.
for split_df in (train, test):
    split_df.cache()

# Use 8 shuffle partitions for Spark SQL operations in this session.
spark.conf.set("spark.sql.shuffle.partitions", "8")

In [16]:
# Tune ALS over the parameter grid using the train/validation split tuner,
# keep the best-scoring candidate, and measure its RMSE on the held-out test
# split. `best_model` and `rmse` are reused by later cells.
model = tvs.fit(train)
best_model = model.bestModel
# Score the held-out test rows with the selected model.
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)
print(rmse)

0.8664295792728198


In [18]:
# Re-display the test-set RMSE computed by the fit/evaluate cell.
print(rmse)

0.8664295792728198


In [19]:
# Top-N movie recommendations for every user known to the fitted model.
TOP_N_PER_USER = 5
recommendations = best_model.recommendForAllUsers(TOP_N_PER_USER)

In [20]:
# NOTE(review): this rebinds `df`, which earlier held the ratings-joined-with-
# movies frame used for the train/test split. After this cell runs, re-running
# the split cell would operate on the recommendations frame instead.
df = recommendations

In [22]:
# Preview the per-user recommendation arrays (first 20 rows, long cells truncated).
df.show(n=20, truncate=True)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{184245, 5.64822...|
|     2|[{131724, 4.86051...|
|     3|[{6835, 4.99875},...|
|     4|[{3851, 4.987083}...|
|     5|[{3379, 4.803812}...|
|     6|[{7121, 4.72286},...|
|     7|[{26865, 4.921818...|
|     8|[{898, 4.69317}, ...|
|     9|[{3200, 4.948173}...|
|    10|[{8869, 4.827723}...|
|    11|[{6818, 5.097192}...|
|    12|[{156025, 6.14862...|
|    13|[{3379, 5.028505}...|
|    14|[{4041, 4.4898973...|
|    15|[{3200, 4.4705815...|
|    16|[{3379, 4.4802136...|
|    17|[{3379, 4.92457},...|
|    18|[{3379, 4.8502584...|
|    19|[{3358, 4.205809}...|
|    20|[{1172, 4.92505},...|
+------+--------------------+
only showing top 20 rows


In [25]:
# Turn each user's array of recommendations into one row per recommended movie,
# stored as a struct in the new "movieid_rating" column.
df2 = df.withColumn("movieid_rating", explode(col("recommendations")))


In [26]:
# Flatten the exploded struct into plain userId / movieId / rating columns
# and display the first rows.
recommendation_columns = ["userId", "movieid_rating.movieId", "movieid_rating.rating"]
df2.select(*recommendation_columns).show()

+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|     1| 184245| 5.648228|
|     1| 179135| 5.648228|
|     1| 134796| 5.648228|
|     1| 117531| 5.648228|
|     1|  86237| 5.648228|
|     2| 131724|4.8605165|
|     2|  80906|4.6080947|
|     2|   7121| 4.558212|
|     2|  60756| 4.537516|
|     2|  89774|4.4679117|
|     3|   6835|  4.99875|
|     3|   5746|  4.99875|
|     3|  70946|4.9871144|
|     3|   5181|  4.91952|
|     3|   4518|4.8387694|
|     4|   3851| 4.987083|
|     4|   4765|4.9427915|
|     4|   3365|4.8907223|
|     4|   1046| 4.871153|
|     4|   1733|4.8499084|
+------+-------+---------+
only showing top 20 rows
