# In [None]:
"""
Memory-Efficient Movie Recommender using GroupLens BigQuery Dataset

This implementation connects to BigQuery to load data from the
film-wizard-453315 project's GroupLens tables, processes it efficiently
without using pivot tables, and provides personalized movie recommendations.
"""

import pandas as pd
import numpy as np
from collections import defaultdict
from google.cloud import bigquery
from google.colab import auth
import os
from tqdm.notebook import tqdm

# In [None]:
class GroupLensRecommender:
    """A memory-efficient, user-based collaborative-filtering movie recommender.

    Ratings are stored in nested dictionaries (one entry per loaded rating)
    instead of a dense user x movie pivot table.

    Attributes:
        user_ratings (dict): ``{user_id: {movie_id: rating}}``.
        movie_ratings (dict): ``{movie_id: {user_id: rating}}`` -- the same
            ratings indexed by movie.
        movies_info (dict): ``{movie_id: {'title': ..., 'genres': ...}}``.
        similarity_cache (dict): memoized pairwise similarities keyed by
            ``(lower_user_id, higher_user_id, method, min_common)``.
        client: ``bigquery.Client`` after ``connect_to_bigquery()``, else ``None``.
    """

    # Default BigQuery dataset (``project.dataset``) holding the GroupLens tables.
    DEFAULT_DATASET = "film-wizard-453315.Grouplens"
    # Accepted values for the ``method`` argument of the similarity functions.
    SIMILARITY_METHODS = ('pearson', 'cosine')
    # A neighbour's rating must be strictly above this to count as an endorsement.
    LIKED_RATING_THRESHOLD = 3.5
    # Subtracted from endorsing ratings so a 5 weighs much more than a 4.
    RATING_OFFSET = 3
    # Column layouts of the returned DataFrames. Empty results reuse them so
    # callers can always select these columns without a KeyError.
    RECOMMENDATION_COLUMNS = ['movieId', 'title', 'genres', 'score', 'count', 'normalized_score']
    USER_RATING_COLUMNS = ['movieId', 'title', 'genres', 'rating']

    def __init__(self):
        """Initialize empty rating indexes, an empty cache and no BigQuery client."""
        self.user_ratings = {}      # {user_id: {movie_id: rating}}
        self.movie_ratings = {}     # {movie_id: {user_id: rating}}
        self.movies_info = {}       # {movie_id: {'title': ..., 'genres': ...}}
        self.similarity_cache = {}  # {(user_lo, user_hi, method, min_common): similarity}
        self.client = None          # set by connect_to_bigquery()

    def connect_to_bigquery(self, project_id="film-wizard-453315"):
        """
        Authenticate with Colab's auth helper and create a BigQuery client.

        Parameters:
            project_id (str): The Google Cloud project ID to bill queries to.

        Returns:
            bool: True if the client was created, False if any step raised
            (the error is printed, not re-raised).
        """
        try:
            auth.authenticate_user()
            self.client = bigquery.Client(project=project_id)
            print(f"Connected to BigQuery project: {project_id}")
            return True
        except Exception as e:
            # Notebook-facing boundary: report and let the caller decide.
            print(f"Error connecting to BigQuery: {e}")
            return False

    def load_data_from_bigquery(self, max_ratings=100_000_000, batch_size=50000, dataset=None):
        """
        Load movie metadata and ratings from the GroupLens BigQuery tables.

        Loaded data is merged into the existing dictionaries, and the similarity
        cache is cleared because cached values may no longer match the ratings.

        Parameters:
            max_ratings (int): Maximum number of ratings to load (SQL ``LIMIT``).
                Must be a non-negative integer.
            batch_size (int): Page/batch size used when streaming ratings.
            dataset (str | None): ``project.dataset`` containing the
                ``grouplens_movies`` and ``raw_grouplens_ratings`` tables.
                Defaults to ``DEFAULT_DATASET``. It is interpolated into SQL, so
                only pass trusted values.

        Returns:
            bool: True if loading succeeded, False otherwise (error printed).
        """
        if not self.client:
            print("Not connected to BigQuery. Please call connect_to_bigquery() first.")
            return False

        dataset = dataset or self.DEFAULT_DATASET

        try:
            # max_ratings is interpolated into SQL below, so force a plain int.
            max_ratings = int(max_ratings)
            if max_ratings < 0:
                raise ValueError(f"max_ratings must be non-negative, got {max_ratings}")

            movies_query = f"""
            SELECT movieId, title, genres
            FROM `{dataset}.grouplens_movies`
            """

            print("Loading movies data...")
            movies_df = self.client.query(movies_query).result().to_dataframe()

            # Column-wise zip avoids building a Series per row as iterrows() does.
            for movie_id, title, genres in zip(
                movies_df['movieId'], movies_df['title'], movies_df['genres']
            ):
                self.movies_info[movie_id] = {'title': title, 'genres': genres}

            ratings_query = f"""
            SELECT userId, movieId, rating
            FROM `{dataset}.raw_grouplens_ratings`
            LIMIT {max_ratings}
            """

            print(f"Loading ratings data (limited to {max_ratings} entries)...")
            query_job = self.client.query(ratings_query)

            processed = 0
            skipped = 0
            for batch in self._batch_query_results(query_job, batch_size):
                for row in batch:
                    user_id = row['userId']
                    movie_id = row['movieId']
                    raw_rating = row['rating']
                    # NULL columns come back as None; skip the row rather than
                    # aborting the whole load on float(None).
                    if user_id is None or movie_id is None or raw_rating is None:
                        skipped += 1
                        continue
                    rating = float(raw_rating)

                    self.user_ratings.setdefault(user_id, {})[movie_id] = rating
                    self.movie_ratings.setdefault(movie_id, {})[user_id] = rating

                processed += len(batch)
                print(f"Processed {processed} ratings...")

            if skipped:
                print(f"Skipped {skipped} ratings with NULL fields")

            # Similarities computed before this load may now be stale.
            self.similarity_cache.clear()

            num_users = len(self.user_ratings)
            num_movies = len(self.movies_info)
            num_ratings = sum(len(ratings) for ratings in self.user_ratings.values())

            print(f"Loaded {num_movies} movies and {num_ratings} ratings for {num_users} users")

            # Idealized estimate: 8 bytes per cell of a dense float64 matrix vs
            # ~16 bytes per rating in a compact sparse format. The Python dicts
            # used here have more per-entry overhead and hold every rating twice
            # (user and movie index), so the figures are illustrative only.
            full_matrix_bytes = num_users * num_movies * 8
            sparse_bytes = num_ratings * 16

            if full_matrix_bytes > 0:
                reduction_pct = (1 - sparse_bytes / full_matrix_bytes) * 100
                print(f"Memory usage reduced by approximately {reduction_pct:.2f}%")
                print(f"Full matrix would use {full_matrix_bytes/1024/1024:.2f} MB vs {sparse_bytes/1024/1024:.2f} MB in sparse format")

            return True

        except Exception as e:
            # Notebook-facing boundary: report and signal failure to the caller.
            print(f"Error loading data from BigQuery: {e}")
            return False

    def _batch_query_results(self, query_job, batch_size):
        """
        Yield query result rows in lists of at most ``batch_size`` rows.

        ``page_size`` is forwarded to ``query_job.result`` so rows are fetched
        page by page; the final list may be shorter than ``batch_size``.
        """
        iterator = query_job.result(page_size=batch_size)
        batch = []

        for row in iterator:
            batch.append(row)
            if len(batch) >= batch_size:
                yield batch
                batch = []

        if batch:
            yield batch

    def calculate_similarity(self, user_a, user_b, method='pearson', min_common=3):
        """
        Compute the similarity between two users over their co-rated movies.

        Parameters:
            user_a (int): First user ID.
            user_b (int): Second user ID.
            method (str): 'pearson' (correlation) or 'cosine'.
            min_common (int): Minimum number of co-rated movies required;
                with fewer, the similarity is 0.

        Returns:
            float: Similarity score (0 when a user is unknown, the overlap is too
            small, or a vector has zero variance/magnitude).

        Raises:
            ValueError: If ``method`` is not supported (checked before any early
            return, so typos are never silently ignored).
        """
        if method not in self.SIMILARITY_METHODS:
            raise ValueError(f"Unknown similarity method: {method}")

        # Similarity is symmetric, so order the pair for the cache key.
        cache_key = (min(user_a, user_b), max(user_a, user_b), method, min_common)
        cached = self.similarity_cache.get(cache_key)
        if cached is not None:
            return cached

        ratings_a = self.user_ratings.get(user_a, {})
        ratings_b = self.user_ratings.get(user_b, {})
        if not ratings_a or not ratings_b:
            return 0

        common_movies = list(ratings_a.keys() & ratings_b.keys())
        if not common_movies or len(common_movies) < min_common:
            return 0

        vec_a = np.array([ratings_a[m] for m in common_movies], dtype=float)
        vec_b = np.array([ratings_b[m] for m in common_movies], dtype=float)

        if method == 'pearson':
            # Pearson correlation is the cosine similarity of mean-centred vectors.
            vec_a = vec_a - vec_a.mean()
            vec_b = vec_b - vec_b.mean()

        denominator = np.sqrt(np.dot(vec_a, vec_a) * np.dot(vec_b, vec_b))
        similarity = float(np.dot(vec_a, vec_b) / denominator) if denominator > 0 else 0.0

        self.similarity_cache[cache_key] = similarity
        return similarity

    def find_similar_users(self, target_user_id, n=10, method='pearson',
                           sample_size=1000, random_state=None):
        """
        Find the users most similar to the target user.

        Parameters:
            target_user_id (int): The target user ID.
            n (int): Number of similar users to return.
            method (str): Similarity method ('pearson' or 'cosine').
            sample_size (int | None): When more users than this are loaded, only
                ``sample_size - 1`` randomly chosen other users are compared.
                ``None`` compares against every user.
            random_state (int | None): Seed for the sampling. ``None`` uses
                numpy's global random state.

        Returns:
            list: Up to ``n`` ``(user_id, similarity)`` tuples with
            similarity > 0, sorted by similarity descending.
        """
        if target_user_id not in self.user_ratings:
            print(f"User {target_user_id} not found!")
            return []

        candidates = [u for u in self.user_ratings if u != target_user_id]

        # Comparing against every user is too slow for large datasets.
        if sample_size is not None and len(self.user_ratings) > sample_size and candidates:
            rng = np.random if random_state is None else np.random.default_rng(random_state)
            size = max(0, min(sample_size - 1, len(candidates)))
            candidates = rng.choice(candidates, size=size, replace=False).tolist()

        similarities = []
        for user_id in tqdm(candidates, desc="Finding similar users", disable=len(candidates) < 100):
            similarity = self.calculate_similarity(target_user_id, user_id, method=method)
            if similarity > 0:  # only positively correlated users are useful neighbours
                similarities.append((user_id, similarity))

        similarities.sort(key=lambda pair: pair[1], reverse=True)
        return similarities[:n]

    def recommend_movies(self, target_user_id, n=10, method='pearson', n_neighbors=50, min_count=2):
        """
        Recommend unseen movies that similar users rated highly.

        Each neighbour's rating above ``LIKED_RATING_THRESHOLD`` contributes
        ``similarity * (rating - RATING_OFFSET)`` to that movie's score.

        Parameters:
            target_user_id (int): The target user ID.
            n (int): Number of recommendations to return.
            method (str): Similarity method ('pearson' or 'cosine').
            n_neighbors (int): Number of similar users to draw candidates from.
            min_count (int): Minimum number of neighbours that must endorse a
                movie for it to be recommended.

        Returns:
            DataFrame: Columns ``RECOMMENDATION_COLUMNS``, sorted by ``score``
            descending, with ``normalized_score`` scaled so the top row is 5.
            Empty (with the same columns) when nothing can be recommended.
        """
        empty = pd.DataFrame(columns=self.RECOMMENDATION_COLUMNS)

        if target_user_id not in self.user_ratings:
            print(f"User {target_user_id} not found!")
            return empty

        print(f"Finding movies to recommend for user {target_user_id}...")

        similar_users = self.find_similar_users(target_user_id, n=n_neighbors, method=method)
        if not similar_users:
            print("No similar users found!")
            return empty

        print(f"Found {len(similar_users)} similar users")

        # Dict membership is O(1); no need to copy the keys into a set.
        rated_movies = self.user_ratings[target_user_id]

        candidate_scores = defaultdict(float)
        score_counts = defaultdict(int)  # how many neighbours endorsed each movie

        for user_id, similarity in similar_users:
            for movie_id, rating in self.user_ratings[user_id].items():
                if movie_id in rated_movies:
                    continue
                if rating > self.LIKED_RATING_THRESHOLD:
                    candidate_scores[movie_id] += similarity * (rating - self.RATING_OFFSET)
                    score_counts[movie_id] += 1

        if not candidate_scores:
            print("No recommendations found!")
            return empty

        recommendations = []
        for movie_id, score in candidate_scores.items():
            if score_counts[movie_id] < min_count:
                continue
            movie_info = self.movies_info.get(movie_id, {})
            recommendations.append({
                'movieId': movie_id,
                'title': movie_info.get('title', f"Movie {movie_id}"),
                'genres': movie_info.get('genres', ''),
                'score': score,
                'count': score_counts[movie_id],
            })

        if not recommendations:
            print(f"No movies were endorsed by at least {min_count} similar users")
            return empty

        top = pd.DataFrame(recommendations).sort_values('score', ascending=False).head(n)

        # Scale to 0-5 relative to the best recommendation; assign() avoids
        # writing a column into a slice of another frame.
        max_score = top['score'].max()
        normalized = (top['score'] / max_score * 5).clip(0, 5) if max_score > 0 else 0.0
        return top.assign(normalized_score=normalized)

    def get_user_ratings(self, user_id):
        """
        Return the movies a user has rated, best-rated first.

        Parameters:
            user_id (int): The user ID.

        Returns:
            DataFrame: Columns ``USER_RATING_COLUMNS``. Movies missing from
            ``movies_info`` get the title ``"Movie <id>"`` and empty genres.
            Empty (same columns) for an unknown user.
        """
        if user_id not in self.user_ratings:
            print(f"User {user_id} not found!")
            return pd.DataFrame(columns=self.USER_RATING_COLUMNS)

        rows = []
        for movie_id, rating in self.user_ratings[user_id].items():
            movie_info = self.movies_info.get(movie_id, {})
            rows.append({
                'movieId': movie_id,
                'title': movie_info.get('title', f"Movie {movie_id}"),
                'genres': movie_info.get('genres', ''),
                'rating': rating,
            })

        user_ratings_df = pd.DataFrame(rows, columns=self.USER_RATING_COLUMNS)
        if not user_ratings_df.empty:
            user_ratings_df = user_ratings_df.sort_values('rating', ascending=False)

        return user_ratings_df

    def get_genre_recommendations(self, target_user_id, genre, n=10, method='pearson', pool_size=100):
        """
        Recommend movies whose genre string contains ``genre``.

        Parameters:
            target_user_id (int): The target user ID.
            genre (str): Genre to filter by. Matched case-insensitively as a
                literal substring, not as a regular expression.
            n (int): Number of recommendations to return.
            method (str): Similarity method ('pearson' or 'cosine').
            pool_size (int): Number of general recommendations to filter.

        Returns:
            DataFrame: Same columns as ``recommend_movies``, at most ``n`` rows.
        """
        recommendations = self.recommend_movies(target_user_id, n=pool_size, method=method)

        if recommendations.empty:
            return recommendations

        mask = recommendations['genres'].str.contains(genre, case=False, na=False, regex=False)
        return recommendations[mask].head(n)

    def analyze_user_tastes(self, user_id):
        """
        Summarize a user's rating distribution and genre preferences.

        Parameters:
            user_id (int): The user ID.

        Returns:
            dict: ``total_ratings``, ``average_rating``, ``rating_distribution``
            (rating -> count), ``favorite_genres`` (top five by average rating,
            then count) and ``least_favorite_genres`` (up to five lowest-ranked
            genres not already in ``favorite_genres``, in the same descending
            order). Each genre entry is ``(genre, {'average', 'count'})``.
            Genres need at least 3 ratings to be ranked. Empty dict for an
            unknown user.
        """
        if user_id not in self.user_ratings:
            print(f"User {user_id} not found!")
            return {}

        user_ratings_df = self.get_user_ratings(user_id)
        if user_ratings_df.empty:
            return {}

        rating_counts = user_ratings_df['rating'].value_counts().sort_index()

        # Attribute each rating to every '|'-separated genre of its movie.
        ratings_by_genre = defaultdict(list)
        for genres_field, rating in zip(user_ratings_df['genres'], user_ratings_df['rating']):
            if pd.notna(genres_field) and genres_field:
                for genre in genres_field.split('|'):
                    ratings_by_genre[genre].append(rating)

        genre_ratings = {
            genre: {'average': sum(ratings) / len(ratings), 'count': len(ratings)}
            for genre, ratings in ratings_by_genre.items()
            if len(ratings) >= 3
        }

        sorted_genres = sorted(
            genre_ratings.items(),
            key=lambda item: (item[1]['average'], item[1]['count']),
            reverse=True,
        )

        return {
            'total_ratings': len(user_ratings_df),
            'average_rating': user_ratings_df['rating'].mean(),
            'rating_distribution': rating_counts.to_dict(),
            'favorite_genres': sorted_genres[:5],
            # Slicing from index 5 keeps this disjoint from favorite_genres.
            'least_favorite_genres': sorted_genres[5:][-5:],
        }

