# In [None]:
import numpy as np
from collections import defaultdict
from typing import Dict, List, Tuple
import heapq

class CollaborativeFiltering:
    """Memory-based (k-nearest-neighbour) collaborative filtering recommender.

    Ratings are stored twice, indexed by user and by item, so that both
    user-based and item-based predictions can look up their neighbours
    directly. Similarity is the Pearson correlation over co-rated entries.

    Per-user and per-item mean ratings are cached and refreshed lazily:
    every call to ``add_rating`` marks the cache stale and every prediction
    refreshes it first, so callers no longer have to call
    ``_calculate_means()`` by hand (doing so is still supported).

    Note: neighbours are the ``k_neighbors`` most similar users/items overall,
    not only those that have a rating for the requested item/user; neighbours
    without such a rating are simply skipped when averaging.
    """

    def __init__(self, k_neighbors: int = 5, min_similarity: float = 0.1):
        """
        Initialize Collaborative Filtering recommender system

        Parameters:
        k_neighbors (int): Number of neighbors to consider for predictions
        min_similarity (float): Minimum similarity threshold for considering
            neighbors (a neighbor is kept only if its similarity is strictly
            greater than this value)
        """
        self.k_neighbors = k_neighbors
        self.min_similarity = min_similarity
        # user_id -> {item_id: rating}
        self.user_ratings = defaultdict(dict)
        # item_id -> {user_id: rating}
        self.item_ratings = defaultdict(dict)
        # Cached mean rating per user / per item (see _ensure_means).
        self.user_means = {}
        self.item_means = {}
        # True when ratings were added since the means were last computed.
        self._means_stale = False

    def add_rating(self, user_id: int, item_id: int, rating: float):
        """Add (or overwrite) a single rating and invalidate the cached means."""
        self.user_ratings[user_id][item_id] = rating
        self.item_ratings[item_id][user_id] = rating
        self._means_stale = True

    def add_ratings(self, ratings: List[Tuple[int, int, float]]):
        """Add multiple ``(user_id, item_id, rating)`` triples at once."""
        for user_id, item_id, rating in ratings:
            self.add_rating(user_id, item_id, rating)

    def _calculate_means(self):
        """Recompute the mean rating of every user and item that has ratings."""
        self.user_means = {
            user_id: float(np.mean(list(ratings.values())))
            for user_id, ratings in self.user_ratings.items()
            if ratings
        }
        self.item_means = {
            item_id: float(np.mean(list(ratings.values())))
            for item_id, ratings in self.item_ratings.items()
            if ratings
        }
        self._means_stale = False

    def _ensure_means(self):
        """Refresh the cached means if ratings changed since the last refresh."""
        if self._means_stale:
            self._calculate_means()

    def _pearson_similarity(self, ratings1: Dict[int, float], ratings2: Dict[int, float]) -> float:
        """
        Calculate Pearson correlation between two sets of ratings

        Only keys present in both mappings are used, and the means are taken
        over those common keys.

        Returns:
        float: Correlation in [-1.0, 1.0]; 0.0 when fewer than two keys are
            shared or when either side has zero variance on the shared keys.
        """
        common = ratings1.keys() & ratings2.keys()
        if len(common) < 2:  # Correlation is undefined for a single point
            return 0.0

        values1 = np.array([ratings1[key] for key in common], dtype=float)
        values2 = np.array([ratings2[key] for key in common], dtype=float)
        centered1 = values1 - values1.mean()
        centered2 = values2 - values2.mean()

        denominator = np.sqrt(np.dot(centered1, centered1) * np.dot(centered2, centered2))
        if denominator == 0:
            return 0.0

        correlation = float(np.dot(centered1, centered2) / denominator)
        # Floating-point rounding can push the result marginally outside the
        # valid range; clamp so callers can rely on [-1, 1].
        return max(-1.0, min(1.0, correlation))

    def _get_neighbors(self, target_id: int, ratings_dict: Dict[int, Dict[int, float]]) -> List[Tuple[int, float]]:
        """
        Get k most similar neighbors

        Parameters:
        target_id (int): Id of the user/item whose neighbors are wanted
        ratings_dict (dict): ``self.user_ratings`` or ``self.item_ratings``

        Returns:
        list: Up to ``k_neighbors`` ``(other_id, similarity)`` pairs with
            similarity strictly above ``min_similarity``, most similar first.
        """
        # .get() instead of indexing: ratings_dict is a defaultdict, and a
        # plain lookup of an unknown id would insert an empty entry.
        target_ratings = ratings_dict.get(target_id, {})

        similarities = []
        for other_id, other_ratings in ratings_dict.items():
            if other_id == target_id:
                continue
            sim = self._pearson_similarity(target_ratings, other_ratings)
            if sim > self.min_similarity:
                similarities.append((other_id, sim))

        return heapq.nlargest(self.k_neighbors, similarities, key=lambda x: x[1])

    def predict_user_based(self, user_id: int, item_id: int) -> float:
        """
        Predict rating using user-based collaborative filtering

        The prediction is the user's mean rating plus the similarity-weighted
        average of the neighbors' deviations from their own means.

        Returns:
        float: The predicted rating. Falls back to the item's mean (or 0.0)
            for a user without ratings, and to the user's mean when no
            neighbor has rated the item.
        """
        self._ensure_means()

        if not self.user_ratings.get(user_id):
            return self.item_means.get(item_id, 0.0)

        user_mean = self.user_means.get(user_id, 0.0)
        neighbors = self._get_neighbors(user_id, self.user_ratings)

        numerator = 0.0
        denominator = 0.0
        for neighbor_id, similarity in neighbors:
            rating = self.user_ratings[neighbor_id].get(item_id)
            if rating is None:
                continue  # this neighbor has no opinion on the item
            numerator += similarity * (rating - self.user_means[neighbor_id])
            denominator += abs(similarity)

        if denominator == 0:
            return user_mean

        return user_mean + (numerator / denominator)

    def predict_item_based(self, user_id: int, item_id: int) -> float:
        """
        Predict rating using item-based collaborative filtering

        The prediction is the similarity-weighted average of the ratings the
        user gave to the items most similar to ``item_id``.

        Returns:
        float: The predicted rating. Falls back to the user's mean (or 0.0)
            for an item without ratings, and to the item's mean when the user
            rated none of the neighboring items.
        """
        self._ensure_means()

        if not self.item_ratings.get(item_id):
            return self.user_means.get(user_id, 0.0)

        item_mean = self.item_means.get(item_id, 0.0)
        neighbors = self._get_neighbors(item_id, self.item_ratings)

        numerator = 0.0
        denominator = 0.0
        for neighbor_id, similarity in neighbors:
            rating = self.item_ratings[neighbor_id].get(user_id)
            if rating is None:
                continue  # the user has not rated this similar item
            numerator += similarity * rating
            denominator += abs(similarity)

        if denominator == 0:
            return item_mean

        return numerator / denominator

    def get_recommendations(self, user_id: int, n_recommendations: int = 5, method: str = 'user') -> List[Tuple[int, float]]:
        """
        Get top N recommendations for a user

        Parameters:
        user_id (int): User to recommend for (may be unknown to the system)
        n_recommendations (int): Maximum number of items to return
        method (str): 'user' for user-based prediction; any other value
            selects item-based prediction

        Returns:
        list: ``(item_id, predicted_rating)`` pairs for items the user has not
            rated, highest predicted rating first.
        """
        self._ensure_means()

        # .get() so that asking about an unknown user does not register an
        # empty rating dict for them in the defaultdict.
        rated_items = set(self.user_ratings.get(user_id, {}))
        items_to_predict = set(self.item_ratings) - rated_items

        if method == 'user':
            predict = self.predict_user_based
        else:
            predict = self.predict_item_based

        predictions = [(item_id, predict(user_id, item_id)) for item_id in items_to_predict]

        return heapq.nlargest(n_recommendations, predictions, key=lambda x: x[1])

