# Spin the Reel Project Demo
# APAN 5400 - Summer 2025
# Alessio Maritez-Larios, Angela Ding, Eleanor Wen, Mariela Bracete, Yu Lan Tang

Please note that this code will take approximately 30 minutes to run

In [1]:
#Installing required packages
#Installing Facebook AI Similarity Search to compute similarity
#!pip install pandas numpy sqlalchemy psycopg2-binary pymongo pyspark faiss-cpu pretty_html_table

#Importing libraries

import pandas as pd
import numpy as np
import psycopg2
import os
from datetime import datetime
import json
import ast
import faiss
from tqdm import tqdm
from pymongo import MongoClient
from sqlalchemy import create_engine
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col

## Data Processing

Read in data

In [2]:
print(datetime.now())

2025-08-11 14:56:15.912302


In [3]:
#Loading the TMDB Movies Dataset

tmdb_og = pd.read_csv('TMDB_movie_dataset_v11.csv')
ratings = pd.read_csv("ratings.csv")

Clean Data

In [4]:
#Filtering the data to obtain a clean dataset of unique, valid movies with sufficient metadata for next steps

#Clean the dataset before TF-IDF
tmdb_df = tmdb_og[
    tmdb_og["overview"].notna() & 
    tmdb_og["genres"].notna() &
    (tmdb_og["overview"].str.strip() != "")
]

tmdb_df = tmdb_df.drop_duplicates(subset="original_title")
tmdb_df = tmdb_df.reset_index(drop=True)

print("Filtered shape:", tmdb_df.shape)

#Checking the first few rows
tmdb_df.head()

Filtered shape: (541854, 24)


Unnamed: 0,id,title,vote_average,vote_count,status,release_date,revenue,runtime,adult,backdrop_path,...,original_title,overview,popularity,poster_path,tagline,genres,production_companies,production_countries,spoken_languages,keywords
0,27205,Inception,8.364,34495,Released,2010-07-15,825532764,148,False,/8ZTVqvKDQ8emSGUEMjsS4yHAwrp.jpg,...,Inception,"Cobb, a skilled thief who commits corporate es...",83.952,/oYuLEt3zVCKq57qu2F8dT7NIa6f.jpg,Your mind is the scene of the crime.,"Action, Science Fiction, Adventure","Legendary Pictures, Syncopy, Warner Bros. Pict...","United Kingdom, United States of America","English, French, Japanese, Swahili","rescue, mission, dream, airplane, paris, franc..."
1,157336,Interstellar,8.417,32571,Released,2014-11-05,701729206,169,False,/pbrkL804c8yAv3zBZR4QPEafpAR.jpg,...,Interstellar,The adventures of a group of explorers who mak...,140.241,/gEU2QniE6E77NI6lCU6MxlNBvIx.jpg,Mankind was born on Earth. It was never meant ...,"Adventure, Drama, Science Fiction","Legendary Pictures, Syncopy, Lynda Obst Produc...","United Kingdom, United States of America",English,"rescue, future, spacecraft, race against time,..."
2,155,The Dark Knight,8.512,30619,Released,2008-07-16,1004558444,152,False,/nMKdUUepR0i5zn0y1T4CsSB5chy.jpg,...,The Dark Knight,Batman raises the stakes in his war on crime. ...,130.643,/qJ2tW6WMUDux911r6m7haRef0WH.jpg,Welcome to a world without rules.,"Drama, Action, Crime, Thriller","DC Comics, Legendary Pictures, Syncopy, Isobel...","United Kingdom, United States of America","English, Mandarin","joker, sadism, chaos, secret identity, crime f..."
3,19995,Avatar,7.573,29815,Released,2009-12-15,2923706026,162,False,/vL5LR6WdxWPjLPFRLe133jXWsh5.jpg,...,Avatar,"In the 22nd century, a paraplegic Marine is di...",79.932,/kyeqWdyUXW608qlYkRqosgbbJyK.jpg,Enter the world of Pandora.,"Action, Adventure, Fantasy, Science Fiction","Dune Entertainment, Lightstorm Entertainment, ...","United States of America, United Kingdom","English, Spanish","future, society, culture clash, space travel, ..."
4,24428,The Avengers,7.71,29166,Released,2012-04-25,1518815515,143,False,/9BBTo63ANSmhC4e6r62OJFuK2GL.jpg,...,The Avengers,When an unexpected enemy emerges and threatens...,98.082,/RYMX2wcKCBAr24UyPD7xwmjaTn.jpg,Some assembly required.,"Science Fiction, Action, Adventure",Marvel Studios,United States of America,"English, Hindi, Russian","new york city, superhero, shield, based on com..."


