In [2]:
from pyspark.sql import functions as f
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import StringType, DoubleType, IntegerType, StructType, StructField

import os
os.environ["JAVA_HOME"] = 'C:\Program Files\Java\jdk-20'

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Recommender System").config(
    "spark.sql.crossJoin.enabled", "true").getOrCreate()



In [3]:
schemaRating = StructType([StructField('userId', IntegerType(), True),
                     StructField('movieId', IntegerType(), True),
                     StructField('rating', IntegerType(), True),
                     StructField('timestamp', DoubleType(), True)])

In [4]:
data = spark.read.csv('data/ml-1m/ratings.dat',sep = '::', header = False, schema = schemaRating)
data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: double (nullable = true)



In [5]:
data.describe().show()


+-------+------------------+------------------+------------------+--------------------+
|summary|            userId|           movieId|            rating|           timestamp|
+-------+------------------+------------------+------------------+--------------------+
|  count|           1000209|           1000209|           1000209|             1000209|
|   mean| 3024.512347919285|1865.5398981612843| 3.581564453029317| 9.722436954046655E8|
| stddev|1728.4126948999715|1096.0406894572482|1.1171018453732606|1.2152558939916052E7|
|    min|                 1|                 1|                 1|        9.56703932E8|
|    max|              6040|              3952|                 5|        1.04645459E9|
+-------+------------------+------------------+------------------+--------------------+



In [6]:
train_data, test_data = data.randomSplit([0.7, 0.3])

In [7]:
train_data.describe().show()


+-------+------------------+------------------+------------------+--------------------+
|summary|            userId|           movieId|            rating|           timestamp|
+-------+------------------+------------------+------------------+--------------------+
|  count|            700196|            700196|            700196|              700196|
|   mean| 3025.427553142263|1866.0319781889643| 3.581095864586487| 9.722436450144145E8|
| stddev|1728.5358255837127|1096.7561203355333|1.1172803152791009|1.2167598671152102E7|
|    min|                 1|                 1|                 1|        9.56703954E8|
|    max|              6040|              3952|                 5|       1.046454548E9|
+-------+------------------+------------------+------------------+--------------------+



In [8]:
test_data.describe().show()


+-------+------------------+------------------+------------------+--------------------+
|summary|            userId|           movieId|            rating|           timestamp|
+-------+------------------+------------------+------------------+--------------------+
|  count|            300013|            300013|            300013|              300013|
|   mean|3022.3763636909066|1864.3914397042795|3.5826580848163245| 9.722438130097429E8|
| stddev| 1728.126282690675|1094.3701015507588|1.1166863027488934|1.2117405550766544E7|
|    min|                 1|                 1|                 1|        9.56703932E8|
|    max|              6040|              3952|                 5|        1.04645459E9|
+-------+------------------+------------------+------------------+--------------------+



## Build Model

In [9]:
recommender = ALS(maxIter=5, regParam=0.01, userCol='userId',
                  itemCol='movieId', ratingCol='rating')
model = recommender.fit(train_data)


## Evaluate Model

In [10]:
pred_data = model.transform(test_data)
pred_data.show()


+------+-------+------+------------+----------+
|userId|movieId|rating|   timestamp|prediction|
+------+-------+------+------------+----------+
|     1|   1721|     4|9.78300055E8| 4.1596465|
|     1|   1270|     5|9.78300055E8| 4.0622654|
|     1|    588|     4|9.78824268E8| 4.0168386|
|     1|   2018|     4|9.78301777E8| 4.8098726|
|     1|   1961|     5| 9.7830159E8|  4.613861|
|     1|   1207|     4|9.78300719E8| 5.1726365|
|     1|    919|     4|9.78301368E8|  5.098341|
|     1|     48|     5|9.78824351E8| 3.7695992|
|     2|    318|     5|9.78298413E8| 4.5498414|
|     1|   3408|     4|9.78300275E8|   4.18738|
|     1|    594|     4|9.78302268E8| 4.7469335|
|     1|   1545|     4|9.78824139E8| 5.0774083|
|     1|   1022|     5|9.78300055E8| 4.3722167|
|     1|   2918|     4|9.78302124E8|  4.067622|
|     1|    720|     3| 9.7830076E8|  4.551594|
|     1|    938|     4|9.78301752E8|  4.760317|
|     2|    265|     4|9.78299026E8|  4.437585|
|     1|   2028|     5|9.78301619E8| 4.4