# Example usage
def main():
    """Demo: load a small ratings table and print recommendations for user 1.

    Both the user-based and the item-based strategy are shown, two
    recommendations at most for each.
    """
    # Sample ratings, grouped per user as {user_id: {item_id: rating}}.
    ratings_by_user = {
        1: {1: 5.0, 2: 3.0, 4: 1.0},
        2: {1: 4.0, 4: 1.0},
        3: {1: 1.0, 2: 1.0, 4: 5.0},
        4: {1: 1.0, 4: 4.0},
        5: {2: 1.0, 3: 5.0, 4: 4.0},
    }
    # Flatten into the (user_id, item_id, rating) triples the recommender expects.
    triples = [
        (user, item, score)
        for user, item_scores in ratings_by_user.items()
        for item, score in item_scores.items()
    ]

    recommender = CollaborativeFiltering(k_neighbors=2, min_similarity=0.1)
    recommender.add_ratings(triples)

    # Means must be computed before asking for predictions.
    recommender._calculate_means()

    target_user = 1
    sections = (
        ("\nUser-based recommendations for user 1:", 'user'),
        ("\nItem-based recommendations for user 1:", 'item'),
    )
    for heading, strategy in sections:
        print(heading)
        top_items = recommender.get_recommendations(
            target_user, n_recommendations=2, method=strategy
        )
        for item_id, pred_rating in top_items:
            print(f"Item {item_id}: Predicted rating = {pred_rating:.2f}")

# Run the demo only when this file is executed as a script; importing it as a
# module must not trigger the example output.
if __name__ == "__main__":
    main()