In [5]:
#Ensuring data is pre-processed and ready for next steps

def parse_field(x):
    # Only try to parse if it's a non-null string
    if isinstance(x, str) and x.strip() != "":
        try:
            return ast.literal_eval(x)
        except (ValueError, SyntaxError):
            # Fallback: Treat as a comma-separated string
            return [{"name": s.strip()} for s in x.split(",")]
    return []  # Return empty list for NaN, None, or non-string values

#Applying to JSON-like columns
json_cols = ['genres', 'keywords', 'production_companies', 'production_countries', 'spoken_languages']

for col in json_cols:
    tmdb_df[col] = tmdb_df[col].apply(parse_field)

#Extracting genre names as example
tmdb_df['genre_names'] = tmdb_df['genres'].apply(lambda x: [d['name'] for d in x if isinstance(d, dict) and 'name' in d])



In [6]:
#Adding genre_str and keywords_str to the dataframe in order to use text-based techniques like TF-IDF for content similarity later

#Creating 'genre_str' by joining the genre names
tmdb_df["genre_str"] = tmdb_df["genres"].apply(
    lambda x: ", ".join([d["name"] for d in x if isinstance(d, dict) and "name" in d]) if isinstance(x, list) else ""
)

#Creating 'keywords_str' by joining keyword names
tmdb_df["keywords_str"] = tmdb_df["keywords"].apply(
    lambda x: ", ".join([d["name"] for d in x if isinstance(d, dict) and "name" in d]) if isinstance(x, list) else ""
)

In [7]:
#Checking raw genres and cleaned up names

tmdb_df[['title', 'genres', 'genre_names']].head(3)

Unnamed: 0,title,genres,genre_names
0,Inception,"[{'name': 'Action'}, {'name': 'Science Fiction...","[Action, Science Fiction, Adventure]"
1,Interstellar,"[{'name': 'Adventure'}, {'name': 'Drama'}, {'n...","[Adventure, Drama, Science Fiction]"
2,The Dark Knight,"[{'name': 'Drama'}, {'name': 'Action'}, {'name...","[Drama, Action, Crime, Thriller]"


In [8]:
#Transformations:

#Droping columns that aren't useful for analysis

columns_to_drop = ['id', 'backdrop_path', 'poster_path', 'imdb_id', 'homepage']
tmdb_df.drop(columns=columns_to_drop, inplace=True, errors='ignore')

#Converting release-data to "datetime"

tmdb_df['release_date'] = pd.to_datetime(tmdb_df['release_date'], errors='coerce')

#Handling missing values

#Numeric fill
for col in ['runtime', 'revenue', 'budget']:
    tmdb_df[col] = tmdb_df[col].fillna(tmdb_df[col].median())

#Text fill
for col in ['overview', 'tagline']:
    tmdb_df[col] = tmdb_df[col].fillna('')

#Dropping rows missing essential info
tmdb_df = tmdb_df.dropna(subset=['title', 'release_date'])

#Creating simple genre sting column for optimized querying in PostgreSQL

tmdb_df['genre_str'] = tmdb_df['genres'].apply(lambda x: ', '.join([d['name'] for d in x]) if isinstance(x, list) else '')

In [9]:
#Pre-processing text features

for col in tmdb_df.columns:
    if tmdb_df[col].apply(lambda x: isinstance(x, (list, dict))).any():
        print(f"Column '{col}' contains list or dict values! Converting to string...")
        tmdb_df[col] = tmdb_df[col].apply(lambda x: str(x) if isinstance(x, (list, dict)) else x)

#Step 1: converting "keyowords" to string
def clean_keywords(kw_list):
    if pd.isnull(kw_list) or kw_list == '':
        return ''
    try:
        # Convert stringified list of dicts to real list
        keywords = ast.literal_eval(kw_list)
        if isinstance(keywords, list):
            return ' '.join([kw['name'] for kw in keywords if 'name' in kw])
    except:
        return ''
    return ''

