## Final Project:
#### Hybrid recommender system with Ollama(Gemma3) integration
#### Data source: movielens https://files.grouplens.org/datasets/movielens/
#### Recommedation algorithms: PySpark ALS for user-user collaborative filtering and cosine similarity for content-based filtering
#### Limitation: Due to limited RAM storage, only about 44730 movies and 4310 users are used in this study


In [3]:
import zipfile
import numpy as np
import pandas as pd
import json
import requests
import io
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import linear_kernel
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col
from pyspark.sql.functions import explode

### Download the files from the source:

In [4]:
url = 'https://files.grouplens.org/datasets/movielens/ml_belief_2024_data_release_2.zip'

response = requests.get(url)
response.raise_for_status()

with zipfile.ZipFile(io.BytesIO(response.content)) as zip_file:
    #Print all files inside the zip:
    
#read the target files only:
    with zip_file.open("data_release/movies.csv") as f:
        df_movie = pd.read_csv(f)

    with zip_file.open('data_release/user_rating_history.csv') as f:
        user_rating_df = pd.read_csv(f)


### Data preprocessing:

In [5]:
df_ratings = user_rating_df.drop('tstamp', axis=1)

#Due to overlimit of data, we will have to merge the movie dataset with the user rating dataset:
df_com = df_ratings.merge(df_movie, on='movieId', how='left')

print(f'Number of users in df_com: {df_com["userId"].nunique()}')
print(f'Number of movies in df_com: {df_com["movieId"].nunique()}')
print('-'*80)
#Check for any Null values in ratings:
print(f'Number of Na values in the rating column: {df_com["rating"].isna().sum()}')

#Let's remove those Na rows and some synethic ratings with negative values and re-count the number of users available:
df_com_cleaned = df_com[(df_com['rating'] >= 0) & (~df_com['rating'].isna()) & (~df_com['title'].isna())]

print(f'Number of Na values after cleaning inthe rating column: {df_com_cleaned["rating"].isna().sum()}')
print('-'*80)
print(f'Number of users after cleaning: {df_com_cleaned["userId"].nunique()}')
print(f'Number of movies after cleaning: {df_com_cleaned["movieId"].nunique()}')
print('-'*80)
#Create a reduced version of the dataset:
df_reduce = df_com_cleaned.sample(frac=0.5, random_state=42)
print(f'Number of users in reduced dataset: {df_reduce["userId"].nunique()}')
print(f'Number of movies in reduced dataset: {df_reduce["movieId"].nunique()}')

#Save to csv to directly read into spark to avoid crashing issues:
df_reduce.to_csv('df_reduce.csv')

Number of users in df_com: 4418
Number of movies in df_com: 85170
--------------------------------------------------------------------------------
Number of Na values in the rating column: 36521
Number of Na values after cleaning inthe rating column: 0
--------------------------------------------------------------------------------
Number of users after cleaning: 4415
Number of movies after cleaning: 57079
--------------------------------------------------------------------------------
Number of users in reduced dataset: 4310
Number of movies in reduced dataset: 44730


#### Since directly loading pandas dataframe into PySpark could cause compatibility issues, we have to save the preprocessed .csv file and then load it into Spark dataframe.

