In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install pyspark
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.mllib.recommendation import Rating
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator, RankingEvaluator
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("Recommender system")\
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

# movies = spark.read.csv("movies.csv", header=True, inferSchema=True)
ratings = spark.read.csv("/content/drive/MyDrive/ratings.csv", header=True, inferSchema=True)
# movies.show(5)
ratings.show(5)
# movie_ratings = ratings.join(movies, ["movieId"], "inner")
# movie_ratings.show(5)
# movie_ratings = movie_ratings.filter(col("userId").isNotNull())


Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=544ccb6c1cf1eb1893563e3ca71fac214d71d3634fa066f58fc134aded59c7e7
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484

In [None]:
sampled_ratings = ratings.sample(fraction=0.2, seed=42)
sampled_ratings.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    223|   4.0|1112485573|
|     1|    589|   3.5|1112485557|
|     1|    653|   3.0|1094785691|
|     1|   1090|   4.0|1112485453|
|     1|   1215|   4.0|1094786082|
+------+-------+------+----------+
only showing top 5 rows



In [None]:
print(sampled_ratings.count())

4004253


In [None]:
#df = spark.read.csv("data.csv", header=True, inferSchema=True)

In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Assuming you have the required imports

# ...

# Your data splitting
(train_data, test_data) = sampled_ratings.randomSplit([0.8, 0.2], seed=123)

# ALS model configuration
als = ALS(
    maxIter=10,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    implicitPrefs=True,  # Change to False if your data is explicit feedback
    nonnegative=True
)

# Parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [5,3]) \
    .addGrid(als.regParam, [0.1, 0.01]) \
    .addGrid(als.maxIter, [5]) \
    .build()

# Evaluation
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

# Cross-validation
crossval = CrossValidator(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)

# Model fitting
ALSModel = crossval.fit(train_data)
bestModel = ALSModel.bestModel

# Predictions
predictions = bestModel.transform(test_data)
predictions.show(5)

# Evaluation metrics
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
mse = evaluator.evaluate(predictions, {evaluator.metricName: "mse"})

# Print results
print(f"RMSE: {rmse}")
print(f"MSE: {mse}")


+------+-------+------+----------+-----------+
|userId|movieId|rating| timestamp| prediction|
+------+-------+------+----------+-----------+
|   148|   1777|   3.0|1018967190|0.075332925|
|   148|   4366|   3.0|1018966761|0.008367139|
|   463|     11|   4.0| 833466239| 0.29597232|
|   463|    161|   4.0| 833465881|  0.4959344|
|   463|    207|   4.0| 833466874| 0.12944262|
+------+-------+------+----------+-----------+
only showing top 5 rows

RMSE: 3.538607774732653
MSE: 12.521744983398378


In [None]:
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql.functions import col, collect_list
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

userRecs = bestModel.recommendForAllUsers(100)

user_ground_truth = test_data.groupby('userId').agg(collect_list('movieId').alias('ground_truth_items'))
user_train_items = train_data.groupby('userId').agg(collect_list('movieId').alias('train_items'))


user_eval = userRecs.join(user_ground_truth, on='userId').join(user_train_items, on='userId') \
    .select('userId', 'recommendations.movieId', 'ground_truth_items', 'train_items', 'recommendations.rating')

def calculate_precision(predicted, actual):
    k_values = range(1, len(actual) + 1)
    precision_values = [
        len(set(predicted[:k]) & set(actual)) / float(k) for k in k_values
    ]
    return float(sum(precision_values)) / len(precision_values)

calculate_precision_udf = udf(calculate_precision, FloatType())


user_eval = user_eval.withColumn(
    'precision',
    calculate_precision_udf(user_eval['movieId'], user_eval['ground_truth_items'])
)

# MAP = user_eval.agg({'precision': 'avg'}).collect()[0][0]
MAP = user_eval.agg(F.mean('precision')).collect()[0][0]
print(f"MAP: {MAP}")



MAP: 0.012295327184645705


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, ArrayType, MapType
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import math

#(train_data, test_data) = df.randomSplit([0.8, 0.2],seed=123)
(train_data, test_data) = sampled_ratings.randomSplit([0.8, 0.2],seed=123)

movie_grouped = train_data.groupBy('movieId').agg(
    F.collect_list('userId').alias('users'),
    F.collect_list('rating').alias('ratings')
)
movie_ratings_combined = movie_grouped.withColumn(
    'user_rating',
    F.arrays_zip('users', 'ratings')
).select('movieId', 'user_rating')