tmdb_df['keywords_clean'] = tmdb_df['keywords'].apply(clean_keywords)

Column 'genres' contains list or dict values! Converting to string...
Column 'production_companies' contains list or dict values! Converting to string...
Column 'production_countries' contains list or dict values! Converting to string...
Column 'spoken_languages' contains list or dict values! Converting to string...
Column 'keywords' contains list or dict values! Converting to string...
Column 'genre_names' contains list or dict values! Converting to string...


In [10]:
#Step 2: normalizing to lowercase and filling missing text

tmdb_df['overview_clean'] = tmdb_df['overview'].fillna('').str.lower()
tmdb_df['genre_clean'] = tmdb_df['genre_str'].fillna('').str.lower()

In [11]:
#Step 3: combining all features into a single text column, called content_features

tmdb_df['content_features'] = (
    tmdb_df['overview_clean'] + ' ' +
    tmdb_df['genre_clean'] + ' ' +
    tmdb_df['keywords_clean']
)

Save cleaned data to Postgres and Mongo

In [12]:
#Creating PostgreSQL connection string and engine

#Connection details
db_user = 'postgres'
db_password = '123'  
db_host = 'localhost'
db_port = '5432'
db_name = 'movies_db'

#Creating engine
engine = create_engine(f'postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')

In [13]:
#Storing the cleaned dataframe tmdb_df into a PostgreSQL table called "movies"

#First, identifying and converting any problematic columns (lists or dicts) to strings

tmdb_df.to_sql('movies', engine, if_exists='replace', index=False)

print("Data written to PostgreSQL successfully!")

Data written to PostgreSQL successfully!


In [14]:
#Storing the rating data into a PostgreSQL table called "ratings"

ratings.to_sql('ratings', engine, if_exists='replace', index=False)

print("Data written to PostgreSQL successfully!")

Data written to PostgreSQL successfully!


In [15]:
#Loading data into MongoDB

#Connecting to MongoDB running locally via Docker
mongo_client = MongoClient("mongodb://localhost:27017/")

#Creating or connecting to the 'movies_db' database and 'movies_collection'
mongo_db = mongo_client["movies_db"]
mongo_collection = mongo_db["movies_collection"]

#Dropping the collection to avoid duplicates on re-run
mongo_collection.drop()

#Converting DataFrame to dictionary records
records = tmdb_df.to_dict(orient='records')

#Inserting records
mongo_collection.insert_many(records)

print("Data successfully inserted into MongoDB!")

Data successfully inserted into MongoDB!


## Recommendation System

In [16]:
conn = psycopg2.connect(
    host=db_host,
    port=db_port,
    dbname=db_name,
    user=db_user,
    password=db_password
)

In [17]:
cur = conn.cursor()

In [18]:
# Switch Mongo collection
mongo_collection = mongo_db["parquet"]

In [19]:
#SPARK: Starting session
spark = SparkSession.builder \
    .appName("TMDB_Movies_App") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.18,org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/movies_db.movies") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/movies_db.movies") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

25/08/11 15:17:44 WARN Utils: Your hostname, admins-Mac-mini.local resolves to a loopback address: 127.0.0.1; using 192.168.1.124 instead (on interface en1)
25/08/11 15:17:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Volumes/ExternalSSD/Users/externaladmin/.ivy2/cache
The jars for the packages stored in: /Volumes/ExternalSSD/Users/externaladmin/.ivy2/jars
org.postgresql#postgresql added as a dependency
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c82b9d4e-a411-41f5-8297-4bf94c9051c7;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Volumes/ExternalSSD/opt/anaconda3/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.postgresql#postgresql;42.2.18 in central
	found org.checkerframework#checker-qual;3.5.0 in central
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 90ms :: artifacts dl 3ms
	:: modules in use:
	org.checkerframework#checker-qual;3.5.0 from central in [default]
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	org.postgresql#postgresql;42.2.18 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|d

In [20]:
query = """
SELECT
    *
FROM
	MOVIES
"""

tmdb_df = pd.read_sql_query(query, conn)

  tmdb_df = pd.read_sql_query(query, conn)


In [21]:
tmdb_df.head(5)

