# User Based Collaborative Filtering Recommendation Systems
Notebook to present algorithm of prediction movies for specific user

## Load libraries

In [1]:
import os
import pyspark.sql.functions as F

In [2]:
## change working directory
os.chdir('/app')
## show current working directory
os.getcwd()

'/app'

In [3]:
from ml.utils.spark_utils import get_spark_session
from ml.features.utils import normalize_ratings_df

In [None]:
BUCKET = os.getenv("MINIO_BUCKET_NAME", "recommendation-system")
USER_DATA_PATH = f"s3a://{BUCKET}/data/silver/netflix_user_data.parquet"
USER_STATS_PATH = f"s3a://{BUCKET}/data/gold/user_stats/"
MOVIES_INFO_PATH = f"s3a://{BUCKET}/data/silver/netflix_movie_data.parquet"
k = 50  # number of nearest neighbours
min_corated_movies = 10  # minimum number of co-rated movies for nearest neighbours
C = 5   # strength of ratings of the same movie from multiple users
selected_user_id = 923084

## Load data using spark

In [5]:
spark = get_spark_session()

:: loading settings :: url = jar:file:/usr/local/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-251df4eb-3806-4b7f-8a1f-1fb533fdeaa9;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 369ms :: artifacts dl 12ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|

In [6]:
user_data_df = spark.read.parquet(USER_DATA_PATH)
user_data_df.show()

25/12/12 21:26:30 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

+-------+------+----------+-------+
| UserID|Rating|      Date|MovieID|
+-------+------+----------+-------+
|2532865|     4|2005-07-26|   4500|
| 573364|     3|2005-06-20|   4500|
|1696725|     3|2004-02-27|   4500|
|1253431|     3|2004-03-31|   4500|
|1265574|     2|2003-09-01|   4500|
|1049643|     1|2003-11-15|   4500|
|1601348|     4|2005-04-05|   4500|
|1495289|     5|2005-07-09|   4500|
|1254903|     3|2003-09-02|   4500|
|2604070|     3|2005-05-15|   4500|
|1006473|     5|2005-05-23|   4500|
|1989892|     3|2004-04-06|   4500|
|1517471|     4|2003-12-24|   4500|
|1478381|     4|2005-05-21|   4500|
| 923084|     2|2004-11-15|   4500|
|2446292|     4|2005-10-06|   4500|
|2554745|     3|2003-05-07|   4500|
|1133125|     5|2004-08-10|   4500|
| 349528|     4|2003-08-11|   4500|
|1614895|     5|2004-08-29|   4500|
+-------+------+----------+-------+
only showing top 20 rows



In [7]:
user_stats_df = spark.read.parquet(USER_STATS_PATH)
user_stats_df.show()

+-------+------------------+------------------+----------+----------+------------+------------+
| UserID|       mean_rating|        std_rating|max_rating|min_rating|rated_movies|version_date|
+-------+------------------+------------------+----------+----------+------------+------------+
|2118461|  4.08232755714382|0.7718881684966115|         5|         1|       14831|  2025-12-11|
|2550711|3.3234501347708894| 1.023558066471066|         5|         1|        1484|  2025-12-11|
| 209104| 4.631921824104235|0.8530595751995691|         5|         1|         307|  2025-12-11|
|1296163| 3.148686030428769|0.6489871369393502|         5|         1|        1446|  2025-12-11|
| 104899|          2.559375|1.5043740479155001|         5|         1|         960|  2025-12-11|
| 550883| 2.685616827743036|1.2537938120381342|         5|         1|        1759|  2025-12-11|
|1196169|2.8864353312302837|1.1886843878396234|         5|         1|         317|  2025-12-11|
| 893789| 3.858508604206501|0.9135998562

In [8]:
movie_info_df = spark.read.parquet(MOVIES_INFO_PATH)
movie_info_df.show(truncate=False)

+-------+-------------+---------------------------------------------------------------------------+
|MovieID|YearOfRelease|Title                                                                      |
+-------+-------------+---------------------------------------------------------------------------+
|1      |2003         |Dinosaur Planet                                                            |
|2      |2004         |Isle of Man TT 2004 Review                                                 |
|3      |1997         |Character                                                                  |
|4      |1994         |Paula Abdul's Get Up & Dance                                               |
|5      |2004         |The Rise and Fall of ECW                                                   |
|6      |1997         |Sick                                                                       |
|7      |1992         |8 Man                                                                      |


## Calculate correlations between selected users and all other users from database

In [9]:
# Normalize user data
normalized_user_data = normalize_ratings_df(df=user_data_df, user_stats_df=user_stats_df)
normalized_user_data = normalized_user_data.select(
    F.col("UserID"),
    F.col("Rating"),
    F.col("MovieID")
)
normalized_user_data.show()

Normalizing ratings


                                                                                

+-------+--------------------+-------+
| UserID|              Rating|MovieID|
+-------+--------------------+-------+
|2532865|  0.3519230730725127|   4500|
| 573364| -0.6807503907112852|   4500|
|1696725|-0.31706727095306114|   4500|
|1253431| -0.6704267339350091|   4500|
|1265574| -1.5418798791333286|   4500|
|1049643|  -1.635555936971428|   4500|
|1601348| -0.3674047167570344|   4500|
|1495289|  1.1632653061224494|   4500|
|1254903|-0.21275243639192862|   4500|
|2604070| -0.3712333548306655|   4500|
|1006473|   1.003350093135977|   4500|
|1989892| -0.3750250354980193|   4500|
|1517471|  0.5406356111812762|   4500|
|1478381|  0.3895732587517264|   4500|
| 923084| -1.1189229270686383|   4500|
|2446292|  0.6321924864369479|   4500|
|2554745|-0.06301924300823204|   4500|
|1133125| 0.17541160386140378|   4500|
| 349528| 0.18160984190580085|   4500|
|1614895|  0.9019045345840109|   4500|
+-------+--------------------+-------+
only showing top 20 rows



                                                                                

In [10]:
# Select one user
selected_user_data = normalized_user_data.filter(F.col("UserID") == selected_user_id).cache()
print(selected_user_data.count())
selected_user_data.show()

                                                                                

1478
+------+--------------------+-------+
|UserID|              Rating|MovieID|
+------+--------------------+-------+
|923084| -1.1189229270686383|   4500|
|923084| -1.1189229270686383|   4517|
|923084| -1.1189229270686383|   4522|
|923084| -1.1189229270686383|   4533|
|923084| -1.1189229270686383|   4545|
|923084| -1.1189229270686383|   4562|
|923084|-0.14554559557514368|   4577|
|923084| -1.1189229270686383|   4586|
|923084|  -2.092300258562133|   4587|
|923084| -1.1189229270686383|   4634|
|923084|  1.8012090674118455|   4636|
|923084|  0.8278317359183509|   4640|
|923084| -1.1189229270686383|   4644|
|923084|  0.8278317359183509|   4652|
|923084|-0.14554559557514368|   4656|
|923084|  -2.092300258562133|   4660|
|923084|  0.8278317359183509|   4671|
|923084|  0.8278317359183509|   4697|
|923084|  0.8278317359183509|   4698|
|923084|  1.8012090674118455|   4727|
+------+--------------------+-------+
only showing top 20 rows



In [11]:
# Join user movies with other users ratings
pairs = (
    selected_user_data.alias("selected_user")
    .join(
        normalized_user_data.alias("all_users"),
        ( F.col("selected_user.MovieID") == F.col("all_users.MovieID") )
    )
    .filter(F.col("all_users.UserID") != selected_user_id)
    .select(
        F.col("selected_user.MovieID").alias("MovieID"),
        F.col("selected_user.Rating").alias("user_rating"),
        F.col("all_users.UserID").alias("others_user"),
        F.col("all_users.Rating").alias("others_rating")
    )
)
pairs.show()

+-------+-------------------+-----------+--------------------+
|MovieID|        user_rating|others_user|       others_rating|
+-------+-------------------+-----------+--------------------+
|   4500|-1.1189229270686383|    2532865|  0.3519230730725127|
|   4500|-1.1189229270686383|     573364| -0.6807503907112852|
|   4500|-1.1189229270686383|    1696725|-0.31706727095306114|
|   4500|-1.1189229270686383|    1253431| -0.6704267339350091|
|   4500|-1.1189229270686383|    1265574| -1.5418798791333286|
|   4500|-1.1189229270686383|    1049643|  -1.635555936971428|
|   4500|-1.1189229270686383|    1601348| -0.3674047167570344|
|   4500|-1.1189229270686383|    1495289|  1.1632653061224494|
|   4500|-1.1189229270686383|    1254903|-0.21275243639192862|
|   4500|-1.1189229270686383|    2604070| -0.3712333548306655|
|   4500|-1.1189229270686383|    1006473|   1.003350093135977|
|   4500|-1.1189229270686383|    1989892| -0.3750250354980193|
|   4500|-1.1189229270686383|    1517471|  0.5406356111

                                                                                

In [12]:
# Calculate correlations between selected user and all other users
corrs = (
    pairs
    .groupBy(F.col("others_user"))
    .agg(
        F.corr(
            F.col("user_rating"),
            F.col("others_rating")
        ).alias("correlation"),
        F.count("*").alias("corated_movies")
    )
).cache()
corrs.show()

25/12/12 21:27:11 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors

+-----------+--------------------+--------------+
|others_user|         correlation|corated_movies|
+-----------+--------------------+--------------+
|    1704795| 0.39250268207703476|            17|
|    1146497| 0.08662034981421267|            68|
|     279120| 0.12081503394565528|           489|
|     569333| 0.19575875786532623|           454|
|    1049332|  0.2341269654442925|           492|
|    1001129|-0.01194373114521...|           780|
|     796682|-0.00667030561729...|           425|
|    1787480| 0.09891620533544349|           338|
|    2507129| 0.26582107477447986|           631|
|     284489| 0.18553670615047185|           325|
|     771361| 0.12128433615277556|           206|
|    2131455| -0.0896057965105191|           216|
|    1782775| 0.01779247089868537|           432|
|     822400| 0.07534136307678976|           410|
|    2199068| 0.16751580718145287|            19|
|    2389952| 0.03170164655920546|           125|
|     481749| 0.11555788123335475|           243|


                                                                                

In [None]:
# Find k nearest neighbours
k_nn = (
    corrs
    .filter(F.col("corated_movies") > min_corated_movies)
    .orderBy(F.col("correlation").desc())
    .limit(k)
).cache()
k_nn.show()



+-----------+------------------+--------------+
|others_user|       correlation|corated_movies|
+-----------+------------------+--------------+
|    1772084|0.9434048135477321|            11|
|    1371294|0.9220529258469679|            11|
|    2083930|0.9107604103916127|            11|
|      57546|0.9002759548448437|            11|
|    2153434|0.8983845666444721|            12|
|    1324828|0.8975103686746205|            14|
|     875246|0.8931746327127119|            14|
|    2522938|0.8907753580705424|            12|
|     167938|0.8873597692490789|            11|
|    2320584|0.8819171036881971|            12|
|    1892606|0.8797703301307919|            15|
|     319464|0.8764598212022149|            18|
|    1828005|0.8755967524606707|            19|
|     125176|0.8744063386711524|            11|
|    1863975|0.8740200053736314|            11|
|    1742823|0.8709062139390401|            13|
|     602712|0.8674427949190672|            13|
|    1568860|0.8672273827165131|        

                                                                                

In [14]:
# filter movies rated by nearest neighbours
k_nn_movies = (
    user_data_df.alias("user_data")
    .join(
        k_nn.alias("k_nn"),
        ( F.col("user_data.UserID") == F.col("k_nn.others_user") )
    )
)
k_nn_movies.show()

+-------+------+----------+-------+-----------+------------------+--------------+
| UserID|Rating|      Date|MovieID|others_user|       correlation|corated_movies|
+-------+------+----------+-------+-----------+------------------+--------------+
|1051609|     1|2005-07-06|   4533|    1051609|0.8589118694710288|            13|
| 125176|     5|2005-07-02|   4559|     125176|0.8744063386711524|            11|
|1278490|     3|2005-12-31|   4577|    1278490| 0.846907414636339|            14|
| 634961|     2|2005-10-25|   4577|     634961|0.8635351882621736|            17|
|2216047|     5|2004-10-06|   4577|    2216047|0.8383408109565674|            13|
| 326088|     3|2005-07-01|   4590|     326088|0.8425232519684704|            11|
| 998646|     5|2005-03-24|   4595|     998646| 0.836743024404081|            17|
|1051609|     5|2004-10-13|   4595|    1051609|0.8589118694710288|            13|
|2405598|     5|2005-09-10|   4633|    2405598|0.8639866292052777|            17|
|1564884|     5|

                                                                                

In [None]:
# base predictions
base_pred  = (
    k_nn_movies
    .groupBy(F.col("MovieID"))
    .agg(
        (
            F.sum(F.col("Rating") * F.col("correlation")) /
            F.sum(F.abs(F.col("correlation")))
        ).alias("raw_prediction"),
        F.count("*").alias("support")
    )
)
base_pred.show(truncate=False)



+-------+------------------+-------+
|MovieID|raw_prediction    |support|
+-------+------------------+-------+
|5518   |5.0               |1      |
|7240   |3.017620599573269 |4      |
|6266   |5.0               |1      |
|7879   |3.0               |1      |
|5695   |4.0               |1      |
|5670   |3.0               |1      |
|4773   |4.0               |1      |
|8423   |3.4895104895104896|2      |
|6204   |4.0               |1      |
|5926   |5.0               |2      |
|7173   |5.0               |1      |
|7744   |5.0               |1      |
|8116   |3.0947300853852493|2      |
|5066   |4.0               |1      |
|6437   |3.0               |1      |
|4779   |5.0               |1      |
|5768   |4.0               |1      |
|6528   |5.0               |1      |
|5401   |2.493057617854232 |2      |
|6904   |3.0               |1      |
+-------+------------------+-------+
only showing top 20 rows



                                                                                

In [None]:
# adjust raw predictions by number or rated movies from all nearest neighbours
suggestions = (
    base_pred
    .withColumn(
        "prediction",
        F.col("raw_prediction") * (F.col("support") / (F.col("support") + F.lit(C)))
    )
    .join(movie_info_df, "MovieID")
    .orderBy(F.col("prediction").desc())
)
suggestions.show(truncate=False)



+-------+------------------+-------+------------------+-------------+-----------------------------------------+
|MovieID|raw_prediction    |support|prediction        |YearOfRelease|Title                                    |
+-------+------------------+-------+------------------+-------------+-----------------------------------------+
|12232  |4.5199372893278875|17     |3.4926788144806404|2003         |Lost in Translation                      |
|10774  |4.765055532638923 |13     |3.4414289957947775|1976         |Taxi Driver                              |
|8782   |4.247822434440607 |16     |3.236436140526177 |2001         |The Royal Tenenbaums                     |
|11064  |4.999999999999999 |8      |3.0769230769230766|1994         |Pulp Fiction                             |
|11283  |5.0               |7      |2.916666666666667 |1994         |Forrest Gump                             |
|5732   |4.999999999999999 |6      |2.7272727272727266|1990         |GoodFellas: Special Edition        

                                                                                

In [17]:
# best rated movies by the selected user
(
    selected_user_data
    .join(
        movie_info_df,
        "MovieID"
    )
    .orderBy(F.col("Rating").desc())
    .show(truncate=False)
)

+-------+------+------------------+-------------+-------------------------------------+
|MovieID|UserID|Rating            |YearOfRelease|Title                                |
+-------+------+------------------+-------------+-------------------------------------+
|1218   |923084|1.8012090674118455|1972         |Frenzy                               |
|5224   |923084|1.8012090674118455|2003         |Bus 174                              |
|1395   |923084|1.8012090674118455|1963         |Charade                              |
|1098   |923084|1.8012090674118455|1965         |The Battle of Algiers                |
|1618   |923084|1.8012090674118455|1984         |Nausicaa of the Valley of the Wind   |
|473    |923084|1.8012090674118455|1997         |Princess Mononoke                    |
|1646   |923084|1.8012090674118455|1983         |The Dead Zone                        |
|256    |923084|1.8012090674118455|2000         |Ghost Dog: The Way of the Samurai    |
|1865   |923084|1.80120906741184

## Clean

In [18]:
spark.stop()