In [22]:
import pickle
import re
from difflib import get_close_matches
from typing import List, Optional, Union

import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import StandardScaler
class MovieRecommendationSystem:
    """Content-based movie recommender using TF-IDF over genre/tag metadata.

    Typical flow: ``load_data`` -> ``preprocess_data`` ->
    ``build_similarity_matrix`` -> ``get_recommendations``.
    """

    # Columns created by preprocess_data; dropped before being re-derived so
    # that calling preprocess_data twice does not produce `tag_x`/`tag_y`.
    _DERIVED_COLUMNS = (
        'tag', 'genres_clean', 'avg_rating', 'rating_count', 'rating_std',
        'metadata',
    )
    _NON_WORD_RE = re.compile(r'[^\w\s]')
    _WHITESPACE_RE = re.compile(r'\s+')

    def __init__(self):
        # Raw frames populated by load_data().
        self.movies = None
        self.ratings = None
        self.tags = None
        self.links = None
        # Model state populated by build_similarity_matrix().
        self.tfidf_matrix = None
        self.cosine_sim = None
        self.tfidf_vectorizer = None
        # title -> row position in self.movies, and the reverse mapping.
        self.movie_to_idx = {}
        self.idx_to_movie = {}

    def load_data(self, movies_path: str, ratings_path: str, tags_path: str,
                  links_path: str) -> bool:
        """Read the four CSV files and print basic statistics.

        Returns:
            True on success; False (after printing the error) when any file
            cannot be read or parsed. On failure no attribute is modified.
        """
        try:
            # Load into locals first so a failure part-way through does not
            # leave the instance with a mix of old and new frames.
            movies = pd.read_csv(movies_path)
            ratings = pd.read_csv(ratings_path)
            tags = pd.read_csv(tags_path)
            links = pd.read_csv(links_path)
        except (OSError, ValueError) as e:
            # OSError: missing/unreadable file. ValueError covers pandas'
            # ParserError/EmptyDataError and decoding errors.
            print(f"Error loading data: {e}")
            return False
        self.movies, self.ratings, self.tags, self.links = movies, ratings, tags, links
        print("Data loaded successfully.\n")
        print("Statistics:")
        self._validate_data()
        return True

    def _validate_data(self):
        """Print record counts and the number of missing cells per frame."""
        frames = [("Movies", self.movies), ("Ratings", self.ratings),
                  ("Tags", self.tags), ("Links", self.links)]
        for df_name, df in frames:
            print(f"{df_name}: {len(df)} records.")
        print("\nMissing values in the data:")
        for df_name, df in frames:
            missing = df.isnull().sum().sum()
            if missing > 0:
                print(f"  {df_name}: {missing}")

    def _clean_text(self, text: str) -> str:
        """Lower-case *text*, replace punctuation with spaces, collapse whitespace.

        Missing values (NaN/None) become the empty string.
        """
        if pd.isna(text):
            return ""
        text = self._NON_WORD_RE.sub(' ', str(text))
        text = self._WHITESPACE_RE.sub(' ', text)
        return text.lower().strip()

    def preprocess_data(self, top_n_movies: int = 10000):
        """Keep the most-rated movies and build the per-movie ``metadata`` text.

        Adds the columns ``tag``, ``genres_clean``, ``avg_rating``,
        ``rating_count``, ``rating_std`` and ``metadata`` to ``self.movies``
        and rebuilds the title/index lookup tables.

        Args:
            top_n_movies: Number of movies (by rating count) to keep.
        """
        popular_movies = self.ratings['movieId'].value_counts().head(top_n_movies).index
        movies = self.movies[self.movies['movieId'].isin(popular_movies)].copy()
        movies = movies.drop(columns=list(self._DERIVED_COLUMNS), errors='ignore')

        if self.tags is not None and not self.tags.empty:
            # Clean on a copy so the loaded tags frame is left untouched.
            cleaned_tags = self.tags.assign(tag=self.tags['tag'].apply(self._clean_text))
            movie_tags = (
                cleaned_tags.groupby('movieId')['tag']
                .agg(' '.join)
                .reset_index()
            )
            movies = movies.merge(movie_tags, on='movieId', how='left')
        if 'tag' in movies.columns:
            movies['tag'] = movies['tag'].fillna('')
        else:
            # No tags available at all: every movie gets an empty tag string.
            movies['tag'] = ''

        # A missing genre must not turn the whole concatenation below into NaN.
        movies['genres_clean'] = (
            movies['genres'].str.replace('|', ' ', regex=False).fillna('')
        )

        rating_stats = (
            self.ratings.groupby('movieId')['rating']
            .agg(avg_rating='mean', rating_count='count', rating_std='std')
            .round(2)
            .reset_index()
        )
        movies = movies.merge(rating_stats, on='movieId', how='left')
        # std is NaN for movies with a single rating.
        movies['rating_std'] = movies['rating_std'].fillna(0)

        # Synthetic tokens let TF-IDF group well-rated / widely-rated movies.
        highly_rated = pd.Series(
            np.where(movies['avg_rating'] > 4.0, 'highly_rated', ''),
            index=movies.index,
        )
        popular = pd.Series(
            np.where(movies['rating_count'] > 100, 'popular', ''),
            index=movies.index,
        )
        movies['metadata'] = (
            movies['genres_clean'] + ' ' + movies['tag'] + ' '
            + highly_rated + ' ' + popular
        ).apply(self._clean_text)

        # Positional lookups (iloc) below rely on a 0..n-1 index.
        self.movies = movies.reset_index(drop=True)
        titles = self.movies['title']
        # For duplicate titles the last occurrence wins in movie_to_idx.
        self.movie_to_idx = {title: idx for idx, title in enumerate(titles)}
        self.idx_to_movie = dict(enumerate(titles))
        print(f"\nPreprocessed {len(self.movies)} movies, successfully.\n")

    def build_similarity_matrix(self, use_sparse: bool = True):
        """Fit TF-IDF on ``metadata`` and optionally precompute cosine similarity.

        Args:
            use_sparse: When True and there are more than 5000 movies, the
                dense similarity matrix is NOT materialised; rows are computed
                on demand instead.

        Raises:
            RuntimeError: If ``preprocess_data`` has not been called.
        """
        if self.movies is None or 'metadata' not in self.movies.columns:
            raise RuntimeError("Call preprocess_data() before build_similarity_matrix().")
        self.tfidf_vectorizer = TfidfVectorizer(
            stop_words='english',
            max_features=5000,
            ngram_range=(1, 2),
            min_df=2,
            max_df=0.8,
        )
        self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(self.movies['metadata'])
        print(f"Computing similarity matrix for the {self.tfidf_matrix.shape[0]} movies...")
        if use_sparse and self.tfidf_matrix.shape[0] > 5000:
            print("\nUsing sparse computation for memory efficiency...")
            self.cosine_sim = None
        else:
            self.cosine_sim = cosine_similarity(self.tfidf_matrix)
        print(f"Similarity matrix built: {self.tfidf_matrix.shape}")

    def _compute_similarity_on_demand(self, idx: int, top_n: int = 50):
        """Return the similarity of movie *idx* to every movie.

        Uses the precomputed matrix when available, otherwise computes a
        single row from the TF-IDF matrix. ``top_n`` is accepted for backward
        compatibility and is not used.

        Raises:
            RuntimeError: If no similarity model has been built or loaded.
        """
        if self.cosine_sim is not None:
            return self.cosine_sim[idx]
        if self.tfidf_matrix is None:
            raise RuntimeError("Call build_similarity_matrix() or load_model() first.")
        movie_tfidf = self.tfidf_matrix[idx:idx + 1]
        return cosine_similarity(movie_tfidf, self.tfidf_matrix).flatten()

    def _find_closest_title(self, search_title: str) -> Optional[str]:
        """Resolve *search_title* to a known title.

        Tries, in order: exact match, fuzzy match (difflib, cutoff 0.6),
        case-insensitive substring match. Returns None when nothing matches
        or when *search_title* is empty/blank.
        """
        if search_title in self.movie_to_idx:
            return search_title
        # An empty needle is a substring of every title; treat it as no match.
        if not search_title or not search_title.strip():
            return None
        matches = get_close_matches(search_title, self.movie_to_idx.keys(), n=1, cutoff=0.6)
        if matches:
            return matches[0]
        needle = search_title.lower()
        return next((t for t in self.movie_to_idx if needle in t.lower()), None)

    def get_recommendations(self, title: str, top_n: int = 10,
                            include_metadata: bool = False) -> Union[pd.DataFrame, str]:
        """Return the *top_n* movies most similar to *title*.

        Args:
            title: Movie title; resolved via ``_find_closest_title``.
            top_n: Maximum number of recommendations.
            include_metadata: Also include the ``metadata`` column.

        Returns:
            A DataFrame with title, genres, avg_rating, rating_count and
            similarity_score (rounded to 3 decimals), or a "not found"
            message string when the title cannot be resolved.
        """
        matched_title = self._find_closest_title(title)
        if not matched_title:
            words = title.lower().split()
            suggestions = [t for t in self.movie_to_idx
                           if any(word in t.lower() for word in words)][:5]
            if suggestions:
                suggestion_text = f"Did you mean: {', '.join(suggestions)}?"
            else:
                suggestion_text = "Try a different movie title."
            return f"Movie '{title}' not found. {suggestion_text}"
        if matched_title != title:
            print(f"Using closest match: '{matched_title}' for '{title}'")
            title = matched_title

        idx = self.movie_to_idx[title]
        similarities = np.asarray(self._compute_similarity_on_demand(idx, top_n + 20))
        # Stable sort keeps ties in ascending row order, like sorted(reverse=True).
        order = np.argsort(-similarities, kind='stable')
        # Drop the query movie explicitly: it is not guaranteed to sort first
        # when another movie has identical metadata (similarity 1.0 tie).
        order = order[order != idx][:max(top_n, 0)]

        recommendations = self.movies.iloc[order].copy()
        recommendations['similarity_score'] = similarities[order]
        columns = ['title', 'genres', 'avg_rating', 'rating_count', 'similarity_score']
        if include_metadata:
            columns.append('metadata')
        return recommendations[columns].round(3)

    def get_popular_movies(self, genre: Optional[str] = None, top_n: int = 10) -> pd.DataFrame:
        """Return the most-rated movies, optionally filtered by genre substring.

        The genre filter is a case-insensitive literal substring match.
        """
        df = self.movies.copy()
        if genre:
            # regex=False: genre names are literals, not patterns.
            df = df[df['genres'].str.contains(genre, case=False, na=False, regex=False)]
        df = df.sort_values(['rating_count', 'avg_rating'], ascending=[False, False])
        return df[['title', 'genres', 'avg_rating', 'rating_count']].head(top_n)

    def save_model(self, filepath: str):
        """Pickle the movies frame, TF-IDF model and lookup tables to *filepath*."""
        model_data = {
            'movies': self.movies,
            'tfidf_matrix': self.tfidf_matrix,
            'tfidf_vectorizer': self.tfidf_vectorizer,
            'movie_to_idx': self.movie_to_idx,
            'idx_to_movie': self.idx_to_movie,
        }
        with open(filepath, 'wb') as f:
            pickle.dump(model_data, f)
        print(f"Model saved to {filepath}")

    def load_model(self, filepath: str):
        """Restore state written by ``save_model``.

        SECURITY: pickle executes arbitrary code on load; only load files
        that this application itself produced.
        """
        with open(filepath, 'rb') as f:
            model_data = pickle.load(f)
        self.movies = model_data['movies']
        self.tfidf_matrix = model_data['tfidf_matrix']
        self.tfidf_vectorizer = model_data['tfidf_vectorizer']
        self.movie_to_idx = model_data['movie_to_idx']
        self.idx_to_movie = model_data['idx_to_movie']
        # The dense matrix is not persisted; similarities are computed on demand.
        self.cosine_sim = None
        print(f"Model loaded from {filepath}")

    def get_system_stats(self):
        """Return a dict of summary statistics, or a message if nothing is loaded."""
        if self.movies is None:
            return "No data loaded."
        movies = self.movies
        genre_names = set('|'.join(movies['genres'].dropna()).split('|'))
        # `is not None`: the truth value of a sparse matrix is ambiguous and raises.
        if self.tfidf_matrix is not None:
            memory_usage_mb = self.tfidf_matrix.data.nbytes / 1024 / 1024
        else:
            memory_usage_mb = 0
        return {
            'total_movies': len(movies),
            'avg_rating_range': f"{movies['avg_rating'].min():.1f} - {movies['avg_rating'].max():.1f}",
            'most_rated_movie': movies.loc[movies['rating_count'].idxmax(), 'title'],
            'highest_rated_movie': movies.loc[movies['avg_rating'].idxmax(), 'title'],
            'genres_covered': len(genre_names),
            'memory_usage_mb': memory_usage_mb,
        }

    def analyze_recommendations(self, title: str, top_n: int = 5):
        """Print the recommendations for *title* alongside the input movie's metadata.

        NOTE(review): in the original source this body sat unreachable after
        ``get_system_stats``' return with no ``def`` line; the method name and
        the ``top_n`` default are reconstructed and should be confirmed.

        Returns:
            The "not found" message string when the title cannot be resolved,
            otherwise None.
        """
        recommendations = self.get_recommendations(title, top_n, include_metadata=True)
        if isinstance(recommendations, str):
            return recommendations
        # get_recommendations may have fuzzy-matched; use the resolved title
        # for the index lookup so it cannot raise KeyError.
        title = self._find_closest_title(title)
        print(f"\nAnalysis for '{title}' recommendations:")
        print("=" * 50)
        input_row = self.movies.iloc[self.movie_to_idx[title]]
        print(f"Input movie genres: {input_row['genres']}")
        print(f"Input movie metadata: {input_row['metadata'][:100]}...")
        print("\nRecommendations:")
        for _, row in recommendations.iterrows():
            print(f"\n{row['title']} (Similarity: {row['similarity_score']:.3f})")
            print(f"  Genres: {row['genres']}")
            print(f"  Rating: {row['avg_rating']:.1f} ({int(row['rating_count'])} ratings)")
        return None
