In [None]:
# Mount Google Drive at /content/drive; later cells read and write files
# under /content/drive/MyDrive/Data/ml-25m/.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pandas as pd
# Load the raw ratings table from the mounted Drive folder
# (presumably the MovieLens 25M dataset, judging by the path -- confirm).
ratings_df = pd.read_csv("/content/drive/MyDrive/Data/ml-25m/ratings.csv")

In [None]:
# Preview the first five rows to check the columns that were loaded.
ratings_df.head(n=5)

Unnamed: 0,userId,movieId,rating,timestamp
0,1,296,5.0,1147880044
1,1,306,3.5,1147868817
2,1,307,5.0,1147868828
3,1,665,5.0,1147878820
4,1,899,3.5,1147868510


In [None]:
# The timestamp is not used by any later cell, so remove it.
# errors="ignore" keeps this cell re-runnable: without it, running the cell a
# second time raises KeyError because "timestamp" has already been dropped.
ratings_df = ratings_df.drop(columns=["timestamp"], errors="ignore")

In [None]:
# (rows, columns) of the ratings table before any row-level cleaning.
ratings_df.shape

(25000095, 3)

In [None]:
# Discard every row that has a missing value in any column.
ratings_df = ratings_df.dropna(axis="index", how="any")

In [None]:
# Remove fully identical rows, keeping the first occurrence of each.
ratings_df = ratings_df.drop_duplicates(keep="first")

In [None]:
# Re-check (rows, columns) to see whether dropna / drop_duplicates removed anything.
ratings_df.shape

(25000095, 3)

In [None]:
# Per-column count of null entries; all zeros means the table is complete.
missing_values = ratings_df.isna().sum()
print("Missing values:", missing_values, sep="\n")

Missing values:
userId     0
movieId    0
rating     0
dtype: int64


In [None]:
# Distinct user IDs, in order of first appearance; the cell shows how many there are.
uniq_users = ratings_df["userId"].unique()
len(uniq_users)

162541

In [None]:
# Distinct movie IDs, in order of first appearance; the cell shows how many there are.
# Named to match uniq_users / uniq_ratings.
uniq_movies = pd.unique(ratings_df["movieId"])
# Backward-compatible alias for the original (misleading) name: these are
# movie IDs, not users.
movie_users = uniq_movies
len(uniq_movies)

59047

In [None]:
# Distinct rating values that occur in the data, shown together with their count.
uniq_ratings = ratings_df["rating"].unique()
len(uniq_ratings), uniq_ratings

(10, array([5. , 3.5, 4. , 2.5, 4.5, 3. , 0.5, 2. , 1. , 1.5]))

In [None]:
from sklearn.model_selection import train_test_split

# Split each user's ratings separately (90% train / 10% test) so that every
# user with at least two ratings contributes rows to both files.
training_data = []
testing_data = []

grouped_ratings = ratings_df.groupby('userId')

for _, group in grouped_ratings:
    # train_test_split raises ValueError when the split would leave the
    # training side empty (a user with a single rating); keep such users
    # entirely in the training set instead of aborting the whole loop.
    if len(group) < 2:
        training_data.append(group)
        continue
    # A fixed random_state makes the per-user split reproducible across runs.
    train, test = train_test_split(group, test_size=0.1, random_state=42)
    training_data.append(train)
    testing_data.append(test)

# pd.concat raises on an empty list, so fall back to an empty frame that has
# the same columns as ratings_df.
training_df = pd.concat(training_data) if training_data else ratings_df.iloc[:0]
testing_df = pd.concat(testing_data) if testing_data else ratings_df.iloc[:0]

# index=False: the row index carries no information and would otherwise be
# written as an extra unnamed column.
training_df.to_csv("/content/drive/MyDrive/Data/ml-25m/training.csv", index=False)
testing_df.to_csv("/content/drive/MyDrive/Data/ml-25m/testing.csv", index=False)

In [None]:
!pip install pyspark



In [18]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.appName("Collaborative Filtering Example").getOrCreate()

try:
    # Both CSVs were written with the columns userId, movieId, rating.
    # Declaring the schema avoids the extra full pass over the data that
    # inferSchema=True needs in order to guess the column types.
    ratings_schema = "userId INT, movieId INT, rating DOUBLE"

    training_data = spark.read.csv("/content/drive/MyDrive/Data/ml-25m/training.csv", header=True,
                                   schema=ratings_schema)

    # coldStartStrategy="drop" removes rows whose prediction is NaN so that
    # the RMSE below stays a finite number.
    als = ALS(rank=30, maxIter=20, regParam=0.5, userCol="userId", itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop", nonnegative=True, seed=16)

    model = als.fit(training_data)

    testing_data = spark.read.csv("/content/drive/MyDrive/Data/ml-25m/testing.csv", header=True,
                                  schema=ratings_schema)
    predictions = model.transform(testing_data)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)

    # The score is computed on testing.csv, so it is labelled as test data.
    print("Root Mean Squared Error (RMSE) on test data = " + str(rmse))
finally:
    # Release the Spark session even when loading, fitting or evaluation fails.
    spark.stop()

Root Mean Squared Error (RMSE) on validation data = 0.9985535801063252