def cosine_similarity(ratings1, ratings2):
    rating_dict1 = {r['users']: r['ratings'] for r in ratings1}
    rating_dict2 = {r['users']: r['ratings'] for r in ratings2}

    common_users = set(rating_dict1.keys()).intersection(rating_dict2.keys())

    dot_product = sum(rating_dict1[user] * rating_dict2[user] for user in common_users)
    magnitude1 = math.sqrt(sum(rating_dict1[user] ** 2 for user in rating_dict1))
    magnitude2 = math.sqrt(sum(rating_dict2[user] ** 2 for user in rating_dict2))

    if magnitude1 != 0 and magnitude2 != 0:
        return dot_product / (magnitude1 * magnitude2)
    else:
        return 0.0
cosine_similarity_udf = udf(cosine_similarity, DoubleType())

movie_pairs_with_similarity = movie_ratings_combined.alias("df1").crossJoin(
    movie_ratings_combined.alias("df2").withColumnRenamed("movieId", "movieId2").withColumnRenamed("user_rating", "user_rating2")
).where("df1.movieId != movieId2").withColumn(
    "similarity",
    cosine_similarity_udf(col("df1.user_rating"), col("user_rating2"))
)

movie_pairs_with_similarity.show()


+-------+--------------------+--------+--------------------+--------------------+
|movieId|         user_rating|movieId2|        user_rating2|          similarity|
+-------+--------------------+--------+--------------------+--------------------+
|      1|[{309, 4.0}, {413...|       2|[{2487, 2.0}, {26...|0.007491789327389531|
|      1|[{309, 4.0}, {413...|       3|[{2350, 3.0}, {46...|0.009125266483879567|
|      1|[{309, 4.0}, {413...|       4|[{156, 3.0}, {168...|                 0.0|
|      1|[{309, 4.0}, {413...|       5|[{324, 3.0}, {429...|                 0.0|
|      1|[{309, 4.0}, {413...|       6|[{2082, 3.5}, {20...|0.007502872148281604|
|      1|[{309, 4.0}, {413...|       7|[{741, 4.5}, {138...|                 0.0|
|      1|[{309, 4.0}, {413...|       8|[{4494, 4.0}, {76...|                 0.0|
|      1|[{309, 4.0}, {413...|       9|[{2856, 0.5}, {17...|                 0.0|
|      1|[{309, 4.0}, {413...|      10|[{159, 3.0}, {557...|0.003378346915809851|
|      1|[{309, 

In [None]:
columns_to_drop = ["user_rating", "user_rating2"]

# Drop the specified columns
movie_pairs_with_similarity = movie_pairs_with_similarity.drop(*columns_to_drop)

# Show the modified DataFrame
movie_pairs_with_similarity.show(truncate=False)

movie_pairs_with_similarity.cache()

+-------+--------+--------------------+
|movieId|movieId2|similarity          |
+-------+--------+--------------------+
|1      |2       |0.007491789327389531|
|1      |3       |0.009125266483879567|
|1      |4       |0.0                 |
|1      |5       |0.0                 |
|1      |6       |0.007502872148281604|
|1      |7       |0.0                 |
|1      |8       |0.0                 |
|1      |9       |0.0                 |
|1      |10      |0.003378346915809851|
|1      |11      |0.0                 |
|1      |12      |0.0                 |
|1      |13      |0.0                 |
|1      |14      |0.0                 |
|1      |15      |0.0                 |
|1      |16      |0.008549380611425152|
|1      |17      |0.0                 |
|1      |18      |0.0                 |
|1      |19      |0.0                 |
|1      |20      |0.0                 |
|1      |21      |0.0                 |
+-------+--------+--------------------+
only showing top 20 rows



DataFrame[movieId: int, movieId2: int, similarity: double]

In [None]:
from pyspark.sql.functions import col, row_number
from pyspark.sql import Window
windowSpec = Window.partitionBy("movieId").orderBy(col("similarity").desc())

# Add a rank column based on the similarity within each partition
ranked_data = movie_pairs_with_similarity.withColumn("rank", row_number().over(windowSpec))

# Filter only the top 5 rows for each 'movieId'
top5_similarities = ranked_data.filter(col("rank") <= 5).drop("rank")

# Show the result
top5_similarities.show(truncate=False)


ModuleNotFoundError: ignored

In [None]:
top5_similarities.cache()

In [None]:
test_data_joined_similarity = (
    test_data.join(top5_similarities, on=test_data.movieId == top5_similarities.movieId, how='left')
    .select(
        test_data["movieId"].alias("movieId_1"),
        top5_similarities["movieId2"].alias("movieId_2"),
        test_data["userId"].alias("user_Id"),
        top5_similarities["similarity"].alias("similarity"),
        test_data["rating"].alias("rating")
    )
)



# Show the result
test_data_joined_similarity.show(truncate=False)

In [None]:
train_data_joined = (
    test_data_joined_similarity.alias("test_data")
    .join(train_data.alias("train_data"), on=((col("test_data.movieId_2") == col("train_data.movieId")) & (col("test_data.user_Id") == col("train_data.userId"))), how='left')
    .select(
        col("test_data.movieId_1").alias("f_movieId"),
        col("test_data.movieId_2").alias("f_movieId2"),
        col("test_data.user_Id").alias("f_userId"),
        col("test_data.similarity").alias("similarity"),
        col("test_data.rating").alias("true_rating"),
        col("train_data.rating").alias("rating")
    )
)

train_data_joined.show(truncate=False)

In [None]:
train_data_joined.cache()

In [None]:
result_not_null = train_data_joined.filter(col("rating").isNotNull())

# Show the result where 'rating' is not null
result_not_null.show(truncate=False)

In [None]:
result_grouped_weighted = (
    train_data_joined
    .groupBy("f_movieId")
    .agg(
        F.sum((F.col("similarity") * F.col("rating"))).alias("weighted_sum_ratings"),
        F.sum("similarity").alias("sum_similarity"),
        F.first("true_rating").alias("true_rating")
    ).withColumn("prediction", F.col("weighted_sum_ratings") / F.col("sum_similarity"))
)

result_grouped_weighted.show(truncate=False)


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the model
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="true_rating", metricName="rmse")
rmse = evaluator.evaluate(result_grouped_weighted)
print(f"Root Mean Squared Error for Item-Item CF (RMSE): {rmse}")

evaluator_mse = RegressionEvaluator(predictionCol="prediction", labelCol="true_rating", metricName="mse")
mse = evaluator_mse.evaluate(result_grouped_weighted)
print(f"Mean Squared Errorfor Item-Item CF (MSE): {mse}")

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression

movies = spark.read.csv("/content/drive/MyDrive/movies.csv", header=True, inferSchema=True)

movies.show(truncate=False)

+-------+-------------------------------------+-------------------------------------------+
|movieId|title                                |genres                                     |
+-------+-------------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                     |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                       |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)              |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)             |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)   |Comedy                                     |
|6      |Heat (1995)                          |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                       |Comedy|Romance                             |
|8      |Tom and Huck (1995)                  |Adventure|Children               

In [None]:
lr_data = (
    sampled_ratings.alias("rdata")
    .join(movies.alias("mdata"), on=(col("rdata.movieId") == col("mdata.movieId")), how='left')
    .select(
        col("rdata.movieId").alias("f_movieId"),
        col("mdata.title").alias("title"),
        col("rdata.userId").alias("f_userId"),
        col("mdata.genres").alias("genres"),
        col("rdata.rating").alias("rating")
    )
)

In [None]:
lr_data.show()

+---------+--------------------+--------+--------------------+------+
|f_movieId|               title|f_userId|              genres|rating|
+---------+--------------------+--------+--------------------+------+
|    53953|         1408 (2007)|      61|Drama|Horror|Thri...|   2.0|
|      590|Dances with Wolve...|      64|Adventure|Drama|W...|   5.0|
|      432|City Slickers II:...|      68|Adventure|Comedy|...|   3.0|
|     2739|Color Purple, The...|      73|               Drama|   2.0|
|    86880|Pirates of the Ca...|      82|Action|Adventure|...|   2.0|
|     5502|        Signs (2002)|      90|Horror|Sci-Fi|Thr...|   3.5|
|     1380|       Grease (1978)|      91|Comedy|Musical|Ro...|   4.0|
|   103042| Man of Steel (2013)|      96|Action|Adventure|...|   2.0|
|     1210|Star Wars: Episod...|     100|Action|Adventure|...|   4.0|
|      885|        Bogus (1996)|     104|Children|Drama|Fa...|   1.0|
|       78|Crossing Guard, T...|     114|Action|Crime|Dram...|   4.0|
|      484|       La

In [None]:
from pyspark.ml.feature import Tokenizer,IDF, HashingTF


tokenizer_genres = Tokenizer(inputCol="genres", outputCol="genres_tokens")
lr_data = tokenizer_genres.transform(lr_data)

# Apply HashingTF on 'genres_tokens'
hashingTF_genres = HashingTF(inputCol="genres_tokens", outputCol="rawFeatures_genres", numFeatures=20)
hashing_df = hashingTF_genres.transform(lr_data)

# Apply IDF on 'rawFeatures_genres'
idf_genres = IDF(inputCol="rawFeatures_genres", outputCol="features_genres")
idfModel_genres = idf_genres.fit(hashing_df)
vectorized_df = idfModel_genres.transform(hashing_df)

# Similarly, you can apply the same process for the 'title' column
tokenizer_title = Tokenizer(inputCol="title", outputCol="title_tokens")
vectorized_df = tokenizer_title.transform(vectorized_df)

hashingTF_title = HashingTF(inputCol="title_tokens", outputCol="rawFeatures_title", numFeatures=20)
hashing_df = hashingTF_title.transform(vectorized_df)

idf_title = IDF(inputCol="rawFeatures_title", outputCol="features_title")
idfModel_title = idf_title.fit(hashing_df)
vectorized_df = idfModel_title.transform(hashing_df)



In [None]:
from pyspark.ml.feature import VectorAssembler

# Assuming you have two feature columns: "features_genres" and "features_title"
assembler = VectorAssembler(
    inputCols=["features_genres", "features_title"],
    outputCol="combined_features"
)

# Assuming 'lr_data' is your DataFrame with features
vectorized_df = assembler.transform(vectorized_df)

vectorized_df.show()
vectorized_df.cache()

+---------+--------------------+--------+--------------------+------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|f_movieId|               title|f_userId|              genres|rating|       genres_tokens|rawFeatures_genres|     features_genres|        title_tokens|   rawFeatures_title|      features_title|   combined_features|
+---------+--------------------+--------+--------------------+------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    53953|         1408 (2007)|      61|Drama|Horror|Thri...|   2.0|[drama|horror|thr...|   (20,[19],[1.0])|(20,[19],[3.76304...|      [1408, (2007)]|(20,[7,19],[1.0,1...|(20,[7,19],[1.809...|(40,[19,27,39],[3...|
|      590|Dances with Wolve...|      64|Adventure|Drama|W...|   5.0|[adventure|drama|...|    (20,[2],[1.0])|(20,[2],[3.264279...|[dances, w

DataFrame[f_movieId: int, title: string, f_userId: int, genres: string, rating: double, genres_tokens: array<string>, rawFeatures_genres: vector, features_genres: vector, title_tokens: array<string>, rawFeatures_title: vector, features_title: vector, combined_features: vector]

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


(train_data, test_data) = vectorized_df.randomSplit([0.8, 0.2], seed=1234)
train_data.cache()
test_data.cache()

# Defining the Linear Regression Model
lr = LinearRegression(
    featuresCol="combined_features",
    labelCol="rating"
)

paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1, 1.0]).build()

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse"),
                          numFolds=10)

lrModel = crossval.fit(train_data)

bestModel = lrModel.bestModel

predictions = bestModel.transform(test_data)
predictions.show(5)

+---------+--------------------+--------+--------------------+------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+
|f_movieId|               title|f_userId|              genres|rating|       genres_tokens|rawFeatures_genres|     features_genres|        title_tokens|   rawFeatures_title|      features_title|   combined_features|        prediction|
+---------+--------------------+--------+--------------------+------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+
|        1|    Toy Story (1995)|    1836|Adventure|Animati...|   5.0|[adventure|animat...|   (20,[16],[1.0])|(20,[16],[2.30846...|[toy, story, (1995)]|(20,[15,17],[1.0,...|(20,[15,17],[1.58...|(40,[16,35,37],[2...| 3.524008858736442|
|        1|    Toy Story (1995)|    8636|Adventure|Animati...|  

In [None]:
predictions.show(20)

+---------+--------------------+--------+--------------------+------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+
|f_movieId|               title|f_userId|              genres|rating|       genres_tokens|rawFeatures_genres|     features_genres|        title_tokens|   rawFeatures_title|      features_title|   combined_features|        prediction|
+---------+--------------------+--------+--------------------+------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+
|        1|    Toy Story (1995)|    1836|Adventure|Animati...|   5.0|[adventure|animat...|   (20,[16],[1.0])|(20,[16],[2.30846...|[toy, story, (1995)]|(20,[15,17],[1.0,...|(20,[15,17],[1.58...|(40,[16,35,37],[2...| 3.524008858736442|
|        1|    Toy Story (1995)|    8636|Adventure|Animati...|  

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the model
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error for Linear Regression (RMSE): {rmse}")

evaluator_mse = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="mse")
mse = evaluator_mse.evaluate(predictions)
print(f"Mean Squared Error for Linear Regression (MSE): {mse}")

Root Mean Squared Error for Linear Regression (RMSE): 1.058988643036256
Mean Squared Error for Linear Regression (MSE): 1.121456946079771
