<a href="https://colab.research.google.com/github/yoh1234/movie_recommender/blob/main/26m_movie_recommender.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install surprise

In [None]:
%matplotlib inline
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from ast import literal_eval
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.metrics.pairwise import linear_kernel, cosine_similarity
from nltk.stem.snowball import SnowballStemmer
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.corpus import wordnet
from surprise import Reader, Dataset, SVD

import warnings; warnings.simplefilter('ignore')

In [None]:
# !apt-get update # Update apt-get repository.
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
# !wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
# !tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
# !pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.

# # Set environment variables
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

# !ls

# # Initialize findspark
# import findspark
# findspark.init()

# # Create a PySpark session
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
# spark

In [None]:
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')

In [None]:
import pandas as pd
df_movies = pd.read_csv('/content/drive/My Drive/26m_movie/movies_metadata.csv')
df_movies.head()

In [None]:
# Keep only rows whose `adult` flag is the text "False"; rows holding any other
# value are dropped.
# .copy() detaches the result from the frame returned by read_csv, so the
# column assignments made in later cells write to an independent DataFrame
# instead of a slice (which pandas flags with SettingWithCopyWarning).
df_movies = df_movies[df_movies["adult"] == "False"].copy()
print(len(df_movies))

In [None]:
# Sanity check on the raw `genres` column: True only if every cell is already a
# Python list (i.e. no parsing step would be needed).
is_only_lists_col1 = df_movies['genres'].apply(lambda x: isinstance(x, list)).all()
# The message names the column that is actually inspected.
print(f"Does 'genres' contain only lists? {is_only_lists_col1}")

In [None]:
# Count how many `genres` cells evaluate (via literal_eval) to a Python list.
parses_to_list = df_movies['genres'].map(lambda raw: isinstance(literal_eval(raw), list))
df_check_genres_datatype = parses_to_list.to_frame()
# Summing a boolean column counts its True entries.
count = int(df_check_genres_datatype['genres'].sum())
print(count)


In [None]:
def parse_genre_names(raw):
  """Return the list of genre names held in one `genres` cell.

  Accepts the raw string form (a Python-literal list of dicts with a 'name'
  key), an already-parsed list, or anything else (e.g. NaN), which yields [].
  Items that are not dicts are passed through unchanged, so re-running this
  cell on an already-converted column is a no-op instead of an error.
  """
  if isinstance(raw, str):
    raw = literal_eval(raw)
  if not isinstance(raw, list):
    return []
  return [item['name'] if isinstance(item, dict) else item for item in raw]


df_movies['genres'] = df_movies['genres'].apply(parse_genre_names)

In [None]:
def extract_year(x):
  """Return the year part (the text before the first '-') of a date value.

  `x` is normally a 'YYYY-MM-DD' string. Non-string values (e.g. a date object
  or a missing-value NaN) are converted with str() first, so they no longer
  raise AttributeError. A value containing no '-' is returned whole, which
  means a missing date comes back as 'nan'.
  """
  return str(x).split('-', 1)[0]


In [None]:
df_movies['release_date'] = df_movies['release_date'].astype(str)
df_movies['year'] = df_movies['release_date'].apply(extract_year)
df_movies


In [None]:
# print(df_movies['original_language'].nunique())

In [None]:
# # Count the number of movies based on languages
# language_count = df_movies.groupby('original_language')['id'].count().reset_index(name='count')
# language_count = language_count.sort_values('count', ascending=False)
# print(language_count.head(20))

In [None]:
# df_movies = df_movies[df_movies['original_language'] == 'en']
# print(len(df_movies))

In [None]:
# Calculate weighted rating from IMDB's weighted rating formula
# v/(v+m)*R + m/(v+m)*C
df_movies['vote_count'].isnull().sum()

In [None]:
df_movies = df_movies.dropna(subset=['vote_count'])
print(len(df_movies))

In [None]:
df_movies['vote_average'].isnull().sum()

In [None]:
def check_subset(list1, list2):
  """Return True when every element of `list1` also appears in `list2`.

  Both arguments may be any iterables of hashable items; duplicates and order
  are ignored. An empty `list1` is a subset of anything, so it returns True.
  """
  # issubset already yields a bool, so no if/else wrapper is needed.
  return set(list1).issubset(list2)

In [None]:
def weighted_rating(x, m, C):
    """Blend a record's own rating with a reference rating, weighted by votes.

    Computes v/(v+m) * R + m/(v+m) * C, where v = x['vote_count'] and
    R = x['vote_average'].

    x: record indexable by 'vote_count' and 'vote_average'.
    m: vote-count weight given to C.
    C: rating the result is pulled toward when v is small relative to m.
    """
    votes, rating = x['vote_count'], x['vote_average']
    own_share = votes / (votes + m)
    reference_share = m / (m + votes)
    return (own_share * rating) + (reference_share * C)