In [42]:
class HybridRecommender:
    def __init__(self, df_csv):
        self.spark = SparkSession.builder.getOrCreate()
        self.py_df_full = self.spark.read.csv(df_csv, header=True, inferSchema=True)
        self.py_df = self.py_df_full.select('userId', 'movieId','rating')
        self.df_reduce = self.py_df_full.toPandas()

        # Train ALS model
        als = ALS(
            maxIter=10,
            rank=10,
            regParam=0.1,
            userCol='userId',
            itemCol='movieId',
            ratingCol='rating',
            nonnegative=True,
            coldStartStrategy='drop',
            seed=42
        )
        self.als_model = als.fit(self.py_df)

        # Get ALS predictions
        rec_list = self.als_model.recommendForAllUsers(100)
        self.rec_list = rec_list.select('userId', explode('recommendations').alias('rec'))\
                                .select('userId', col('rec.movieId').alias('movieId'), col('rec.rating').alias('rating'))

        self.rec_unrated = self.rec_list.join(self.py_df, on=['userId', 'movieId'], how='left_anti')

        # Prepare metadata
        self.movie_meta = self.py_df_full.select('movieId', 'title', 'genres').dropDuplicates(['movieId']) #Making sure the metadata has only unique movie titles and IDs

        # Prepare TF-IDF for content-based
        df_movies_unique = self.df_reduce[['movieId', 'title', 'genres']].drop_duplicates('movieId').reset_index(drop=True)
        df_movies_unique['genres'] = df_movies_unique['genres'].fillna('')
        df_movies_unique['title'] = df_movies_unique['title'].fillna('')
        df_movies_unique['genres_clean'] = df_movies_unique['genres'].str.replace('|', ' ', regex=False)
        df_movies_unique['title_clean'] = df_movies_unique['title'].str.lower().str.replace(r'[^a-z0-9\s]', '', regex=True)
        df_movies_unique['features'] = (df_movies_unique['title_clean'] + ' ' + df_movies_unique['genres_clean'].str.lower()).str.strip()

        self.df_movies_unique = df_movies_unique

        tfidf = TfidfVectorizer(stop_words='english')
        self.tfidf_vec = tfidf.fit_transform(df_movies_unique['features'])
        self.cos_sim = linear_kernel(self.tfidf_vec) #Since TFIDF normalizes the data, the result of linear kernel is more efficient and same as cos-similarity.

        self.movie_id_to_index = pd.Series(df_movies_unique.index, index=df_movies_unique['movieId']).to_dict()

    def get_user_movie_vectors_for_prompt(self, user_id, movie_ids):
        #Item vectors:
        item_factors = self.als_model.itemFactors
        item_vectors = {}
        for mid in movie_ids:
            item_vectors[mid] = item_factors.filter(col('id') == mid).collect()[0]['features']

        item_vectors_str = {k: [round(x, 3) for x in v] for k, v in item_vectors.items()}
        item_vectors_str = json.dumps(item_vectors_str)
        
        #user vectors:
        user_vector = self.als_model.userFactors.filter(col('id') == user_id).collect()[0]['features']
        user_vector_str = json.dumps([round(x, 3) for x in user_vector])
        
        return {'user_vectors': user_vector_str,
                'item_vectors': item_vectors_str}
            

    def hybrid_recommender_for_user(self, user_id, top_n=5, alpha=0.5):
        user_rec = self.rec_unrated.filter(col('userId') == user_id).sort('rating', ascending=False)
        user_rec_full = user_rec.join(self.movie_meta, on='movieId', how='left')
        user_rec_full_df = user_rec_full.toPandas()

        user_liked = self.df_reduce[(self.df_reduce['userId'] == user_id) & (self.df_reduce['rating'] >= 4)]
        user_liked_movie_ids = user_liked['movieId'].tolist()
        user_liked_indices = [self.movie_id_to_index[mid] for mid in user_liked_movie_ids if mid in self.movie_id_to_index]

        def compute_content_score_by_mid(mid):
            if mid not in self.movie_id_to_index or not user_liked_indices:
                return 0.0
            idx = self.movie_id_to_index[mid]
            return self.cos_sim[idx, user_liked_indices].mean()

        user_rec_full_df['cb_score'] = user_rec_full_df['movieId'].apply(compute_content_score_by_mid)
        user_rec_full_df['hybrid_score'] = (user_rec_full_df['cb_score'] * alpha) + ((1 - alpha) * user_rec_full_df['rating'])

        return user_rec_full_df.sort_values(by='hybrid_score', ascending=False).head(top_n)

    def content_based_recommendations(self, user_id, top_n=5):
        
        df = self.df_reduce
        df_movies = self.df_movies_unique.copy()

        user_liked = df[(df['userId'] == user_id) & (df['rating'] >= 4)]
        user_liked_movie_ids = user_liked['movieId'].tolist()
        user_liked_indices = [self.movie_id_to_index[mid] for mid in user_liked_movie_ids if mid in self.movie_id_to_index]

        def compute_content_score_by_mid(mid):
            if mid not in self.movie_id_to_index or not user_liked_indices:
                return 0.0
            idx = self.movie_id_to_index[mid]
            return self.cos_sim[idx, user_liked_indices].mean()

        df_movies['cb_score'] = df_movies['movieId'].apply(compute_content_score_by_mid)
        df_movies = df_movies[~df_movies['movieId'].isin(user_liked_movie_ids)]

        return df_movies[['movieId', 'title', 'features', 'cb_score']].sort_values(by='cb_score', ascending=False).head(top_n)

    def cf_recommendations(self, user_id, top_n=5):
        user_rec = self.rec_unrated.filter(col('userId') == user_id).sort('rating', ascending=False)
        user_rec_full = user_rec.join(self.movie_meta, on='movieId', how='left')
        return user_rec_full.select('movieId', 'title', 'genres', 'rating').limit(top_n).toPandas()
    
    def llm_explain(self, user_id, top_n=5, alpha=0.5, model='gemma3:latest', include_content=True):
        import ollama
        
        #get recommendation results and metadata:
        top_recs = self.hybrid_recommender_for_user(user_id, top_n, alpha)
        movie_ids = top_recs['movieId'].to_list()
        titles = top_recs['title'].to_list()
        genres = top_recs['genres'].to_list()
        cb_score = top_recs['cb_score'].to_list()
        
        #get latent factors for user and movies:
        latent_factors = self.get_user_movie_vectors_for_prompt(user_id, movie_ids)
        
        #Movie titles and genres that the user had liked in the past:
        liked_titles = self.df_reduce[(self.df_reduce['userId']==user_id) & (self.df_reduce['rating'] >=4)]
                
        #Construct the prompt:
        movie_info = ""
        if include_content:
            movie_info = '\n'.join(
                [f'- {title} ({genre}) {cb_score}' for title, genre, cb_score in zip(titles, genres, cb_score)]
            )
        prompt = f"""
        You are a movie recommendation expert. A user has been analyzed using a hybrid recommender score, combining collaborative filtering and content-based filtering.
        Here is the original dataset with movie title, movieId, genres, etcs
        {self.df_reduce}
        
        Here is the user and the items' latent factor vectors:
        {latent_factors}
        
        The top recommended movies for this user are:
        {movie_info}
        
        The user had liked these top_N movies from the past with title and genre:
        {liked_titles}
        
        Alpha: {alpha} is the weighted hybrid factor that influences the final recommendation.
        This is the equation: hybrid_score = (content based score * alpha) + ((1 - alpha) * collaborative filtering score)
               
        Explain the following:
        1) The user's movie genre preferences, using the latent factors and reference the movie title.
        2) How the hybrid factor weights on the recommendations
        3) Why these movies might be recommended to this user, referencing the vectors or user's past liked genres if relevant.
        4) Would you recommend any other movies different from the ones listed, analyzing from all provided data.
        
              
        """
        client = ollama.Client()
        response = client.generate(model = model, prompt=prompt)
        return response.response
        