Unnamed: 0,title,vote_average,vote_count,status,release_date,revenue,runtime,adult,budget,original_language,...,production_countries,spoken_languages,keywords,genre_names,genre_str,keywords_str,keywords_clean,overview_clean,genre_clean,content_features
0,Inception,8.364,34495,Released,2010-07-15,825532764,148,False,160000000,en,...,"[{'name': 'United Kingdom'}, {'name': 'United ...","[{'name': 'English'}, {'name': 'French'}, {'na...","[{'name': 'rescue'}, {'name': 'mission'}, {'na...","['Action', 'Science Fiction', 'Adventure']","Action, Science Fiction, Adventure","rescue, mission, dream, airplane, paris, franc...",rescue mission dream airplane paris france vir...,"cobb, a skilled thief who commits corporate es...","action, science fiction, adventure","cobb, a skilled thief who commits corporate es..."
1,Interstellar,8.417,32571,Released,2014-11-05,701729206,169,False,165000000,en,...,"[{'name': 'United Kingdom'}, {'name': 'United ...",[{'name': 'English'}],"[{'name': 'rescue'}, {'name': 'future'}, {'nam...","['Adventure', 'Drama', 'Science Fiction']","Adventure, Drama, Science Fiction","rescue, future, spacecraft, race against time,...",rescue future spacecraft race against time art...,the adventures of a group of explorers who mak...,"adventure, drama, science fiction",the adventures of a group of explorers who mak...
2,The Dark Knight,8.512,30619,Released,2008-07-16,1004558444,152,False,185000000,en,...,"[{'name': 'United Kingdom'}, {'name': 'United ...","[{'name': 'English'}, {'name': 'Mandarin'}]","[{'name': 'joker'}, {'name': 'sadism'}, {'name...","['Drama', 'Action', 'Crime', 'Thriller']","Drama, Action, Crime, Thriller","joker, sadism, chaos, secret identity, crime f...",joker sadism chaos secret identity crime fight...,batman raises the stakes in his war on crime. ...,"drama, action, crime, thriller",batman raises the stakes in his war on crime. ...
3,Avatar,7.573,29815,Released,2009-12-15,2923706026,162,False,237000000,en,...,"[{'name': 'United States of America'}, {'name'...","[{'name': 'English'}, {'name': 'Spanish'}]","[{'name': 'future'}, {'name': 'society'}, {'na...","['Action', 'Adventure', 'Fantasy', 'Science Fi...","Action, Adventure, Fantasy, Science Fiction","future, society, culture clash, space travel, ...",future society culture clash space travel spac...,"in the 22nd century, a paraplegic marine is di...","action, adventure, fantasy, science fiction","in the 22nd century, a paraplegic marine is di..."
4,Pee-wee's Big Holiday,6.087,213,Released,2016-03-15,0,90,False,0,en,...,[{'name': 'United States of America'}],[{'name': 'English'}],[],"['Adventure', 'Comedy', 'Family']","Adventure, Comedy, Family",,,a fateful meeting with a mysterious stranger i...,"adventure, comedy, family",a fateful meeting with a mysterious stranger i...


In [22]:
#RECOMMENDATION ENGINE BUILD: Pre-processing data for PySpark ALS. 
#(Will use hybrid recommendation system, leveraing collaboritve and content-based filtering.)

#Loading ratings dataset:
ratings = spark.read.csv("ratings.csv", header=True, inferSchema=True)
"""
ratings = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/movies_db") \
    .option("dbtable", "public.ratings") \
    .option("user", "postgres") \
    .option("password", "123") \
    .option("driver", "org.postgresql.Driver") \
    .load()
"""

                                                                                

'\nratings = spark.read     .format("jdbc")     .option("url", "jdbc:postgresql://localhost:5432/movies_db")     .option("dbtable", "public.ratings")     .option("user", "postgres")     .option("password", "123")     .option("driver", "org.postgresql.Driver")     .load()\n'

In [23]:
from pyspark.sql.functions import col

#Selecting and clean relevant columns
als_ratings = ratings.select("userId", "movieId", "rating") \
    .dropna() \
    .withColumn("userId", col("userId").cast("integer")) \
    .withColumn("movieId", col("movieId").cast("integer")) \
    .withColumn("rating", col("rating").cast("float"))

In [24]:
#Confirming schema is clean
als_ratings.printSchema()
als_ratings.show(5)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|     17|   4.0|
|     1|     25|   1.0|
|     1|     29|   2.0|
|     1|     30|   5.0|
|     1|     32|   5.0|
+------+-------+------+
only showing top 5 rows