In [None]:
def get_qualified_dataset(df_genre, genre = None, percentile = 0.95, use_all_genre = False):
  """Rank well-voted movies by their weighted rating.

  df_genre: DataFrame with at least the columns 'genres' (an iterable of genre
    names per row), 'vote_count', 'vote_average', 'title', 'year',
    'popularity' and 'id'. It is not modified.
  genre: a genre name, or a list of names, all of which a movie's 'genres'
    must contain. None means no genre filter.
  percentile: quantile of 'vote_count' (within the selected movies) used as
    the minimum vote count m.
  use_all_genre: when True, ignore `genre` and rank every movie.

  Returns a new DataFrame holding the movies with vote_count >= m, with an
  added 'wr' column, sorted by 'wr' in descending order. Also prints m.
  """
  if isinstance(genre, str):
    # A bare string would otherwise be split into single characters by set().
    genre = [genre]

  if not (use_all_genre or genre is None):
    # Build the mask separately rather than storing it as a new column, so the
    # caller's DataFrame is left untouched.
    genre_mask = df_genre['genres'].apply(lambda x: check_subset(genre, x))
    df_genre = df_genre[genre_mask]

  C = df_genre['vote_average'].mean()
  m = df_genre['vote_count'].quantile(percentile)
  print(m)

  columns = ['title', 'year', 'vote_count', 'vote_average', 'popularity', 'id', 'genres']
  # .copy() so that adding 'wr' writes to an independent frame, not a slice.
  df_genre_qualified = df_genre.loc[df_genre['vote_count'] >= m, columns].copy()
  # weighted_rating only indexes two columns and does arithmetic, so handing it
  # the whole frame scores every row at once (and copes with an empty frame).
  df_genre_qualified['wr'] = weighted_rating(df_genre_qualified, m, C)
  return df_genre_qualified.sort_values('wr', ascending=False)

In [None]:
get_qualified_dataset(df_movies, ['Animation']).head(20)

In [None]:
# Get qualified movie dataset
df_movies_qualified = get_qualified_dataset(df_movies, use_all_genre = True)
print(len(df_movies_qualified))

In [None]:
df_movies_qualified.head(20)

In [None]:
# Use qualified data only
df_movies = df_movies_qualified

Content-Based Recommender

In [None]:
df_link = pd.read_csv('/content/drive/My Drive/26m_movie/links.csv')
df_link.head()

In [None]:
# Drop links that have no TMDB id. .copy() yields an independent frame, so the
# column assignments made in later cells do not write into a slice of the
# frame returned by read_csv.
df_link = df_link[df_link['tmdbId'].notnull()].copy()

In [None]:
df_link['tmdbId'] = df_link['tmdbId'].astype('int')

In [None]:
df_link.info()

In [None]:
df_link_unique = df_link.drop_duplicates(subset=['movieId'])

In [None]:
# Flag links whose TMDB id (compared as text) appears among df_movies['id'].
# isin performs one vectorized membership test instead of rescanning the whole
# id array once per link row.
df_link['check_missing_values'] = df_link['tmdbId'].astype(str).isin(df_movies['id'])

In [None]:
df_link.info()

In [None]:
df_link = df_link[df_link['check_missing_values']]

In [None]:
print(len(df_link))
print(len(df_movies))

In [None]:
df_link.head()

In [None]:
# NOTE(review): no cell above this one in the visible notebook assigns `df`;
# the only visible assignment is the Spark DataFrame created further down.
# On a fresh kernel this line would raise NameError — confirm which frame was
# intended (the cells below read 'overview' and 'tagline' from df_movie).
df_movie = df

Movie Description-Based Recommender

In [None]:
# Build one free-text field per movie from the overview and the tagline.
# Both parts are blank-filled BEFORE concatenating: NaN + str is NaN, which
# would otherwise discard the tagline of any movie lacking an overview.
# The single space keeps the last word of the overview from fusing with the
# first word of the tagline; strip() removes it again when a part is empty.
df_movie['tagline'] = df_movie['tagline'].fillna('')
df_movie['description'] = (df_movie['overview'].fillna('') + ' ' + df_movie['tagline']).str.strip()

In [None]:
# tf = TfidfVectorizer(analyzer='word',ngram_range=(1, 2),min_df=1, stop_words='english', max_features=100)
# tfidf_matrix = tf.fit_transform(df_movie['description'])

In [None]:
# tfidf_matrix.shape

In [None]:
# cosine_sim = linear_kernel(tfidf_matrix, tfidf_matrix)
# cosine_sim

In [None]:
!pip install pyspark

In [None]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.master("local[*]").appName("CosineSimilarityLargeData").getOrCreate()

In [None]:
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType
import numpy as np
from pyspark.sql import Row

# Explicit schema: a float 'id' plus a 'features' column holding an array of floats.
schema = StructType([
    StructField("id", FloatType(), False),
    StructField("features", ArrayType(FloatType()), False)
])

# Generate random demo data (raise num_rows as far as memory allows).
num_rows = 100      # number of vectors to simulate
vector_size = 100   # number of features per vector

# Build the rows from plain Python lists (not numpy arrays). The Row field is
# named 'features' so that it agrees with the schema above regardless of
# whether fields are matched by position or by name.
data = [
    Row(id=float(i), features=np.random.rand(vector_size).tolist())
    for i in range(num_rows)
]

