In [16]:
import numpy as np

def read_ratings_from_file(filename):
    """Read the third comma-separated field of every data row as a float.

    The first line is treated as a header and skipped. Blank lines (for
    example a trailing empty line) are ignored instead of raising IndexError.

    Args:
        filename: path to a CSV file whose third column holds ratings.

    Returns:
        list of float ratings, in file order.
    """
    ratings = []
    with open(filename, 'r') as f:
        next(f, None)  # skip the header row (no-op on an empty file)
        for line in f:
            if not line.strip():
                continue
            # float() tolerates surrounding whitespace and the trailing newline.
            ratings.append(float(line.split(",")[2]))
    return ratings

def calculate_rmse(predicted, actual):
    """Root-mean-square error between two equally long rating sequences.

    Args:
        predicted: sized sequence of predicted ratings.
        actual: sized sequence of true ratings, aligned by position.

    Returns:
        RMSE as a numpy float.

    Raises:
        ValueError: if the sequences differ in length (zip() would otherwise
            silently drop the extra items while the mean still divided by
            len(predicted)), or if they are empty.
    """
    if len(predicted) != len(actual):
        raise ValueError(
            f"length mismatch: {len(predicted)} predictions vs {len(actual)} actual ratings"
        )
    if len(predicted) == 0:
        raise ValueError("cannot compute RMSE of empty sequences")
    errors = np.asarray(predicted, dtype=float) - np.asarray(actual, dtype=float)
    return np.sqrt(np.mean(np.square(errors)))

def bin_differences(predicted, actual):
    """Count absolute prediction errors per one-star-wide bucket.

    Buckets "<1", "1~2", "2~3" and "3~4" include their lower bound and
    exclude their upper bound. "4~5" takes every remaining error: anything
    of 4 or more, and any value that fails all the comparisons (e.g. NaN).

    Returns:
        dict of bucket label -> count, in the order listed above.
    """
    upper_bounds = ((1, "<1"), (2, "1~2"), (3, "2~3"), (4, "3~4"))
    counts = {label: 0 for _, label in upper_bounds}
    counts["4~5"] = 0

    for p, a in zip(predicted, actual):
        gap = abs(p - a)
        for bound, label in upper_bounds:
            if gap < bound:
                counts[label] += 1
                break
        else:
            counts["4~5"] += 1

    return counts

def print_stats(ratings, label):
    """Print the mean and the numpy-default (ddof=0) standard deviation of ratings, each prefixed by label."""
    for stat_name, stat_fn in (("mean", np.mean), ("std", np.std)):
        print(f"{label} {stat_name}: {stat_fn(ratings)}")

def main(predicted_path="output2_3.csv", actual_path="publicdata/yelp_val.csv"):
    """Print RMSE, error-bucket counts and summary stats for a prediction file.

    Both files are read with read_ratings_from_file and compared row by row,
    so they must list the same (user, business) pairs in the same order.

    Args:
        predicted_path: CSV whose third column holds predicted ratings.
        actual_path: CSV whose third column holds ground-truth ratings.

    Raises:
        ValueError: if the two files contain different numbers of ratings,
            which would make the positional comparison meaningless.
    """
    predicted_ratings = read_ratings_from_file(predicted_path)
    actual_ratings = read_ratings_from_file(actual_path)

    if len(predicted_ratings) != len(actual_ratings):
        raise ValueError(
            f"{predicted_path} has {len(predicted_ratings)} ratings but "
            f"{actual_path} has {len(actual_ratings)}; rows are matched by position"
        )

    rmse = calculate_rmse(predicted_ratings, actual_ratings)
    differences = bin_differences(predicted_ratings, actual_ratings)

    print("RMSE:", rmse)
    for key, value in differences.items():
        print(f"Absolute differences in range {key}: {value}")

    print_stats(predicted_ratings, "Prediction")
    print_stats(actual_ratings, "Answer")

if __name__ == "__main__":
    main()