In [25]:
#Verifying ratings df has all necessary variables and is in Spark format

ratings.show(5)
ratings.printSchema()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|     17|   4.0|944249077|
|     1|     25|   1.0|944250228|
|     1|     29|   2.0|943230976|
|     1|     30|   5.0|944249077|
|     1|     32|   5.0|943228858|
+------+-------+------+---------+
only showing top 5 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [26]:
#Splitting ratings data

(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)

In [27]:
#Building and training ALS model

als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
    coldStartStrategy="drop"
)

param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 20]) \
    .addGrid(als.regParam, [0.1, 0.01]) \
    .build()

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3
)

cv_model = cv.fit(training)
best_model = cv_model.bestModel

[Stage 4:>                                                        (0 + 10) / 10]

CodeCache: size=131072Kb used=23396Kb max_used=23807Kb free=107675Kb
 bounds [0x00000001026a0000, 0x0000000103e00000, 0x000000010a6a0000]
 total_blobs=9353 nmethods=8395 adapters=869
 compilation: disabled (not enough contiguous free space left)


                                                                                

In [28]:
#Evaluating the model on test data

predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse:.4f}")



Root-mean-square error = 0.8081


                                                                                

In [29]:
#Generating Top-N Recommendations

user_recs = best_model.recommendForAllUsers(10)
user_recs.show(5, truncate=False)