# Create a DataFrame using the schema
df = spark.createDataFrame(data, schema)

# Show the first few rows
df.show(5)

In [None]:
df_partitioned = df.repartition(100)

In [None]:
df_partitioned.show(5)

In [None]:
df.show(2)

In [None]:
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="features", outputCol="tf")
tf = hashingTF.transform(df)

idf = IDF(inputCol="tf", outputCol="feature").fit(tf)
tfidf = idf.transform(tf)

In [None]:
from pyspark.ml.feature import Normalizer
normalizer = Normalizer(inputCol="feature", outputCol="norm")
data = normalizer.transform(tfidf)

In [None]:
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
# Build a distributed matrix with one indexed row per record: the row index is
# the record's id and the row values are its 'norm' vector (the Normalizer
# output column), then convert it to a block matrix.
# NOTE(review): the schema declares the column as "id" but it is selected and
# read as "ID" here — presumably relies on case-insensitive column resolution;
# confirm.
mat = IndexedRowMatrix(
    data.select("ID", "norm")\
        .rdd.map(lambda row: IndexedRow(row.ID, row.norm.toArray()))).toBlockMatrix()
# Product of the matrix with its own transpose: entry (i, j) is the dot product
# of rows i and j.
# NOTE(review): presumably intended as cosine similarity, which holds only if
# the 'norm' vectors are unit length — confirm the Normalizer settings.
dot = mat.multiply(mat.transpose())
# Convert the full result to a local array for display.
# NOTE(review): the result has one row and one column per record, so this
# likely will not scale beyond small inputs — verify before using real data.
dot.toLocalMatrix().toArray()

Collaborative Filtering

In [None]:
import pandas as pd
df_ratings = pd.read_csv('/content/drive/My Drive/26m_movie/ratings.csv')
df_ratings.head()

In [None]:
df_movies.head()

In [None]:
df_movies['id'] = df_movies['id'].astype(int)

In [None]:
print(len(df_ratings))

In [None]:
print(len(df_movies))

In [None]:
df_movies_merged = pd.merge(df_movies, df_link[['movieId', 'tmdbId']], left_on = 'id', right_on = 'tmdbId')

In [None]:
df_movies_merged.head(2)

In [None]:
df_movies_id_set = set(df_movies_merged['movieId'])

In [None]:
print(len(df_movies_id_set))

In [None]:
# Vectorized membership test: True for ratings whose movieId is one of the
# qualified movies. Replaces a per-row Python lambda (which also shadowed the
# builtin `id`).
df_ratings['is_in_qualified_movies'] = df_ratings['movieId'].isin(df_movies_id_set)

In [None]:
df_ratings.head()

In [None]:
print(df_ratings['is_in_qualified_movies'].value_counts())

In [None]:
df_ratings_qualified = df_ratings[df_ratings['is_in_qualified_movies']]
df_ratings_qualified.head()

In [None]:
id_count = df_ratings_qualified.groupby('userId').size().sort_values(ascending = False)
id_count = id_count.reset_index(name='count')
print(id_count)

In [None]:
df_ratings_qualified = pd.merge(df_ratings_qualified, id_count, on = 'userId', how = 'left')

In [None]:
df_ratings_qualified

In [None]:
df_ratings_qualified = df_ratings_qualified[df_ratings_qualified['count'] >= 10]

In [None]:
print(len(df_ratings_qualified))

In [None]:
df_ratings_qualified = df_ratings_qualified[['userId', 'movieId', 'rating']]
df_ratings_qualified.head()

In [None]:
# Reader() with no arguments assumes ratings lie on a 1-5 scale. Take the
# bounds from the ratings being modelled instead, so values below 1 (if any
# are present) are not treated as out of range.
reader = Reader(rating_scale=(df_ratings_qualified['rating'].min(),
                              df_ratings_qualified['rating'].max()))

In [None]:
data = Dataset.load_from_df(df_ratings_qualified, reader)

In [None]:
from surprise.model_selection import train_test_split
from surprise import accuracy

In [None]:
# Fixed seed so the 75/25 split is the same on every run.
trainset, testset = train_test_split(data, test_size=0.25, random_state=42)

In [56]:
# Fixed seed so the fitted model is repeatable from run to run.
model = SVD(random_state=42)

# Step 3: Train the model on the training set
model.fit(trainset)

<surprise.prediction_algorithms.matrix_factorization.SVD at 0x78a7847130d0>

In [57]:
predictions = model.test(testset)

In [58]:
rmse = accuracy.rmse(predictions)
print(f"RMSE on the Test Set: {rmse:.4f}")

RMSE: 0.7855
RMSE on the Test Set: 0.7855


In [60]:
import pickle

# Single source of truth for the output path, so the confirmation message
# always names the file that was actually written.
model_path = 'svd_first_model.pkl'
with open(model_path, 'wb') as f:
    pickle.dump(model, f)

print(f"Model saved successfully to '{model_path}'")

Model saved successfully to 'svd_model.pkl'