RMSE: 0.9863759406874498
Absolute differences in range <1: 101407
Absolute differences in range 1~2: 33543
Absolute differences in range 2~3: 6283
Absolute differences in range 3~4: 807
Absolute differences in range 4~5: 4
Prediction mean: 3.749827267580163
Prediction std: 0.548377027757119
Answer mean: 3.750499845118414
Answer std: 1.1221530754078377


In [None]:
from pyspark import SparkContext
import sys
import time
import numpy as np
from xgboost import XGBRegressor
import json

class ItemBasedCF:
    """Item-based collaborative filtering for (user, business) star ratings.

    A prediction is a weighted mean of the target user's own training ratings
    on other businesses. Each neighbouring business is weighted by how closely
    its ratings track the target business's ratings.
    """

    # Prediction used when the user or business is unseen, or all weights are zero.
    DEFAULT_RATING = 3.5
    # Number of highest-weighted neighbour businesses combined per prediction.
    TOP_K = 15

    def __init__(self):
        # business_id -> {user_id: rating}
        self.biz_to_ratings = {}
        # business_id -> set of user_ids who rated it
        self.biz_to_users = {}
        # user_id -> set of business_ids the user rated
        self.user_to_biz = {}
        # business_id -> mean training rating
        self.avg_biz_ratings = {}
        # user_id -> mean training rating
        self.avg_user_ratings = {}

    def _load_data(self, context, train_file, test_file):
        """Build the lookup tables from training data and read the test pairs.

        Lines containing "user_id" (the header) are dropped from both inputs.

        Args:
            context: active SparkContext.
            train_file: directory containing yelp_train.csv
                (user_id, business_id, rating).
            test_file: path to a CSV whose first two columns are user_id, business_id.

        Returns:
            RDD of (business_id, user_id) tuples, one per test row.
        """
        train_data = (
            context.textFile(train_file + '/yelp_train.csv')
            .filter(lambda x: "user_id" not in x)
            .map(lambda x: x.split(","))
            .cache()  # reused by the four aggregations below; avoid re-reading the file
        )
        test_data = (
            context.textFile(test_file)
            .filter(lambda x: "user_id" not in x)
            .map(lambda x: x.split(","))
            .map(lambda x: (x[1], x[0]))
        )

        self.biz_to_ratings = (
            train_data.map(lambda x: (x[1], (x[0], float(x[2]))))
            .groupByKey().mapValues(dict).collectAsMap()
        )
        self.biz_to_users = {biz: set(users.keys()) for biz, users in self.biz_to_ratings.items()}
        self.user_to_biz = (
            train_data.map(lambda x: (x[0], x[1]))
            .groupByKey().mapValues(set).collectAsMap()
        )

        self.avg_biz_ratings = (
            train_data.map(lambda x: (x[1], float(x[2])))
            .groupByKey().mapValues(lambda v: sum(v) / len(v)).collectAsMap()
        )
        self.avg_user_ratings = (
            train_data.map(lambda x: (x[0], float(x[2])))
            .groupByKey().mapValues(lambda v: sum(v) / len(v)).collectAsMap()
        )

        return test_data

    def _calculate_weights(self, target_bus, current_user):
        """Predict current_user's rating of target_bus.

        Every business the user rated in training is a neighbour. When the
        neighbour and the target share co-raters, its weight is
        1 - mean squared rating difference / 25, multiplied by a support factor
        1 - 0.5**n (n = number of co-raters). The factor is 0.5 for one
        co-rater and approaches 1 as n grows, so similarities backed by few
        users are shrunk the most. When there are no co-raters, the weight is
        1 - (difference of the two businesses' average ratings)**2 / 25.
        The TOP_K heaviest neighbours are combined as a weighted mean of the
        user's ratings.

        Returns:
            Predicted rating, or DEFAULT_RATING when the user or business is
            unseen in training, or when the selected weights sum to zero.
        """
        if current_user not in self.user_to_biz or target_bus not in self.biz_to_users:
            return self.DEFAULT_RATING

        # Hoist per-target / per-user lookups out of the neighbour loop.
        target_users = self.biz_to_users[target_bus]
        target_ratings = self.biz_to_ratings[target_bus]
        target_avg = self.avg_biz_ratings[target_bus]
        user_avg = self.avg_user_ratings[current_user]

        weight_list = []
        for other_bus in self.user_to_biz[current_user]:
            mutual_users = target_users.intersection(self.biz_to_users[other_bus])
            other_ratings = self.biz_to_ratings[other_bus]

            if not mutual_users:
                # No co-raters: compare the two businesses' average ratings instead.
                w_val = 1 - ((target_avg - self.avg_biz_ratings[other_bus]) ** 2) / 25
            else:
                rate_diffs = [(target_ratings[u] - other_ratings[u]) ** 2 for u in mutual_users]
                w_val = 1 - sum(rate_diffs) / (25 * len(rate_diffs))
                # Support factor grows with the number of co-raters. The previous
                # 0.5**n did the opposite, suppressing well-supported neighbours
                # below neighbours with no co-raters at all.
                w_val *= 1 - 0.5 ** len(mutual_users)

            weight_list.append((w_val, other_ratings.get(current_user, user_avg)))

        top_weights = sorted(weight_list, key=lambda pair: pair[0], reverse=True)[:self.TOP_K]

        total_weights = sum(abs(weight) for weight, _ in top_weights)
        if total_weights == 0:
            return self.DEFAULT_RATING
        return sum(weight * rating for weight, rating in top_weights) / total_weights

    def generate_predictions(self, train_file, test_file):
        """Load data in a short-lived SparkContext and predict every test pair.

        Returns:
            dict mapping (user_id, business_id) -> predicted rating.
        """
        with SparkContext(appName="ItemBasedCF") as context:
            context.setLogLevel('Error')
            test_data = self._load_data(context, train_file, test_file)
            return {(user, biz): self._calculate_weights(biz, user) for biz, user in test_data.collect()}