In [11]:
evaluator = RegressionEvaluator(
    metricName='rmse', labelCol='rating', predictionCol='prediction')
print("Root-mean-square error = " + str(evaluator.evaluate(pred_data)))


Root-mean-square error = nan


In [12]:
avgRatings = data.select('rating').groupBy().avg().first()[0]
print('The average rating in the dataset is: {}'.format(avgRatings))


The average rating in the dataset is: 3.581564453029317


In [13]:
evaluator = RegressionEvaluator(
    metricName='rmse', labelCol='rating', predictionCol='prediction')
print('The root mean squared error for our model is: {}'.format(
    evaluator.evaluate(pred_data.na.fill(avgRatings))))


The root mean squared error for our model is: 0.9109961429823804


In [14]:
evaluator = RegressionEvaluator(
    metricName='rmse', labelCol='rating', predictionCol='prediction')
print('The root mean squared error for our model is: {}'.format(
    evaluator.evaluate(pred_data.na.drop())))


The root mean squared error for our model is: 0.9108085573495974


## Define Movie details

In [16]:
schemaMovie = StructType([StructField('movieId', IntegerType(), True),
                     StructField('title', StringType(), True),
                     StructField('genres', StringType(), True)])
movieDetails = spark.read.csv(
    'data/ml-1m/movies.dat', sep='::', header=False, schema=schemaMovie)
movieDetails.printSchema()


root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [17]:
occupation = {
    0:  "other",
    1:  "academic/educator",
    2:  "artist",
    3:  "clerical/admin",
    4:  "college/grad student",
    5:  "customer service",
    6:  "doctor/health care",
    7:  "executive/managerial",
    8:  "farmer",
    9:  "homemaker",
    10:  "K-12 student",
    11:  "lawyer",
    12:  "programmer",
    13:  "retired",
    14:  "sales/marketing",
    15:  "scientist",
    16:  "self-employed",
    17:  "technician/engineer",
    18:  "tradesman/craftsman",
    19:  "unemployed",
    20:  "writer"

}


In [18]:
schemaUser = StructType([StructField('UserID', IntegerType(), True),
                     StructField('Gender', StringType(), True),
                     StructField('Age', IntegerType(), True),
                     StructField('Occupation', IntegerType(), True),
                     StructField('Zipcode', IntegerType(), True)])
userDetails = spark.read.csv(
    'data/ml-1m/users.dat', sep='::', header=False, schema=schemaUser)
userDetails.printSchema()


root
 |-- UserID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- Zipcode: integer (nullable = true)



In [23]:
userRecs = model.recommendForAllUsers(10)
userRecs.printSchema()


root
 |-- userId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movieId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [42]:
def get_rec(rec):
  rec = rec.select("userId", f.explode(rec.recommendations))
  rec = rec.select("col.*", "*")
  rating_mat = rec.select(["userId", "movieId", "rating"])
  return rating_mat


In [43]:
rating_mat = get_rec(model.recommendForAllUsers(8))


In [49]:
final_rec = rating_mat.join(movieDetails, on="movieId", how="left")
final_rec.filter("userId=11").orderBy(final_rec["rating"].desc()).show(10)
final_rec.printSchema()


+-------+------+---------+--------------------+--------------------+
|movieId|userId|   rating|               title|              genres|
+-------+------+---------+--------------------+--------------------+
|   1152|    11|10.211037|He Walked by Nigh...|Crime|Film-Noir|T...|
|   1613|    11|9.0795355|    Star Maps (1997)|               Drama|
|   1511|    11| 8.992776|A Chef in Love (1...|              Comedy|
|    567|    11| 8.614988|         Kika (1993)|               Drama|
|   1038|    11| 8.391063|Unhook the Stars ...|               Drama|
|   2257|    11| 8.388325|No Small Affair (...|      Comedy|Romance|
|   2298|    11| 8.366794|  Strangeland (1998)|            Thriller|
|   1180|    11| 8.219752| Hear My Song (1991)|              Comedy|
+-------+------+---------+--------------------+--------------------+

root
 |-- movieId: integer (nullable = true)
 |-- userId: integer (nullable = false)
 |-- rating: float (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: