In [1]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
import findspark
import os
import pandas as pd

from ast import literal_eval

from stack_data.utils import truncate

findspark.init()

os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[2] pyspark-shell --jars C:\\Users\\09398\\PycharmProjects\\elasticsearch-spark-recommender\\elasticsearch-hadoop-8.17.0\\dist\\elasticsearch-spark-30_2.12-8.17.0.jar"

os.environ["PYSPARK_DRIVER_PYTHON"] = "jupyter"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"
es_hadoop_jar = "C:\\Users\\09398\\PycharmProjects\\elasticsearch-spark-recommender\\elasticsearch-hadoop-8.17.0\\dist\\elasticsearch-spark-30_2.12-8.17.0.jar"

spark = SparkSession.builder \
    .appName("ElasticsearchIntegration") \
    .config("spark.speculation", "false") \
    .config("spark.jars", es_hadoop_jar) \
    .config("es.nodes", "http://localhost:9200") \
    .config("es.nodes.wan.only", "true") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.maxResultSize", "2g") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.5") \
    .getOrCreate()

In [131]:
spark.catalog.clearCache()

In [132]:
PATH_TO_DATA = "../data/archive"

In [133]:
md = spark.read.csv(
    f"{PATH_TO_DATA}/movies_metadata.csv",
    header=True,
    inferSchema=True,
    multiLine=True,
    escape="\"",
    quote='"',
    sep=',',
    ignoreTrailingWhiteSpace=True,
    ignoreLeadingWhiteSpace=True
)
md.show(5)

+-----+---------------------+--------+--------------------+--------------------+-----+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+---------+-------+--------------------+--------+--------------------+--------------------+-----+------------+----------+
|adult|belongs_to_collection|  budget|              genres|            homepage|   id|  imdb_id|original_language|      original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|  revenue|runtime|    spoken_languages|  status|             tagline|               title|video|vote_average|vote_count|
+-----+---------------------+--------+--------------------+--------------------+-----+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+---------+-------+-----------

In [134]:
md = md.drop("adult", "belongs_to_collection", "homepage", "imdb_id", "original_language", "original_title", "poster_path",
             "production_countries", "status", "video", "spoken_languages", "production_companies", "revenue", "runtime", "budget")

In [135]:
md.show(5)

+--------------------+-----+--------------------+----------+------------+--------------------+--------------------+------------+----------+
|              genres|   id|            overview|popularity|release_date|             tagline|               title|vote_average|vote_count|
+--------------------+-----+--------------------+----------+------------+--------------------+--------------------+------------+----------+
|[{'id': 16, 'name...|  862|Led by Woody, And...| 21.946943|  1995-10-30|                NULL|           Toy Story|         7.7|      5415|
|[{'id': 12, 'name...| 8844|When siblings Jud...| 17.015539|  1995-12-15|Roll the dice and...|             Jumanji|         6.9|      2413|
|[{'id': 10749, 'n...|15602|A family wedding ...|   11.7129|  1995-12-22|Still Yelling. St...|    Grumpier Old Men|         6.5|        92|
|[{'id': 35, 'name...|31357|Cheated on, mistr...|  3.859495|  1995-12-22|Friends are the p...|   Waiting to Exhale|         6.1|        34|
|[{'id': 35, 'name..

Xu ly cot genres: chuyen thanh list cac genres


In [136]:
from pyspark.sql.functions import col, from_json, expr
from pyspark.sql.types import ArrayType, StructType, StructField, StringType
genres_schema = ArrayType(StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True)
]))
md = md.withColumn("genres", from_json(col("genres"), genres_schema))
md = md.withColumn("genres", expr("transform(genres, x -> x.name)"))

md.show(5, truncate=False)

+----------------------------+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+------------+------------------------------------------------------------------------------+---------------------------+------------+----------+
|genres                      |id   |overview                                                                                                                                                                                                                                                                                                                                                                                

In [137]:
vote_counts = md.filter(col('vote_count').isNotNull()).select(
    col('vote_count').cast('int'))


vote_averages = md.filter(col('vote_average').isNotNull()).select(
    col('vote_average').cast('int'))

In [138]:

C = vote_averages.agg(avg('vote_average')).first()[0]
C

5.244896612406511

In [139]:

m = vote_counts.approxQuantile('vote_count', [0.95], 0)[0]
m

434.0

In [140]:
md = md.withColumn('release_date', to_date(col('release_date'), 'yyyy-MM-dd'))
md = md.withColumn('year', year(col('release_date')))
md.show(5)

+--------------------+-----+--------------------+----------+------------+--------------------+--------------------+------------+----------+----+
|              genres|   id|            overview|popularity|release_date|             tagline|               title|vote_average|vote_count|year|
+--------------------+-----+--------------------+----------+------------+--------------------+--------------------+------------+----------+----+
|[Animation, Comed...|  862|Led by Woody, And...| 21.946943|  1995-10-30|                NULL|           Toy Story|         7.7|      5415|1995|
|[Adventure, Fanta...| 8844|When siblings Jud...| 17.015539|  1995-12-15|Roll the dice and...|             Jumanji|         6.9|      2413|1995|
|   [Romance, Comedy]|15602|A family wedding ...|   11.7129|  1995-12-22|Still Yelling. St...|    Grumpier Old Men|         6.5|        92|1995|
|[Comedy, Drama, R...|31357|Cheated on, mistr...|  3.859495|  1995-12-22|Friends are the p...|   Waiting to Exhale|         6.1|  

In [141]:
qualified = md.filter((col('vote_count') >= m) & col('vote_count').isNotNull() & col('vote_average').isNotNull()) \
              .select('title', 'year', 'vote_count', 'vote_average', 'popularity', 'genres')
qualified = qualified.withColumn('vote_count', col('vote_count').cast('int')) \
                     .withColumn('vote_average', col('vote_average').cast('int'))
shape = (qualified.count(), len(qualified.columns))
shape

(2274, 6)

In [142]:
from pyspark.sql.types import DoubleType, StructField


@udf(DoubleType())
def weighted_rating_udf(vote_count, vote_average):
    return (vote_count / (vote_count + m) * vote_average) + (m / (vote_count + m) * C)

In [143]:
qualified = qualified.withColumn('wr', weighted_rating_udf(
    col('vote_count'), col('vote_average')))

In [144]:
qualified = qualified.orderBy(col('wr').desc())
qualified.show(10)

+--------------------+----+----------+------------+----------+--------------------+------------------+
|               title|year|vote_count|vote_average|popularity|              genres|                wr|
+--------------------+----+----------+------------+----------+--------------------+------------------+
|           Inception|2010|     14075|           8| 29.108149|[Action, Thriller...| 7.917588057742396|
|     The Dark Knight|2008|     12269|           8|123.167259|[Drama, Action, C...| 7.905871457906355|
|        Interstellar|2014|     11187|           8| 32.213481|[Adventure, Drama...| 7.897107402958818|
|          Fight Club|1999|      9678|           8| 63.869599|             [Drama]| 7.881752880714441|
|The Lord of the R...|2001|      8892|           8| 32.070725|[Adventure, Fanta...| 7.871786953654775|
|        Pulp Fiction|1994|      8670|           8|140.950236|   [Thriller, Crime]| 7.868660493166128|
|The Shawshank Red...|1994|      8358|           8| 51.645403|      [Dram

In [145]:
from pyspark.sql.types import StringType, ArrayType
gen_md = md.withColumn("genre", explode(col("genres")))
gen_md = gen_md.drop("genres")
gen_md.show(truncate=False)

+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+------------+------------------------------------------------------------------------------+---------------------------+------------+----------+----+---------+
|id   |overview                                                                                                                                                                                                                                                                                                                                                                                                   |popularity|release_date

In [146]:
from pyspark.sql.functions import col, mean, lit, when, expr
from pyspark.sql.types import DoubleType


def build_chart(genre, percentile=0.85):
    genre_df = gen_md.filter(col("genre") == genre)
    genre_df = genre_df.filter(
        col("vote_count").isNotNull() & col("vote_average").isNotNull())
    genre_df = genre_df.withColumn("vote_count", col("vote_count").cast("int"))
    genre_df = genre_df.withColumn(
        "vote_average", col("vote_average").cast("double"))
    C = genre_df.select(mean(col("vote_average"))).first()[0]
    m = genre_df.approxQuantile("vote_count", [percentile], 0.01)[0]
    qualified = genre_df.filter(col("vote_count") >= m).select(
        "title", "year", "vote_count", "vote_average", "popularity"
    )
    qualified = qualified.withColumn(
        "wr",
        (col("vote_count") / (col("vote_count") + lit(m)) * col("vote_average")) +
        (lit(m) / (col("vote_count") + lit(m)) * lit(C))
    )

    qualified = qualified.orderBy(col("wr").desc()).limit(250)

    return qualified

In [147]:
build_chart('Romance').show(5, truncate=False)

+---------------------------+----+----------+------------+----------+-----------------+
|title                      |year|vote_count|vote_average|popularity|wr               |
+---------------------------+----+----------+------------+----------+-----------------+
|Dilwale Dulhania Le Jayenge|1995|661       |9.1         |34.457024 |8.728219129758896|
|Your Name.                 |2016|1030      |8.5         |34.461252 |8.296609741176281|
|Forrest Gump               |1994|8147      |8.2         |48.307194 |8.17553435204354 |
|Cinema Paradiso            |1988|834       |8.2         |14.177005 |7.980635350403461|
|La La Land                 |2016|4745      |7.9         |19.681686 |7.863516929316707|
+---------------------------+----+----------+------------+----------+-----------------+
only showing top 5 rows



\*\*Content-based recommendation system


In [148]:
links = spark.read.csv(
    f"{PATH_TO_DATA}/links_small.csv",
    header=True,
    inferSchema=True,
    multiLine=True,
    quote='"',
    sep=',',
    ignoreTrailingWhiteSpace=True,  #
    ignoreLeadingWhiteSpace=True
)
links.show(5)

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5|113041| 11862|
+-------+------+------+
only showing top 5 rows



In [149]:
links = links.filter(col('tmdbId').isNotNull()) \
    .select(col('tmdbId').cast('int'))

In [150]:
md = md.withColumn('id', col('id').cast('int'))

In [151]:
md = md.filter(col('id').isNotNull())

Movie description Based Recommender


\*Handle Meta data of movies


In [152]:
keywords = spark.read.csv(
    f"{PATH_TO_DATA}/keywords.csv",
    header=True,
    inferSchema=True,
    multiLine=True,
    escape="\"",
    quote='"',
    sep=',',
    ignoreTrailingWhiteSpace=True,
    ignoreLeadingWhiteSpace=True
)

In [153]:
credits_pd = pd.read_csv(f"{PATH_TO_DATA}/credits.csv")
crew_struct = StructType([
    StructField("job", StringType(), True),
    StructField("name", StringType(), True)
])

credits_pd['crew'] = credits_pd['crew'].apply(literal_eval)
credits_pd['crew'] = credits_pd['crew'].apply(
    lambda x: [{'job': i['job'], 'name': i['name']} for i in x])
credits_pd['cast'] = credits_pd['cast'].apply(literal_eval)
credits_pd['cast'] = credits_pd['cast'].apply(
    lambda x: [{'name': i['name']} for i in x])
credits_pd['cast'] = credits_pd['cast'].apply(
    lambda x: x[:3] if len(x) > 3 else x)
credits_pd['id'] = credits_pd['id'].astype(int)

credits = spark.createDataFrame(credits_pd)

In [154]:
credits.show(5, truncate=False)

+-------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [155]:
from pyspark.sql.functions import transform, expr, filter
credits = credits.withColumn(
    'director',
    expr("filter(crew, x -> x.job = 'Director')")
)
credits = credits.withColumn(
    'director',
    expr("transform(director, x -> x.name)")
)
credits = credits.withColumn("cast", expr("transform(cast, x -> x.name)"))

In [156]:
credits = credits.withColumn('id', col('id').cast('int'))
keywords = keywords.withColumn('id', col('id').cast('int'))

In [157]:
credits = credits.withColumn(
    'director',
    expr("transform(director, x -> array_repeat(x, 3))")
)

In [158]:
keywords_schema = ArrayType(StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True)
]))
keywords = keywords.withColumn(
    'keywords', from_json(col('keywords'), keywords_schema))
keywords = keywords.withColumn(
    'keywords', expr("transform(keywords, x -> x.name)"))

In [159]:
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf
from nltk.stem.snowball import SnowballStemmer

stemmer = SnowballStemmer('english')


@udf(returnType=ArrayType(StringType()))
def process_keywords(keywords_array):
    if keywords_array is None:
        return []

    processed = [str.lower(k.replace(" ", "")) for k in keywords_array]
    stemmed = [stemmer.stem(k) for k in processed]
    return sorted(list(set(stemmed)))


keywords = keywords.withColumn("keywords", process_keywords(col("keywords")))

In [160]:
md = md.join(credits, md['id'] == credits['id'], 'left').drop(credits['id'])
md = md.join(keywords, md['id'] == keywords['id'], 'left').drop(keywords['id'])
md = md.join(links, md['id'] == links['tmdbId'])
md = md.withColumn('tagline', coalesce(col('tagline').cast('string'), lit('')))
md = md.withColumn('overview', coalesce(
    col('overview').cast('string'), lit('')))
md = md.withColumn('description', concat(
    col('tagline'), lit(' '), col('overview')))

In [161]:
md.show(5, truncate=True)

+--------------------+---+--------------------+----------+------------+--------------------+--------------+------------+----------+----+--------------------+--------------------+--------------------+--------------------+------+--------------------+
|              genres| id|            overview|popularity|release_date|             tagline|         title|vote_average|vote_count|year|                cast|                crew|            director|            keywords|tmdbId|         description|
+--------------------+---+--------------------+----------+------------+--------------------+--------------+------------+----------+----+--------------------+--------------------+--------------------+--------------------+------+--------------------+
|     [Crime, Comedy]|  5|It's Ted the Bell...|  9.026586|  1995-12-09|Twelve outrageous...|    Four Rooms|         6.5|       539|1995|[Tim Roth, Antoni...|[{name -> Combust...|[[Allison Anders,...|[bet, episodefilm...|     5|Twelve outrageous...|
|[Ac

In [1]:
from elasticsearch import Elasticsearch
es = Elasticsearch(hosts="http://localhost:9200")
es.info(pretty=True)

ObjectApiResponse({'name': '4bbae3288639', 'cluster_name': 'docker-cluster', 'cluster_uuid': 'dsCurIfHRImh4-bX_WtMsA', 'version': {'number': '8.10.2', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '6d20dd8ce62365be9b1aca96427de4622e970e9e', 'build_date': '2023-09-19T08:16:24.564900370Z', 'build_snapshot': False, 'lucene_version': '9.7.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'})

In [105]:
VECTOR_DIM = 20
VECTOR_META_DIM = 384
create_ratings = {
    "mappings": {
        "properties": {
            "timestamp": {
                "type": "date"
            },
            "userId": {
                "type": "integer"
            },
            "movieId": {
                "type": "integer"
            },
            "rating": {
                "type": "double"
            }
        }
    }
}
create_users = {
    "mappings": {
        "properties": {
            "userId": {
                "type": "integer"
            },
            "model_factor": {
                "type": "dense_vector",
                "dims": VECTOR_DIM
            },
            "model_version": {
                "type": "keyword"
            },
            "model_timestamp": {
                "type": "date"
            }
        }
    }
}

create_movies = {
    "mappings": {
        "properties": {
            "movieId": {
                "type": "integer"
            },
            "year": {
                "type": "date",
                "format": "year"
            },
            "title": {
                "type": "text",
                "analyzer": "english",
                "fields": {
                    "keyword": {
                        "type": "keyword"
                    }
                }
            },
            "tmdbId": {
                "type": "keyword"
            },
            "vote_count": {
                "type": "integer"
            },
            "vote_average": {
                "type": "double"
            },
            "popularity": {
                "type": "double"
            },
            "description": {
                "type": "text",
                "analyzer": "english",
                "fields": {
                    "keyword": {
                        "type": "keyword"
                    }
                }
            },
            "meta_factor": {
                "type": "dense_vector",
                "dims": VECTOR_META_DIM
            },
            "model_factor": {
                "type": "dense_vector",
                "dims": VECTOR_DIM
            },
            "model_version": {
                "type": "keyword"
            },
            "model_timestamp": {
                "type": "date"
            }
        }
    }
}


def delete_index(index_name):
    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)
        print(f"Deleted index: {index_name}")


delete_index("ratings")
delete_index("users")
delete_index("movies")
res_ratings = es.indices.create(index="ratings", body=create_ratings)
res_users = es.indices.create(index="users", body=create_users)
res_movies = es.indices.create(index="movies", body=create_movies)

print("Created indices:")
print(res_ratings)
print(res_users)
print(res_movies)

Deleted index: ratings
Deleted index: users
Deleted index: movies
Created indices:
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'ratings'}
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'users'}
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'movies'}


In [170]:
ratings = spark.read.csv(
    f"{PATH_TO_DATA}/ratings_small.csv",
    header=True,
    inferSchema=True,
    multiLine=True,
    quote='"',
    sep=',',
    ignoreTrailingWhiteSpace=True,
    ignoreLeadingWhiteSpace=True
)

In [163]:
md = md.withColumn('director', expr("transform(flatten(director), x -> x)"))

In [164]:
insert = md.withColumn("meta_data", concat_ws(
    " ",
    concat_ws(" ", col("keywords")),
    concat_ws(" ", col("cast")),
    concat_ws("", col("director")),
    concat_ws(" ", col("genres"))
))

In [123]:
insert.show(5)

+--------------------+---+--------------------+----------+------------+--------------------+-------------------+------------+----------+----+--------------------+--------------------+--------------------+--------------------+------+--------------------+--------------------+
|              genres| id|            overview|popularity|release_date|             tagline|              title|vote_average|vote_count|year|                cast|                crew|            director|            keywords|tmdbId|         description|           meta_data|
+--------------------+---+--------------------+----------+------------+--------------------+-------------------+------------+----------+----+--------------------+--------------------+--------------------+--------------------+------+--------------------+--------------------+
|     [Drama, Comedy]|  3|An episode in the...|   2.29211|  1986-10-16|                    |Shadows in Paradise|         7.1|        35|1986|[Matti Pellonpää,...|[{name -> Aki

In [165]:
movies_data_deduplicated = insert.dropDuplicates(["id"])

In [166]:
movies_data_deduplicated = movies_data_deduplicated.withColumn(
    "movieId", col("id"))

In [167]:
movies_data_deduplicated.show(10, truncate=False)

+----------------------------+---+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------+------------+----------+----+-------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [168]:
from sentence_transformers import SentenceTransformer
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import numpy as np
import re
import nltk
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('punkt_tab')
model = SentenceTransformer('all-MiniLM-L6-v2')
VECTOR_META_DIM = 384


def preprocess_text(text):
    text = text.lower()
    text = re.sub(r'[^\w\s]', '', text)
    tokens = word_tokenize(text)
    stop_words = set(stopwords.words('english'))
    tokens = [t for t in tokens if t not in stop_words]
    lemmatizer = WordNetLemmatizer()
    tokens = [lemmatizer.lemmatize(t) for t in tokens]
    return tokens


def get_weighted_meta_embeddings(movie_data):
    weights = {
        'description': 0.35,
        'title': 0.25,
        'meta_data': 0.4,
    }
    vector_dim = model.get_sentence_embedding_dimension()
    embeddings = {field: np.zeros(vector_dim) for field in weights}
    for field in weights:
        if field in movie_data and movie_data[field]:
            if isinstance(movie_data[field], list):
                text = ' '.join(movie_data[field])
            else:
                text = str(movie_data[field])
            tokens = preprocess_text(text)
            if len(tokens) <= 256:
                field_embedding = model.encode(' '.join(tokens))
            else:
                chunks = [tokens[i:i + 256]
                          for i in range(0, len(tokens), 256)]
                chunk_embeddings = [model.encode(
                    ' '.join(chunk)) for chunk in chunks]
                field_embedding = np.mean(chunk_embeddings, axis=0)

            embeddings[field] = field_embedding * weights[field]
    meta_embedding = np.sum(list(embeddings.values()), axis=0)
    return meta_embedding


def process_meta_features(movies_df):
    meta_features = []
    for _, movie in movies_df.iterrows():
        meta_embedding = get_weighted_meta_embeddings(movie)
        meta_features.append({
            'movieId': movie['movieId'],
            'meta_factor': meta_embedding.tolist()
        })
    return spark.createDataFrame(meta_features)


movies_data_deduplicated.show(5)
meta_features_df = process_meta_features(movies_data_deduplicated.toPandas())

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\09398\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\09398\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\09398\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\09398\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


+--------------------+---+--------------------+----------+------------+--------------------+--------------+------------+----------+----+--------------------+--------------------+--------------------+--------------------+------+--------------------+--------------------+-------+
|              genres| id|            overview|popularity|release_date|             tagline|         title|vote_average|vote_count|year|                cast|                crew|            director|            keywords|tmdbId|         description|           meta_data|movieId|
+--------------------+---+--------------------+----------+------------+--------------------+--------------+------------+----------+----+--------------------+--------------------+--------------------+--------------------+------+--------------------+--------------------+-------+
|     [Crime, Comedy]|  5|It's Ted the Bell...|  9.026586|  1995-12-09|Twelve outrageous...|    Four Rooms|         6.5|       539|1995|[Tim Roth, Antoni...|[{name ->

In [169]:
meta_features_df.show(5)

+--------------------+-------+
|         meta_factor|movieId|
+--------------------+-------+
|[-0.0079367812722...|      5|
|[0.01152728311717...|      6|
|[-0.0574968159198...|     12|
|[-0.0772633850574...|     13|
|[-0.0738316923379...|     15|
+--------------------+-------+
only showing top 5 rows



In [176]:
movies_data_to_indexed = movies_data_deduplicated.join(
    meta_features_df, on='movieId', how='inner').drop(meta_features_df['movieId'])

In [177]:
movies_data_to_indexed = movies_data_to_indexed.drop("meta_data")

In [179]:
movies_data_to_indexed.write.format("es").option(
    "es.mapping.id", "movieId").save("movies")
num_movies_df = movies_data_to_indexed.count()
num_movies_es = es.count(index="movies")['count']
print("Movie DF count: {}".format(num_movies_df))
print("ES index count: {}".format(num_movies_es))

Movie DF count: 9082
ES index count: 9082


In [180]:
es.indices.refresh(index="movies")

ObjectApiResponse({'_shards': {'total': 2, 'successful': 1, 'failed': 0}})

In [181]:
ratings = ratings.filter(col('movieId').isin(
    [i.movieId for i in movies_data_deduplicated.select("movieId").collect()]))

In [182]:
ratings.write.format("es").save("ratings")
num_ratings_es = es.count(index="ratings")['count']
num_ratings_df = ratings.count()
print("Dataframe count: {}".format(num_ratings_df))
print("ES index count:  {}".format(num_ratings_es))

Dataframe count: 32131
ES index count:  32131


In [9]:
# Get all ratings from Elasticsearch index
ratings_es = spark.read.format("es").load("ratings")
ratings_es.show(5)

+-------+------+--------------------+------+
|movieId|rating|           timestamp|userId|
+-------+------+--------------------+------+
|   1371|   2.5|1970-01-15 22:12:...|     1|
|   1405|   1.0|1970-01-15 22:12:...|     1|
|   2105|   4.0|1970-01-15 22:12:...|     1|
|   2193|   2.0|1970-01-15 22:12:...|     1|
|   2294|   2.0|1970-01-15 22:12:...|     1|
+-------+------+--------------------+------+
only showing top 5 rows



In [10]:
from pyspark.ml.recommendation import ALS

In [188]:

null_counts = ratings.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in ratings.columns])
null_counts.show()
total_rows = ratings.count()
print(f"Total number of rows: {total_rows}")

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     0|      0|     0|        0|
+------+-------+------+---------+

Total number of rows: 32131


In [13]:
from itertools import product
from pyspark.sql.functions import row_number, rand
from pyspark.sql import Window
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col
from sklearn.metrics import ndcg_score
import numpy as np


def evaluate_recommendations(model, test_data):
    if not model or not test_data:
        print("intput error")
        return None
    test_data_clean = test_data.na.drop()
    predictions = model.transform(test_data_clean)
    predictions = predictions.filter(col("prediction").isNotNull())

    evaluator = RegressionEvaluator(
        metricName="rmse",
        labelCol="rating",
        predictionCol="prediction"
    )
    rmse = evaluator.evaluate(predictions)

    evaluator = RegressionEvaluator(
        metricName="mae",
        labelCol="rating",
        predictionCol="prediction"
    )
    mae = evaluator.evaluate(predictions)
    k = 10
    true_ratings = predictions.select(
        "rating").rdd.flatMap(lambda x: x).collect()
    pred_ratings = predictions.select(
        "prediction").rdd.flatMap(lambda x: x).collect()
    if not true_ratings or not pred_ratings:
        ndcg = 0
    else:
        ndcg = ndcg_score([true_ratings], [pred_ratings], k=k)

    return {
        "RMSE": rmse,
        "MAE": mae,
        "NDCG@10": ndcg
    }


def improved_als_model(ratings, rank=20, regParam=0.1, alpha=1.0):
    ratings_clean = ratings.na.drop()
    als = ALS(
        userCol="userId",
        itemCol="movieId",
        ratingCol="rating",
        rank=rank,
        regParam=regParam,
        implicitPrefs=True,
        alpha=alpha,
        nonnegative=True,
        seed=54,
        coldStartStrategy="drop"
    )
    model = als.fit(ratings_clean)
    return model


def tune_hyperparameters(ratings):
    reg_params = [0.01, 0.1, 1.0]
    alphas = [1, 10, 40]
    ranks = [10, 20, 30]
    best_metrics = None
    best_params = None
    for reg, alpha, rank in product(reg_params, alphas, ranks):
        print(f"regParam={reg}, alpha={alpha}, rank={rank}")
        train, test = ratings.randomSplit([0.8, 0.2])
        model = improved_als_model(
            train,
            rank=rank,
            regParam=reg,
            alpha=alpha
        )
        metrics = evaluate_recommendations(model, test)
        if (best_metrics is None or
                metrics["RMSE"] < best_metrics["RMSE"]):
            best_metrics = metrics
            best_params = {
                "regParam": reg,
                "alpha": alpha,
                "rank": rank
            }
    return best_params, best_metrics


best_params, best_metrics = tune_hyperparameters(ratings_es)
print("Best parameters:", best_params)
print("Best metrics:", best_metrics)
final_model = improved_als_model(
    ratings_es,
    rank=rank,
    regParam=best_params["regParam"],
    alpha=best_params["alpha"]
)

regParam=0.01, alpha=1, rank=10
regParam=0.01, alpha=1, rank=20
regParam=0.01, alpha=1, rank=30
regParam=0.01, alpha=10, rank=10
regParam=0.01, alpha=10, rank=20
regParam=0.01, alpha=10, rank=30
regParam=0.01, alpha=40, rank=10
regParam=0.01, alpha=40, rank=20
regParam=0.01, alpha=40, rank=30
regParam=0.1, alpha=1, rank=10
regParam=0.1, alpha=1, rank=20
regParam=0.1, alpha=1, rank=30
regParam=0.1, alpha=10, rank=10
regParam=0.1, alpha=10, rank=20
regParam=0.1, alpha=10, rank=30
regParam=0.1, alpha=40, rank=10
regParam=0.1, alpha=40, rank=20
regParam=0.1, alpha=40, rank=30
regParam=1.0, alpha=1, rank=10
regParam=1.0, alpha=1, rank=20
regParam=1.0, alpha=1, rank=30
regParam=1.0, alpha=10, rank=10
regParam=1.0, alpha=10, rank=20
regParam=1.0, alpha=10, rank=30
regParam=1.0, alpha=40, rank=10
regParam=1.0, alpha=40, rank=20
regParam=1.0, alpha=40, rank=30
Best parameters: {'regParam': 0.1, 'alpha': 40, 'rank': 10}
Best metrics: {'RMSE': 2.9019140886576165, 'MAE': 2.7127171438015525, 'NDCG@

TypeError: Invalid param value given for param "rank". Could not convert <function rank at 0x0000023A3A598A60> to int

In [207]:

final_model.userFactors.show(5)
final_model.itemFactors.show(5)

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[0.0, 0.0, 0.3113...|
| 20|[0.6109069, 0.0, ...|
| 30|[0.28762525, 0.05...|
| 40|[0.0, 0.0, 0.9947...|
| 50|[0.0, 0.0, 0.2936...|
+---+--------------------+
only showing top 5 rows

+---+--------------------+
| id|            features|
+---+--------------------+
| 20|[0.43706286, 0.0,...|
| 70|[0.65708596, 0.40...|
| 80|[0.0, 0.0, 0.0, 0...|
|100|[0.0, 0.0, 0.0, 0...|
|110|[0.33686087, 0.10...|
+---+--------------------+
only showing top 5 rows



In [209]:
from pyspark.sql.functions import lit, current_timestamp, unix_timestamp
ver = final_model.uid
ts = unix_timestamp(current_timestamp())
movie_vectors = final_model.itemFactors.select(col("id").alias("movieId"),
                                               col("features").alias(
                                                   "model_factor"),
                                               lit(ver).alias("model_version"),
                                               ts.alias("model_timestamp"))
movie_vectors.show(5)
user_vectors = final_model.userFactors.select(col("id").alias("userId"),
                                              col("features").alias(
                                                  "model_factor"),
                                              lit(ver).alias("model_version"),
                                              ts.alias("model_timestamp"))
user_vectors.show(5)

+-------+--------------------+----------------+---------------+
|movieId|        model_factor|   model_version|model_timestamp|
+-------+--------------------+----------------+---------------+
|     20|[0.43706286, 0.0,...|ALS_53272c9f13ec|     1734864067|
|     70|[0.65708596, 0.40...|ALS_53272c9f13ec|     1734864067|
|     80|[0.0, 0.0, 0.0, 0...|ALS_53272c9f13ec|     1734864067|
|    100|[0.0, 0.0, 0.0, 0...|ALS_53272c9f13ec|     1734864067|
|    110|[0.33686087, 0.10...|ALS_53272c9f13ec|     1734864067|
+-------+--------------------+----------------+---------------+
only showing top 5 rows

+------+--------------------+----------------+---------------+
|userId|        model_factor|   model_version|model_timestamp|
+------+--------------------+----------------+---------------+
|    10|[0.0, 0.0, 0.3113...|ALS_53272c9f13ec|     1734864067|
|    20|[0.6109069, 0.0, ...|ALS_53272c9f13ec|     1734864067|
|    30|[0.28762525, 0.05...|ALS_53272c9f13ec|     1734864067|
|    40|[0.0, 0.0, 0.

In [210]:
movie_vectors.write.format("es") \
    .option("es.mapping.id", "movieId") \
    .option("es.write.operation", "update") \
    .save("movies", mode="append")

In [211]:
user_vectors.write.format("es") \
    .option("es.mapping.id", "userId") \
    .save("users")

In [212]:
es.indices.refresh(index="movies")
es.indices.refresh(index="users")

ObjectApiResponse({'_shards': {'total': 2, 'successful': 1, 'failed': 0}})

In [215]:
def vector_query(query_vec, vector_field, q="*", cosine=False):
    if cosine:
        score_fn = "doc['{v}'].size() == 0 ? 0 : cosineSimilarity(params.vector, '{v}') + 1.0"
    else:
        score_fn = "doc['{v}'].size() == 0 ? 0 : sigmoid(1, Math.E, -dotProduct(params.vector, '{v}'))"

    score_fn = score_fn.format(v=vector_field, fn=score_fn)

    return {
        "query": {
            "script_score": {
                "query": {
                    "query_string": {
                        "query": q
                    }
                },
                "script": {
                    "source": score_fn,
                    "params": {
                        "vector": query_vec
                    }
                }
            }
        }
    }

In [218]:
def compute_global_params(index="movies"):
    body = {
        "size": 0,
        "aggs": {
            "avg_vote": {"avg": {"field": "vote_average"}},
            "min_votes": {"percentiles": {"field": "vote_count", "percents": [65]}},
            "max_popularity": {"max": {"field": "popularity"}}
        }
    }
    results = es.search(index=index, body=body)
    C = results['aggregations']['avg_vote']['value']
    m = results['aggregations']['min_votes']['values']['65.0']
    P_max = results['aggregations']['max_popularity']['value']
    return C, m, P_max


C, m, P_max = compute_global_params()
lambda_popularity = 0.5

In [217]:
print(C, m, P_max)

6.362177934375689 210.1841299320521 547.488298


In [219]:
def calculate_weighted_rating(v, R, P, C, m, P_max, lambda_popularity=0.5):

    try:
        P = float(P)
    except (TypeError, ValueError):
        print("error p")
        P = 0

    if v > 0:
        wr = ((v / (v + m)) * R) + ((m / (v + m)) * C) + \
            lambda_popularity * (P / P_max if P_max else 0)
    else:
        wr = lambda_popularity * (P / P_max if P_max else 0)
    return wr

In [220]:
def process_recommendations(hits, C, m, P_max, lambda_popularity=0.5):

    recommendations = []
    for hit in hits:
        rec = hit['_source']
        v = rec.get('vote_count', 0)
        R = rec.get('vote_average', 0)
        P = rec.get('popularity', 0)
        print(v, R, P)
        wr = calculate_weighted_rating(v, R, P, C, m, P_max, lambda_popularity)
        rec['weighted_rating'] = wr
        rec['original_score'] = hit['_score']
        recommendations.append(rec)
    recommendations.sort(key=lambda x: x['weighted_rating'], reverse=True)
    return recommendations

In [221]:
def get_similar(the_id, q="*", num=10, index="movies", vector_field='model_factor', cosine=False):
    response = es.get(index=index, id=the_id)
    src = response['_source']
    if vector_field in src:
        query_vec = src[vector_field]
        q = vector_query(query_vec, vector_field, q=q, cosine=cosine)
        results = es.search(index=index, body=q)
        hits = results['hits']['hits']
        recommendations = process_recommendations(
            hits, C, m, P_max, lambda_popularity)
        return src, recommendations[:num+1]

In [222]:
get_similar(2019)

237 6.1 7.834351
1210 7.0 12.715571
340 6.5 15.783615
1754 5.2 11.902354
101 6.1 7.464981
320 6.3 9.430812
722 6.9 15.831374
521 7.7 14.003467
369 6.3 13.246795
787 6.7 10.529576


({'movieId': 2019,
  'genres': ['Action', 'Adventure', 'Crime', 'Thriller'],
  'id': 2019,
  'overview': "When a woman's father goes missing, she enlist a local to aid in her search.  The pair soon discover that her father has died at the hands of a wealthy sportsman who hunts homeless men as a form of recreation.",
  'popularity': '7.834351',
  'release_date': 745779600000,
  'tagline': "Don't hunt what you can't kill.",
  'title': 'Hard Target',
  'vote_average': 6.1,
  'vote_count': 237,
  'year': 1993,
  'cast': ['Jean-Claude Van Damme', 'Arnold Vosloo', 'Lance Henriksen'],
  'crew': [{'name': 'John Woo', 'job': 'Director'},
   {'name': 'Chuck Pfarrer', 'job': 'Screenplay'},
   {'name': 'Sean Daniel', 'job': 'Producer'},
   {'name': 'James Jacks', 'job': 'Producer'},
   {'name': 'Russell Carpenter', 'job': 'Director of Photography'},
   {'name': 'Bob Murawski', 'job': 'Editor'},
   {'name': 'Phil Dagort', 'job': 'Production Design'},
   {'name': 'Philip Messina', 'job': 'Art Direct

In [223]:

def get_user_recs(the_id, q="*", num=10, users="users", movies="movies", vector_field='model_factor'):
    response = es.get(index=users, id=the_id)
    src = response['_source']
    if vector_field in src:
        query_vec = src[vector_field]
        q = vector_query(query_vec, vector_field, q=q, cosine=False)
        results = es.search(index=movies, body=q)
        hits = results['hits']['hits']
        recommendations = process_recommendations(
            hits, C, m, P_max, lambda_popularity)
        return src, recommendations[:num]

In [224]:
import json
from requests import HTTPError
import requests


def get_poster_url(movie_id):
    try:
        IMAGE_URL = 'https://image.tmdb.org/t/p/w500'
        url = "https://api.themoviedb.org/3/movie/" + \
            str(movie_id) + "?language=en-US"

        headers = {
            "accept": "application/json",
            "Authorization": "Bearer eyJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJkNDM4ZjJmOGVmMjk5ZmI4ZTA5MWVlZDEyZWY0YzQyMiIsIm5iZiI6MTY3MDEyNzA0NC43Niwic3ViIjoiNjM4YzFkYzQwZTY0YWYwMGRlYWIyMTQ5Iiwic2NvcGVzIjpbImFwaV9yZWFkIl0sInZlcnNpb24iOjF9.o2bVzOt4bu9yJCQnB-7TLgyF-_VSS-sLPhQ8YAdz0-U"
        }
        movie_info = requests.get(url, headers=headers)
        movie_info = json.loads(movie_info.text)
        movie_poster_url = IMAGE_URL + movie_info['poster_path']
        return movie_poster_url
    except HTTPError as e:
        if e.response.status_code == 401:
            j = json.loads(e.response.text)
            print(j)

In [228]:
def get_hybrid_recommendations(user_id, content_id, alpha=0.5, num=10,
                               user_index="users", movie_index="movies",
                               user_vector_field='model_factor', content_vector_field='meta_factor'):

    user_src, user_recs = get_user_recs(
        user_id, users=user_index, movies=movie_index, vector_field=user_vector_field, num=num * 2)
    user_scores = {rec['movieId']: rec['original_score'] for rec in user_recs}
    content_src, content_recs = get_similar(
        content_id, index=movie_index, vector_field=content_vector_field, num=num * 2, cosine=False)
    content_scores = {rec['movieId']: rec['original_score']
                      for rec in content_recs}
    combined_scores = {}
    for movie_id in set(user_scores.keys()).union(content_scores.keys()):
        user_score = user_scores.get(movie_id, 0)
        content_score = content_scores.get(movie_id, 0)
        combined_scores[movie_id] = alpha * \
            user_score + (1 - alpha) * content_score
    ranked_movies = sorted(combined_scores.items(),
                           key=lambda x: x[1], reverse=True)[:num]
    recommendations = []
    for movie_id, score in ranked_movies:
        movie_data = es.get(index=movie_index, id=movie_id)['_source']
        movie_data['score'] = score
        recommendations.append(movie_data)
    return recommendations

In [229]:
get_hybrid_recommendations(1, 2019)

894 6.6 11.673366
688 5.3 14.007329
10 5.1 1.523446
2344 7.4 12.477872
1743 6.6 14.178281
48 6.5 12.130265
984 6.6 10.25721
384 6.4 7.008512
913 5.7 12.093928
146 5.6 9.243715
237 6.1 7.834351
194 6.0 7.481524
1240 7.2 10.08937
3083 7.7 15.565484
57 7.2 3.467871
109 5.9 9.337393
26 6.2 5.432879
7 5.1 1.217249
404 7.2 9.070215
1231 6.0 9.034929


[{'movieId': 1371,
  'genres': ['Drama'],
  'id': 1371,
  'overview': 'Now the world champion, Rocky Balboa is living in luxury and only fighting opponents who pose no threat to him in the ring. His lifestyle of wealth and idleness is shaken when a powerful young fighter known as Clubber Lang challenges him to a bout. After taking a pounding from Lang, the humbled champ turns to former bitter rival Apollo Creed to help him regain his form for a rematch with Lang.',
  'popularity': '11.673366',
  'release_date': 391366800000,
  'tagline': 'The greatest challenge.',
  'title': 'Rocky III',
  'vote_average': 6.6,
  'vote_count': 894,
  'year': 1982,
  'cast': ['Sylvester Stallone', 'Talia Shire', 'Burt Young'],
  'crew': [{'name': 'Sylvester Stallone', 'job': 'Director'},
   {'name': 'Sylvester Stallone', 'job': 'Screenplay'},
   {'name': 'Robert Chartoff', 'job': 'Producer'},
   {'name': 'Irwin Winkler', 'job': 'Producer'},
   {'name': 'Bill Conti', 'job': 'Original Music Composer'},
   

In [230]:
from IPython.core.display import HTML, Image


def get_movies_for_user(the_id, num=10, ratings="ratings", movies="movies"):
    response = es.search(index=ratings, q="userId:{}".format(
        the_id), size=num, sort=["rating:desc"])
    hits = response['hits']['hits']
    ids = [h['_source']['movieId'] for h in hits]
    movies = es.mget(body={"ids": ids}, index=movies,
                     _source_includes=['tmdbId', 'title'])
    movies_hits = movies['docs']
    tmdbids = [h['_source'] for h in movies_hits]
    return tmdbids


def display_user_recs(the_id, q="*", num=10, num_last=10, users="users", movies="movies", ratings="ratings"):
    user, recs = get_user_recs(the_id, q, num, users, movies)
    user_movies = get_movies_for_user(the_id, num_last, ratings, movies)
    # check that posters can be displayed
    first_movie = user_movies[0]
    first_im_url = get_poster_url(first_movie['tmdbId'])
    if first_im_url == "NA":
        display(
            HTML("<i>Cannot import tmdbsimple. No movie posters will be displayed!</i>"))
    if first_im_url == "KEY_ERR":
        display(HTML(
            "<i>Key error accessing TMDb API. Check your API key. No movie posters will be displayed!</i>"))

    # display the movies that this user has rated highly
    display(HTML("<h2>Get recommended movies for user id %s</h2>" % the_id))
    display(HTML("<h4>The user has rated the following movies highly:</h4>"))
    user_html = "<table border=0>"
    i = 0
    for movie in user_movies:
        movie_im_url = get_poster_url(movie['tmdbId'])
        movie_title = movie['title']
        user_html += "<td><h5>%s</h5><img src=%s width=150></img></td>" % (
            movie_title, movie_im_url)
        i += 1
        if i % 5 == 0:
            user_html += "</tr><tr>"
    user_html += "</tr></table>"
    display(HTML(user_html))
    # now display the recommended movies for the user
    display(HTML("<br>"))
    display(HTML("<h2>Recommended movies:</h2>"))
    rec_html = "<table border=0>"
    i = 0
    for rec in recs:
        r_im_url = get_poster_url(rec['_source']['tmdbId'])
        r_score = rec['_score']
        r_title = rec['_source']['title']
        rec_html += "<td><h5>%s</h5><img src=%s width=150></img></td><td><h5>%2.3f</h5></td>" % (
            r_title, r_im_url, r_score)
        i += 1
        if i % 5 == 0:
            rec_html += "</tr><tr>"
    rec_html += "</tr></table>"
    display(HTML(rec_html))


def display_similar(the_id, q="*", num=10, movies="movies"):
    """
    Display query movie, together with similar movies and similarity scores, in a table
    """
    movie, recs = get_similar(the_id, q, num, movies)
    q_im_url = get_poster_url(movie['tmdbId'])
    if q_im_url == "NA":
        display(
            HTML("<i>Cannot import tmdbsimple. No movie posters will be displayed!</i>"))
    if q_im_url == "KEY_ERR":
        display(HTML(
            "<i>Key error accessing TMDb API. Check your API key. No movie posters will be displayed!</i>"))

    display(HTML("<h2>Get similar movies for:</h2>"))
    display(HTML("<h4>%s</h4>" % movie['title']))
    display(HTML("<p>%s</p>" % movie['description']))
    if q_im_url != "NA":
        display(Image(q_im_url, width=200))
    display(HTML("<br>"))
    display(HTML("<h2>People who liked this movie also liked these:</h2>"))
    sim_html = "<table border=0>"
    i = 0
    for rec in recs:
        r_im_url = get_poster_url(rec['_source']['tmdbId'])
        r_score = rec['_score']
        r_title = rec['_source']['title']
        sim_html += "<td><h5>%s</h5><img src=%s width=150></img></td><td><h5>%2.3f</h5></td>" % (
            r_title, r_im_url, r_score)
        i += 1
        if i % 5 == 0:
            sim_html += "</tr><tr>"
    sim_html += "</tr></table>"
    display(HTML(sim_html))

In [22]:

from run_eval import evaluate_hybrid_recommender, evaluate_user_recommender, evaluate_similar_recommender
import numpy as np
import matplotlib.pyplot as plt

alphas = [0.3, 0.5, 0.7]
metrics = ['coverage', 'diversity', 'novelty']

for alpha in alphas:
    hybrid_results = evaluate_hybrid_recommender(es=es, alpha=alpha)
    user_results = evaluate_user_recommender(es=es)
    similar_results = evaluate_similar_recommender(es=es)

    data = {
        'Hybrid': [hybrid_results[metric] for metric in metrics],
        'Collaborative': [user_results[metric] for metric in metrics],
        'Content-based': [similar_results[metric] for metric in metrics]
    }

    x = np.arange(len(metrics))
    width = 0.25
    multiplier = 0

    fig, ax = plt.subplots(figsize=(10, 6))
    for recommender, measurement in data.items():
        offset = width * multiplier
        rects = ax.bar(x + offset, measurement, width, label=recommender)
        multiplier += 1

    ax.set_ylabel('Score')
    ax.set_title(f'Comparison of Recommender Systems (Alpha={alpha})')
    ax.set_xticks(x + width, metrics)
    ax.legend(loc='upper left', bbox_to_anchor=(1, 1))
    ax.set_ylim(0, 1)

    for rect in ax.patches:
        height = rect.get_height()
        ax.text(rect.get_x() + rect.get_width()/2., height,
                f'{height:.2f}',
                ha='center', va='bottom')

    plt.tight_layout()
    file_name = f'recommender_comparison_alpha_{alpha}.png'
    plt.savefig(file_name)
    print(f'Chart saved as {file_name}')
    plt.close(fig)

Chart saved as recommender_comparison_alpha_0.3.png
Chart saved as recommender_comparison_alpha_0.5.png
Chart saved as recommender_comparison_alpha_0.7.png