class XGBRatingPredictor:
    """Predict star ratings with an XGBoost regressor over aggregated Yelp attributes.

    Each (user, business) pair is described by: mean useful/funny/cool votes
    on the business's reviews, the user's average_stars/review_count/fans, and
    the business's stars/review_count. Missing attributes become NaN.
    """

    @staticmethod
    def extract_features(data, review_attr, user_attr, business_attr):
        """Build the feature matrix for rows whose first two fields are user_id, business_id.

        Args:
            data: iterable of sequences; fields after the second are ignored.
            review_attr: business_id -> 3 review aggregates.
            user_attr: user_id -> 3 user attributes.
            business_attr: business_id -> 2 business attributes.

        Returns:
            (float32 feature array with one row per input row,
             list of (user_id, business_id) in input order).
            Unknown ids contribute None entries, which become NaN in the array.
        """
        feature_list, user_business_pairs = [], []

        for user, business, *_ in data:
            user_business_pairs.append((user, business))
            feature_list.append(
                list(review_attr.get(business, [None, None, None]))
                + list(user_attr.get(user, [None, None, None]))
                + list(business_attr.get(business, [None, None]))
            )

        return np.array(feature_list, dtype='float32'), user_business_pairs

    @staticmethod
    def train_model(features, labels):
        """Fit an XGBRegressor with default hyperparameters and return it."""
        xgb_model = XGBRegressor()
        xgb_model.fit(features, labels)
        return xgb_model

    @staticmethod
    def _read_csv(context, path):
        """RDD of comma-split rows from path, dropping lines that contain "user_id" (the header)."""
        return (
            context.textFile(path)
            .filter(lambda row: "user_id" not in row)
            .map(lambda row: row.split(","))
        )

    @staticmethod
    def _load_attributes(context, train_dir):
        """Collect review aggregates, user attributes and business attributes.

        Returns:
            (review_data, user_data, business_data): dicts keyed by
            business_id, user_id and business_id respectively.
        """
        review_data = (
            context.textFile(train_dir + '/review_train.json')
            .map(json.loads)
            .map(lambda row: (row['business_id'],
                              (float(row['useful']), float(row['funny']), float(row['cool']))))
            .groupByKey()
            # element-wise mean of the (useful, funny, cool) tuples per business
            .mapValues(lambda x: tuple(np.mean(np.array(list(x)), axis=0)))
            .collectAsMap()
        )

        user_data = (
            context.textFile(train_dir + '/user.json')
            .map(json.loads)
            .map(lambda row: (row['user_id'],
                              (float(row['average_stars']), float(row['review_count']), float(row['fans']))))
            .collectAsMap()
        )

        business_data = (
            context.textFile(train_dir + '/business.json')
            .map(json.loads)
            .map(lambda row: (row['business_id'], (float(row['stars']), float(row['review_count']))))
            .collectAsMap()
        )

        return review_data, user_data, business_data

    def predict(self, train_dir, test_dir):
        """Train on train_dir/yelp_train.csv and predict ratings for the test rows.

        Args:
            train_dir: directory holding yelp_train.csv, review_train.json,
                user.json and business.json.
            test_dir: path to the test CSV (user_id, business_id, ...),
                passed directly to textFile.

        Returns:
            dict mapping (user_id, business_id) -> predicted rating.

        Raises:
            ValueError: if yelp_train.csv has no data rows.
        """
        # The context manager stops the SparkContext even on error. Previously
        # it was never stopped, so any later SparkContext in the same process
        # (including a second predict() call) failed.
        with SparkContext(appName="XGBRatingPredictor") as context:
            context.setLogLevel('Error')
            review_data, user_data, business_data = self._load_attributes(context, train_dir)
            train_rows = self._read_csv(context, train_dir + '/yelp_train.csv').collect()
            test_rows = self._read_csv(context, test_dir).collect()

        if not train_rows:
            raise ValueError(f"no training rows found in {train_dir}/yelp_train.csv")

        # The last column of each training row is the label; the rest identify the pair.
        X_train, _ = self.extract_features(
            [row[:-1] for row in train_rows], review_data, user_data, business_data
        )
        Y_train = np.array([float(row[-1]) for row in train_rows], dtype='float32')

        X_test, user_business_pairs = self.extract_features(
            test_rows, review_data, user_data, business_data
        )

        xgb_model = self.train_model(X_train, Y_train)
        predictions = xgb_model.predict(X_test)

        return dict(zip(user_business_pairs, predictions))