+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                                |
+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|26    |[{151989, 6.518612}, {270306, 6.321332}, {240070, 6.321332}, {240054, 6.321332}, {194434, 6.2743864}, {275847, 6.2547817}, {210789, 6.1109123}, {177209, 6.0515385}, {225435, 6.0235205}, {183947, 5.9691477}] |
|27    |[{270306, 5.883775}, {240070, 5.883775}, {240054, 5.883775}, {177209, 5.8430715}, {151989, 5.7289357}, {210789, 5.605529}, {

                                                                                

In [30]:
#TF-IDF Vectorization of content_features

#Limiting to 2,000 features and ignoring common English stop words
tfidf = TfidfVectorizer(stop_words='english', max_features=2000)

In [31]:
#Fitting and transforming the content features
tfidf_matrix = tfidf.fit_transform(tmdb_df['content_features']) 
tmdb_df.head()

Unnamed: 0,title,vote_average,vote_count,status,release_date,revenue,runtime,adult,budget,original_language,...,production_countries,spoken_languages,keywords,genre_names,genre_str,keywords_str,keywords_clean,overview_clean,genre_clean,content_features
0,Inception,8.364,34495,Released,2010-07-15,825532764,148,False,160000000,en,...,"[{'name': 'United Kingdom'}, {'name': 'United ...","[{'name': 'English'}, {'name': 'French'}, {'na...","[{'name': 'rescue'}, {'name': 'mission'}, {'na...","['Action', 'Science Fiction', 'Adventure']","Action, Science Fiction, Adventure","rescue, mission, dream, airplane, paris, franc...",rescue mission dream airplane paris france vir...,"cobb, a skilled thief who commits corporate es...","action, science fiction, adventure","cobb, a skilled thief who commits corporate es..."
1,Interstellar,8.417,32571,Released,2014-11-05,701729206,169,False,165000000,en,...,"[{'name': 'United Kingdom'}, {'name': 'United ...",[{'name': 'English'}],"[{'name': 'rescue'}, {'name': 'future'}, {'nam...","['Adventure', 'Drama', 'Science Fiction']","Adventure, Drama, Science Fiction","rescue, future, spacecraft, race against time,...",rescue future spacecraft race against time art...,the adventures of a group of explorers who mak...,"adventure, drama, science fiction",the adventures of a group of explorers who mak...
2,The Dark Knight,8.512,30619,Released,2008-07-16,1004558444,152,False,185000000,en,...,"[{'name': 'United Kingdom'}, {'name': 'United ...","[{'name': 'English'}, {'name': 'Mandarin'}]","[{'name': 'joker'}, {'name': 'sadism'}, {'name...","['Drama', 'Action', 'Crime', 'Thriller']","Drama, Action, Crime, Thriller","joker, sadism, chaos, secret identity, crime f...",joker sadism chaos secret identity crime fight...,batman raises the stakes in his war on crime. ...,"drama, action, crime, thriller",batman raises the stakes in his war on crime. ...
3,Avatar,7.573,29815,Released,2009-12-15,2923706026,162,False,237000000,en,...,"[{'name': 'United States of America'}, {'name'...","[{'name': 'English'}, {'name': 'Spanish'}]","[{'name': 'future'}, {'name': 'society'}, {'na...","['Action', 'Adventure', 'Fantasy', 'Science Fi...","Action, Adventure, Fantasy, Science Fiction","future, society, culture clash, space travel, ...",future society culture clash space travel spac...,"in the 22nd century, a paraplegic marine is di...","action, adventure, fantasy, science fiction","in the 22nd century, a paraplegic marine is di..."
4,Pee-wee's Big Holiday,6.087,213,Released,2016-03-15,0,90,False,0,en,...,[{'name': 'United States of America'}],[{'name': 'English'}],[],"['Adventure', 'Comedy', 'Family']","Adventure, Comedy, Family",,,a fateful meeting with a mysterious stranger i...,"adventure, comedy, family",a fateful meeting with a mysterious stranger i...


In [32]:
#Checking shape
print("TF-IDF matrix shape:", tfidf_matrix.shape)

TF-IDF matrix shape: (510323, 2000)


In [33]:
#Sampling the first 500 movies from the TF-IDF matrix to prevent crashes when computing similarity

sample_size = 500  
tfidf_sample = tfidf_matrix[:sample_size]

In [34]:
#Reducing dimensionality to prevent memory crash
svd = TruncatedSVD(n_components=100, random_state=42)  
tfidf_reduced = svd.fit_transform(tfidf_sample)

In [35]:
#Performing FAISS-based (Facebook AI Similarity Search) Cosine Similarity with Batching

#Step 1: Converting TF-IDF sample to dense float32 array
tfidf_dense = tfidf_reduced.astype('float32')

In [36]:
#Step 2: Normalizing vectors to unit length (important for cosine similarity)
faiss.normalize_L2(tfidf_dense)

In [37]:
#Step 3: Building FAISS index for inner product (cosine similarity after normalization)
index = faiss.IndexFlatIP(tfidf_dense.shape[1])  
index.add(tfidf_dense)

In [38]:
print(tfidf_dense.shape)
print(index.is_trained)  #Should be True

(500, 100)
True


In [39]:
#Step 4: Batch similarity search with minimal memory
batch_size = 1
similar_movies = []

for start in range(0, tfidf_dense.shape[0], batch_size):
    end = min(start + batch_size, tfidf_dense.shape[0])
    tfidf_batch = tfidf_dense[start:end]

    try:
        similarity_scores, similar_indices = index.search(tfidf_batch, 6)  # 5 + self
        for i in range(end - start):
            for j in range(1, 6):  # Skip index 0 (self)
                similar_movies.append({
                    "movie_index": int(start + i),
                    "similar_index": int(similar_indices[i, j]),
                    "similarity": float(similarity_scores[i, j])
                })
        print(f"✅ Batch {start} to {end} processed successfully")

    except Exception as e:
        print(f"❌ Batch {start} to {end} failed: {e}")

✅ Batch 0 to 1 processed successfully
✅ Batch 1 to 2 processed successfully
✅ Batch 2 to 3 processed successfully
✅ Batch 3 to 4 processed successfully
✅ Batch 4 to 5 processed successfully
✅ Batch 5 to 6 processed successfully
✅ Batch 6 to 7 processed successfully
✅ Batch 7 to 8 processed successfully
✅ Batch 8 to 9 processed successfully
✅ Batch 9 to 10 processed successfully
✅ Batch 10 to 11 processed successfully
✅ Batch 11 to 12 processed successfully
✅ Batch 12 to 13 processed successfully
✅ Batch 13 to 14 processed successfully
✅ Batch 14 to 15 processed successfully
✅ Batch 15 to 16 processed successfully
✅ Batch 16 to 17 processed successfully
✅ Batch 17 to 18 processed successfully
✅ Batch 18 to 19 processed successfully
✅ Batch 19 to 20 processed successfully
✅ Batch 20 to 21 processed successfully
✅ Batch 21 to 22 processed successfully
✅ Batch 22 to 23 processed successfully
✅ Batch 23 to 24 processed successfully
✅ Batch 24 to 25 processed successfully
✅ Batch 25 to 26 pr

In [40]:
#Step 5: Converting to DataFrame for easier lookup
similar_movies_df = pd.DataFrame(similar_movies)
similar_movies_df.head()

Unnamed: 0,movie_index,similar_index,similarity
0,0,195,0.408314
1,0,457,0.395301
2,0,296,0.390028
3,0,166,0.377969
4,0,295,0.375597


In [41]:
#Movie recommendation function (content-based)

def recommend_by_title(movie_title, top_n=5):
    try:
        movie_index = tmdb_df[tmdb_df['original_title'] == movie_title].index[0]
    except IndexError:
        return f"Movie '{movie_title}' not found in dataset."

    similar_rows = similar_movies_df[similar_movies_df['movie_index'] == movie_index]
    
    #Sorting by similarity, exclude self-match
    top_matches = similar_rows[similar_rows['similar_index'] != movie_index]
    top_matches = top_matches.sort_values(by='similarity', ascending=False).head(top_n)

    #Mapping similar indices to movie titles
    top_matches['similar_title'] = top_matches['similar_index'].apply(lambda idx: tmdb_df.iloc[idx]['original_title'])
    
    return top_matches[['similar_title', 'similarity']]

In [42]:
#Testing that it works

recommend_by_title("Inception", top_n=5)

Unnamed: 0,similar_title,similarity
0,Blade Runner,0.408314
1,San Andreas,0.395301
2,The Big Lebowski,0.390028
3,Eternal Sunshine of the Spotless Mind,0.377969
4,Jurassic World: Fallen Kingdom,0.375597


In [43]:
#Precomputing and Storing Top-N Similar Movies for Each Movie (scalable)

#Number of recommendations to store
top_n = 10

precomputed_recs = []

for movie_index in tqdm(range(len(tmdb_df))):  # tqdm shows progress bar
    similar_rows = similar_movies_df[similar_movies_df['movie_index'] == movie_index]
    similar_rows = similar_rows[similar_rows['similar_index'] != movie_index]  # exclude self

    top_matches = similar_rows.sort_values(by='similarity', ascending=False).head(top_n)

    for _, row in top_matches.iterrows():
        precomputed_recs.append({
            "movie_index": movie_index,
            "similar_index": row['similar_index'],
            "similarity": row['similarity']
        })

#Converting to DataFrame
precomputed_df = pd.DataFrame(precomputed_recs)
precomputed_df.head()

100%|█████████████████████████████████| 510323/510323 [01:15<00:00, 6767.57it/s]


Unnamed: 0,movie_index,similar_index,similarity
0,0,195.0,0.408314
1,0,457.0,0.395301
2,0,296.0,0.390028
3,0,166.0,0.377969
4,0,295.0,0.375597


In [44]:
#Saving precomputed recommendations to a parquet file

precomputed_df.to_parquet('precomputed_recs.parquet', index=False)

In [45]:
#Testing the loading to make sure it works

loaded_df = pd.read_parquet('precomputed_recs.parquet')
loaded_df.head()

Unnamed: 0,movie_index,similar_index,similarity
0,0,195.0,0.408314
1,0,457.0,0.395301
2,0,296.0,0.390028
3,0,166.0,0.377969
4,0,295.0,0.375597


In [46]:
mongo_collection.insert_many(loaded_df.to_dict(orient='records'))
print("Data successfully saved from Parquet to MongoDB.")

Data successfully saved from Parquet to MongoDB.


In [47]:
#Closing sessions

#SparkSession 
try:
    spark.stop()
    print("Spark session stopped.")
except NameError:
    print("No Spark session was running.")

#PostgreSQL connection 
try:
    if 'conn' in globals():
        conn.close()
        print("PostgreSQL connection closed.")
except Exception as e:
    print("Error closing PostgreSQL connection:", e)

#Reminder for Flask
print("\n If Flask is still running in the terminal, press CTRL+C to stop the server.")
print("Make sure any changes to your code files are saved before closing JupyterLab.")

Spark session stopped.
PostgreSQL connection closed.

 If Flask is still running in the terminal, press CTRL+C to stop the server.
Make sure any changes to your code files are saved before closing JupyterLab.


In [48]:
print(datetime.now())

2025-08-11 15:27:17.623624