### To generate recommendation explanations using Gemma3 LLM:

In [38]:
#Define class
recommender = HybridRecommender('df_reduce.csv')
#get the LLM method
explain = recommender.llm_explain(user_id=274288, top_n=5, alpha=0.5)
#Print explanations
print(explain)

Okay, let's break down this movie recommendation scenario.

**1) User’s Movie Genre Preferences:**

Based on the latent factor vectors and the provided movie recommendations, this user strongly gravitates towards a blend of genres, particularly within the thriller and drama categories. Let’s examine the top recommendations:

*   **Camp Dread (2014) (Horror|Mystery|Thriller):** The user’s vector shows a high value (0.572) in the ‘Thriller’ dimension. This, combined with the horror and mystery elements, aligns perfectly with the recommendation.
*   **Last Kiss, The (Ultimo bacio, L') (2001) (Comedy|Drama|Romance):** The user’s vector (0.093) indicates a preference for ‘Drama’ combined with the comedic and romantic elements.
*   **Aayitha Ezhuthu (2004) (Action|Drama|Romance):**  The high ‘Drama’ factor (0.572) and the inclusion of ‘Action’ indicate a user likely looking for a dramatic story with action elements.
*   **National Theatre Live: Fleabag (2019) (Comedy):** The user's vector (0

### Accessing other recommendation methods within the recommender class:

    Content_based recommendations:

In [39]:
recommender.content_based_recommendations(274288)

Unnamed: 0,movieId,title,features,cb_score
38441,49593,She (1965),she 1965 action adventure drama fantasy horror...,0.097004
590,48774,Children of Men (2006),children of men 2006 action adventure drama sc...,0.090101
3614,130520,Home (2015),home 2015 adventure animation children comedy ...,0.088971
5863,224276,Over the Moon (2020),over the moon 2020 adventure animation childre...,0.088413
19598,199792,Comedy (2002),comedy 2002 action animation drama fantasy horror,0.086068


    User Collaborative filtering recommendations:

In [40]:
#user Collaborative filtering recommendations:
recommender.cf_recommendations(274288)

Unnamed: 0,movieId,title,genres,rating
0,62208,Sound of the Mountain (Thunder of the Mountain...,Drama,4.65573
1,231261,De Nuremberg à Nuremberg (1989),Documentary|War,4.581433
2,81371,There Was a Father (Chichi ariki) (1942),Drama,4.938606
3,187125,Head Above Water (1993),Comedy|Thriller,4.542495
4,148402,Drone (2014),Documentary|War,4.513318


    Hybrid recommendations:

In [41]:
recommender.hybrid_recommender_for_user(274288, top_n=5, alpha=0.5)

Unnamed: 0,movieId,userId,rating,title,genres,cb_score,hybrid_score
70,147342,274288,5.624179,Camp Dread (2014),Horror|Mystery|Thriller,0.019068,2.821624
43,5503,274288,5.311674,"Last Kiss, The (Ultimo bacio, L') (2001)",Comedy|Drama|Romance,0.023214,2.667444
69,146632,274288,5.255672,Aayitha Ezhuthu (2004),Action|Drama|Romance,0.026525,2.641098
9,223011,274288,5.222848,National Theatre Live: Fleabag (2019),Comedy,0.009688,2.616268
98,224242,274288,5.094249,The Letter Reader (2019),Drama,0.015133,2.554691


### Conclusion:
 The hybrid system with LLM integration is highly effective at generating insights in human-understandable language—provided that sufficient metadata is available for the LLM to analyze.
As demonstrated in this study, although the LLM used here is based on Google’s Gemini 3, which may not be the most up-to-date model, it can still approximate and articulate insights related to latent factors that are otherwise uninterpretable to humans. Furthermore, natural language can be used to query or adjust model outputs, making interaction more intuitive.

In addition to the convenience offered by the LLM, the hybrid system also effectively balances content-based and collaborative filtering techniques. As shown above, the system can dynamically adjust its recommendation list based on the hybrid weighting factor, alpha. This allows for more flexible suggestions and helps introduce greater content diversity to users.

However, hybrid models are still susceptible to the cold start problem, particularly when new users with no prior interactions join the system. In such cases, the system struggles to generate meaningful recommendations. A commonly used solution, adopted by many applications and websites, is to prompt new users to voluntarily input their preferences. This initial input provides enough data to bootstrap the recommendation process. A similar approach can be applied in future iterations of this project—where an LLM could be tasked with interactively prompting new users to share their preferences when their profile is found to be empty.