class HybridRecommender:
    """Blend item-based CF and XGBoost predictions with a fixed mixing weight.

    final = alpha * cf_score + (1 - alpha) * xgb_score, with 3.5 substituted
    for the XGBoost score of any pair it did not predict.
    """

    def __init__(self, alpha):
        self.alpha = alpha  # weight of the item-based CF score
        self.item_based_recommender = ItemBasedCF()
        self.xgb_recommender = XGBRatingPredictor()

    def predict(self, train_file, test_file):
        """Run both models (CF first, then XGBoost) and blend their scores.

        Returns:
            dict mapping (user_id, business_id) -> blended rating, with one
            entry per pair predicted by the item-based model.
        """
        cf_scores = self.item_based_recommender.generate_predictions(train_file, test_file)
        model_scores = self.xgb_recommender.predict(train_file, test_file)

        return {
            pair: self.alpha * cf_score + (1 - self.alpha) * model_scores.get(pair, 3.5)
            for pair, cf_score in cf_scores.items()
        }

    def save_predictions(self, predictions, output_file):
        """Write predictions as CSV: a header line, then one user,biz,prediction row per pair."""
        rows = (f"{user},{biz},{pred}\n" for (user, biz), pred in predictions.items())
        with open(output_file, 'w') as results_file:
            results_file.write("user_id, business_id, prediction\n")
            results_file.writelines(rows)