# In [None]:
# Example usage code for Google Colab
def run_recommender_demo(max_ratings=5000000, min_user_ratings=20):
    """
    Run an end-to-end demo of GroupLensRecommender (intended for Google Colab).

    Connects to BigQuery, loads up to ``max_ratings`` ratings, picks the first
    loaded user with at least ``min_user_ratings`` ratings, and prints that
    user's top-rated movies, similar users, recommendations, favourite-genre
    recommendations and a taste summary.

    Parameters:
        max_ratings (int): Cap on ratings loaded, to keep memory bounded.
        min_user_ratings (int): Minimum history for the demo user.

    Returns:
        GroupLensRecommender | None: The loaded recommender so later cells can
        reuse it without reloading, or None if a setup step failed.
    """
    recommender = GroupLensRecommender()

    if not recommender.connect_to_bigquery():
        print("Failed to connect to BigQuery. Please check your authentication.")
        return None

    if not recommender.load_data_from_bigquery(max_ratings=max_ratings):
        print("Failed to load data from BigQuery.")
        return None

    # First user (in load order) with enough history to find meaningful neighbours.
    target_user_id = next(
        (uid for uid, ratings in recommender.user_ratings.items()
         if len(ratings) >= min_user_ratings),
        None,
    )

    if target_user_id is None:
        print("No suitable user found for demonstration.")
        return None

    print(f"\nSelected User {target_user_id} for demonstration")

    user_ratings = recommender.get_user_ratings(target_user_id)
    print(f"\nUser {target_user_id} has rated {len(user_ratings)} movies. Top rated:")
    print(user_ratings.head(5)[['title', 'genres', 'rating']])

    similar_users = recommender.find_similar_users(target_user_id, n=5)
    print("\nMost similar users:")
    for user_id, similarity in similar_users:
        print(f"- User {user_id}: Similarity = {similarity:.4f} ({len(recommender.user_ratings[user_id])} ratings)")

    print("\nGenerating recommendations...")
    recommendations = recommender.recommend_movies(target_user_id, n=10)

    print("\nRecommended movies:")
    if recommendations.empty:
        print("(no recommendations)")
    else:
        print(recommendations[['title', 'genres', 'normalized_score']])

    user_analysis = recommender.analyze_user_tastes(target_user_id)

    if user_analysis and user_analysis['favorite_genres']:
        favorite_genre = user_analysis['favorite_genres'][0][0]

        print(f"\nRecommendations for favorite genre ({favorite_genre}):")
        genre_recs = recommender.get_genre_recommendations(target_user_id, favorite_genre)

        if genre_recs.empty:
            print(f"(no {favorite_genre} recommendations)")
        else:
            print(genre_recs[['title', 'genres', 'normalized_score']])

    if user_analysis:
        print("\nUser taste analysis:")
        print(f"- Total ratings: {user_analysis['total_ratings']}")
        print(f"- Average rating: {user_analysis['average_rating']:.2f}")

        print("\nFavorite genres:")
        for genre, stats in user_analysis['favorite_genres']:
            print(f"- {genre}: {stats['average']:.2f} average rating ({stats['count']} movies)")

    return recommender

# Run the demo when the script is executed directly. Note: inside a Jupyter/Colab
# kernel ``__name__`` is also "__main__", so executing this cell runs the demo too.
if __name__ == "__main__":
    run_recommender_demo()