def main():
    """Run the demo: load the CSVs, build the model, print sample output.

    Exits early (returning None) when the data files cannot be loaded.
    """
    recommender = MovieRecommendationSystem()
    loaded = recommender.load_data("movies.csv", "ratings.csv", "tags.csv", "links.csv")
    if not loaded:
        return

    recommender.preprocess_data(top_n_movies=10000)
    recommender.build_similarity_matrix()

    # Section banner, one print call per original line.
    for banner_line in ("\n", "--------", "\n", "Movie Recommendations:\n"):
        print(banner_line)

    for sample_title in ("Toy Story (1995)", "Jurassic Park (1993)", "The Matrix (1999)"):
        print(f"\nRecommendations for '{sample_title}':")
        result = recommender.get_recommendations(sample_title, top_n=5)
        # A non-DataFrame result is the "not found" message; print it as-is.
        if not isinstance(result, pd.DataFrame):
            print(result)
            continue
        print(result.to_string(index=False))

    print("\n\nPopular Action Movies:")
    top_action = recommender.get_popular_movies("Action", top_n=5)
    print(top_action.to_string(index=False))


if __name__ == "__main__":
    main()

Data loaded successfully.

Statistics:
Movies: 87585 records.
Ratings: 32000204 records.
Tags: 2000072 records.
Links: 87585 records.

Missing values in the data:
  Tags: 17
  Links: 124

Preprocessed 10000 movies, successfully.

Computing similarity matrix for the 10000 movies...

Using sparse computation for memory efficiency...
Similarity matrix built: (10000, 5000)


--------


Movie Recommendations:


Recommendations for 'Toy Story (1995)':
               title                                           genres  avg_rating  rating_count  similarity_score
  Toy Story 2 (1999)      Adventure|Animation|Children|Comedy|Fantasy        3.81         32683             0.901
Bug's Life, A (1998)              Adventure|Animation|Children|Comedy        3.56         26736             0.789
  Toy Story 3 (2010) Adventure|Animation|Children|Comedy|Fantasy|IMAX        3.83         20327             0.703
 Finding Dory (2016)                       Adventure|Animation|Comedy        3.54          562