if __name__ == '__main__':
    start_time = time.time()

    # Weight of the item-based CF score in the blend; XGBoost receives 1 - alpha.
    alpha = 0.2

    # Fail with a usage message instead of an IndexError when fewer than
    # three arguments are supplied.
    if len(sys.argv) < 4:
        sys.exit(f"Usage: {sys.argv[0]} <train_dir> <test_file> <output_file>")
    train_filepath, test_filepath, output_filepath = sys.argv[1:4]

    recommender = HybridRecommender(alpha)
    final_predictions = recommender.predict(train_filepath, test_filepath)
    recommender.save_predictions(final_predictions, output_filepath)

    print("Duration:", time.time() - start_time)


In [9]:
import csv
import time

def load_data(filepath, has_header=True):
    """Load a CSV of pairs as a set of order-insensitive row tuples.

    Each row's fields are whitespace-stripped and sorted, so "a,b,s" and
    "b, a, s" produce the same tuple. Every field, including any score column,
    is part of the tuple, so score values must match textually.

    Args:
        filepath: path to the CSV file.
        has_header: skip the first row. Otherwise an identical header in both
            files counts as a true positive, and differing headers count as a
            false positive plus a false negative.

    Returns:
        set of tuples of sorted, stripped fields. Blank rows are skipped.
    """
    with open(filepath, 'r', newline='') as file:
        reader = csv.reader(file)
        if has_header:
            next(reader, None)
        return {
            tuple(sorted(field.strip() for field in row))
            for row in reader
            if any(field.strip() for field in row)  # csv.reader yields [] for blank lines
        }

def compute_metrics(ground_truth, predictions):
    """Precision and recall of a predicted set against a ground-truth set.

    Args:
        ground_truth: set of expected items.
        predictions: set of predicted items.

    Returns:
        (precision, recall). A metric whose denominator is empty (no
        predictions, or no ground truth) is reported as 0.0 instead of
        raising ZeroDivisionError.
    """
    true_positives = len(ground_truth.intersection(predictions))
    # For sets, TP + FP == |predictions| and TP + FN == |ground_truth|.
    precision = true_positives / len(predictions) if predictions else 0.0
    recall = true_positives / len(ground_truth) if ground_truth else 0.0

    return precision, recall

def compute_partial_score(precision, recall, precision_target=0.99, recall_target=0.97, weight=0.4):
    """Partial credit scaled by how close each metric is to its target.

    score = (precision / precision_target) * weight + (recall / recall_target) * weight

    The defaults reproduce the original hard-coded grading constants.
    """
    return (precision / precision_target) * weight + (recall / recall_target) * weight

def main(ground_truth_path='publicdata/pure_jaccard_similarity.csv',
         predictions_path='output.csv', time_limit=100):
    """Score a pairs output file against the ground truth and print the result.

    Args:
        ground_truth_path: CSV of expected pairs.
        predictions_path: CSV of predicted pairs to evaluate.
        time_limit: seconds; a runtime at or above this is reported as over the limit.
    """
    start_time = time.perf_counter()  # monotonic clock, suited to measuring durations

    ground_truth = load_data(ground_truth_path)
    predictions = load_data(predictions_path)

    precision, recall = compute_metrics(ground_truth, predictions)
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')

    # Full credit needs both thresholds; they match compute_partial_score's denominators.
    if precision < 0.99 or recall < 0.97:
        score = compute_partial_score(precision, recall)
        print(f'Partial Score: {score:.4f}')
    else:
        print('Full Credit!')

    # NOTE(review): this times only the evaluation above (loading and comparing
    # two CSVs), not the job that produced predictions_path. Confirm whether
    # the limit was meant to apply to that job instead.
    runtime = time.perf_counter() - start_time
    print(f'Runtime: {runtime:.2f} seconds')

    if runtime >= time_limit:
        print('No points, runtime exceeded limit!')

if __name__ == '__main__':
    main()


Precision: 1.0000
Recall: 0.9829
Full Credit!
Runtime: 0.03 seconds
