In [0]:
%spark.pyspark
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
import random
import os
from pyspark.sql import SparkSession


In [1]:
%spark.pyspark
# Configure the application name first, then fetch the existing session or
# start a new one under that name.
session_builder = SparkSession.builder.appName('Recommender_system')
spark = session_builder.getOrCreate()


In [2]:
%spark.pyspark
# Read the ratings file: the first line is treated as the header and column
# types are inferred from the values.
data = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/home/nuzul/ratings.csv")
)
# Preview the leading rows of the ratings table.
data.show()


In [3]:
%spark.pyspark
# Dimensions of the ratings table: number of records and number of columns.
num_rows, num_columns = data.count(), len(data.columns)
print("Number of rows:", num_rows)
print("Number of columns:", num_columns)


In [4]:
%spark.pyspark
# Read the movie metadata file with a header row and inferred column types.
movies_reader = spark.read.option("header", True).option("inferSchema", True)
movie_names_data = movies_reader.csv("/home/nuzul/movies.csv")
# Preview the leading rows of the movie table.
movie_names_data.show()


In [5]:
%spark.pyspark
# Print the column names and the types inferred for the ratings table.
data.printSchema()


In [6]:
%spark.pyspark
# Summary statistics for the ratings table, computed once and then displayed.
ratings_summary = data.describe()
ratings_summary.show()


In [7]:
%spark.pyspark
import matplotlib.pyplot as plt
from pyspark.sql.functions import avg

# Inclusive movieId range shown in the chart. The title below is built from
# these bounds so the label can never disagree with the data that is plotted
# (the previous title claimed "Top 50 ... Movie IDs 1-50" while only IDs 1-10
# were selected, and no ranking by rating was performed).
movie_id_min, movie_id_max = 1, 10

# Mean rating per movie, joined with the movie metadata on movieId.
avg_ratings = data.groupBy("movieId").agg(avg("rating").alias("avg_rating"))
combined_data = avg_ratings.join(movie_names_data, on="movieId")
combined_data_pd = combined_data.toPandas()

# Keep only the movies whose ID lies inside the inclusive range.
filtered_data = combined_data_pd[
    combined_data_pd['movieId'].between(movie_id_min, movie_id_max)
]

plt.figure(figsize=(10, 6))
plt.bar(filtered_data["movieId"], filtered_data["avg_rating"])
# One tick per plotted movie so every bar is labelled with its ID.
plt.xticks(filtered_data["movieId"])
plt.xlabel("Movie ID")
plt.ylabel("Average Rating")
plt.title(f"Average Rating for Movie IDs {movie_id_min}-{movie_id_max}")
plt.show()


In [8]:
%spark.pyspark
# Split the ratings 80/20 with a fixed seed, then report the size of each part.
split_frames = data.randomSplit([0.8, 0.2], seed=123)
train_data = split_frames[0]
test_data = split_frames[1]

train_count = train_data.count()
print("Number of rows in train_data:", train_count)
test_count = test_data.count()
print("Number of rows in test_data:", test_count)


In [9]:
%spark.pyspark
# User for whom not-yet-rated movies are listed.
user_id_input = 1

# Every movie this user has rated anywhere in the full ratings table.
rated_by_user = (
    data.filter(data['userId'] == user_id_input)
    .select('movieId')
    .distinct()
)

# Candidate movies come from the test split, as before, but movies the user
# has rated are now removed with a left-anti join. The previous
# `userId != user_id_input` filter only discarded the user's own rows, so any
# movie the user had rated was still listed whenever another user rated it too.
unrated_movies = (
    test_data.select('movieId')
    .distinct()
    .join(rated_by_user, on='movieId', how='left_anti')
)
unrated_movies.show(10)


In [10]:
%spark.pyspark
from pyspark.sql import Row
from datetime import datetime

# Reload the ratings so the new rows are appended to an untouched copy.
ratings_path = "/home/nuzul/ratings.csv"
ratings_data = spark.read.csv(ratings_path, header=True, inferSchema=True)

# Movies the user rates by hand, with the matching rating for each position.
movie_ids_to_rate = [1645, 3794, 33722, 2142, 66658, 1580, 54190, 3918, 1959]
user_id_input = 1
ratings_from_user = [4.5, 3.0, 5.0, 4.5, 3.0, 5.0, 4.5, 3.0, 5.0]
current_timestamp = int(datetime.now().timestamp())

# zip() silently truncates to the shorter list, which would drop ratings
# without any warning; fail loudly instead.
if len(movie_ids_to_rate) != len(ratings_from_user):
    raise ValueError(
        "movie_ids_to_rate and ratings_from_user must have the same length: "
        f"{len(movie_ids_to_rate)} != {len(ratings_from_user)}"
    )

# One Row per (movie, rating) pair, all stamped with the same timestamp.
new_ratings = [
    Row(userId=user_id_input, movieId=movie_id, rating=rating, timestamp=current_timestamp)
    for movie_id, rating in zip(movie_ids_to_rate, ratings_from_user)
]

new_ratings_df = spark.createDataFrame(new_ratings)
new_ratings_df.show()

# Match columns by NAME rather than by position: union() is positional, and the
# field order of Row(**kwargs) depends on the Spark version, so a positional
# union could put values into the wrong columns.
updated_ratings_data = ratings_data.unionByName(new_ratings_df)

# Re-split the extended ratings; this replaces the earlier train/test frames.
train_data, test_data = updated_ratings_data.randomSplit([0.8, 0.2], seed=123)
print("Number of rows in train_data:", train_data.count())
print("Number of rows in test_data:", test_data.count())


In [11]:
%spark.pyspark
from pyspark.ml.recommendation import ALS

# ALS starts from randomly initialised factors. Without a seed, every run
# produces a different model (and a different RMSE) even though the train/test
# split is seeded. Reuse the split seed so the notebook is reproducible.
als = ALS(
    maxIter=15,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    seed=123,
)
# Fit the factorisation model on the training split.
model = als.fit(train_data)


In [12]:
%spark.pyspark
# Apply the fitted model to the held-out ratings, which adds a `prediction`
# column, then preview the result.
predictions = model.transform(dataset=test_data)
predictions.show()


In [13]:
%spark.pyspark
# Discard rows for which the model produced no usable prediction.
predictions_no_missing = predictions.dropna(subset=["prediction"])


In [14]:
%spark.pyspark
from pyspark.ml.evaluation import RegressionEvaluator

# Compare the `prediction` column against the true `rating` column using RMSE.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions_no_missing)
print(f'Root Mean Squared Error (RMSE): {rmse}')


In [15]:
%spark.pyspark
# (movieId, userId) pairs from the test split that belong to the target user.
# `user_id_input` is the same user the new ratings were added for, so the
# literal 1 is no longer repeated here.
single_user = test_data.filter(test_data['userId'] == user_id_input).select(['movieId', 'userId'])
single_user.show()

# Score those pairs with the fitted model. The model was built without a
# cold-start strategy, so movies absent from the training split get a NaN
# prediction. Spark orders NaN above every real number, so in a descending
# sort those rows would appear as the "best" results; drop them first.
# (The variable keeps its original spelling so later references still resolve.)
reccomendations = model.transform(single_user).na.drop(subset=['prediction'])
reccomendations.orderBy('prediction', ascending=False).show()


In [16]:
%spark.pyspark
