In [13]:
# Title:  CSEN272 Project 2 
# Author: Yanxu Wu (W1650780)

from Utils.UMR import*
from Utils.KNN import*
import numpy as np
import math
import time

# try different test and result files for all results
test_file = 'test20.txt'
result_file = 'result20.txt'


# Load data into 200*1000 training matrix
def load_train_mat(path: str) -> np.ndarray:
    data = np.loadtxt(path, dtype=int)
    mat = np.zeros((200, 1000))
    mat[data[:, 0] - 1, data[:, 1] - 1] = data[:, 2]
    
    return mat


train_mat = load_train_mat('train.txt')


# Get average user ratings from training data
def avg_train_rating(mat: np.ndarray) -> dict:
    non_zero_counts = np.count_nonzero(mat, axis=1)
    sum_ratings = np.sum(mat, axis=1)
    avg_ratings = np.divide(sum_ratings, non_zero_counts, out=np.zeros_like(sum_ratings), where=non_zero_counts != 0)
    
    return {i + 1: avg_ratings[i] for i in range(len(avg_ratings))}

# Get the predicition tuple object: (user_id, movie_id, rating)
def pred_movie_list(path: str) -> UMR:
    data = np.loadtxt(path, dtype=int)
    mask = data[:, 2] == 0
    users = data[mask, 0].tolist()
    movies = data[mask, 1].tolist()
    ratings = data[mask, 2].tolist()
    
    return UMR(users, movies, ratings)


# Get average movie ratings from training data
def avg_movie_rating(mat: np.ndarray) -> dict:
    non_zero_counts = np.count_nonzero(mat, axis=0)
    sum_ratings = np.sum(mat, axis=0)
    avg_ratings = np.divide(sum_ratings, non_zero_counts, out=np.zeros_like(sum_ratings), where=non_zero_counts != 0)
    
    return {i + 1: avg_ratings[i] for i in range(len(avg_ratings))}


# Get UMR object from test data
def user_ratings_in_test(path: str, user_id: int) -> UMR:
    data = np.loadtxt(path, dtype=int)
    mask = (data[:, 0] == user_id) & (data[:, 2] != 0)
    users = data[mask, 0].tolist()
    movies = data[mask, 1].tolist()
    ratings = data[mask, 2].tolist()
    
    return UMR(users, movies, ratings)


# Get average user ratings from test data
def avg_user_rating_test(path: str, user_id: int) -> float:
    data = np.loadtxt(path, dtype=int)
    mask = (data[:, 0] == user_id) & (data[:, 2] != 0)
    ratings = data[mask, 2]
    
    if ratings.size > 0:
        return ratings.mean()
    
    return 0.0

# IUF method
def inv_user_freq(mat: np.ndarray, movie_id: int) -> float:
    m = np.count_nonzero(mat[:, movie_id - 1])
    
    return math.log10(200.0 / m) if m != 0 else 1


# calculate cosine similarity
def calc_cos_sim(user_id: int, k: int, user_obj: UMR, mat: np.ndarray, train_avg: dict) -> list:
    neighbors = []
    movies = user_obj.get_movies()
    ratings = user_obj.get_ratings()

    for usr in range(200):
        num = 0.0
        denom_test = 0.0
        denom_train = 0.0
        #iuf_id = 0

        for i in range(len(movies)):
            movie_id = movies[i]
            #iuf_id = movie_id
            if ratings[i] != 0 and mat[usr][movie_id - 1] != 0:
                num += ratings[i] * mat[usr][movie_id - 1]
                denom_test += ratings[i] ** 2
                denom_train += mat[usr][movie_id - 1] ** 2

        denom_total = math.sqrt(denom_test) * math.sqrt(denom_train)

        if denom_total != 0.0:
            sim = num / denom_total
            # Case amplification
            sim *= math.fabs(sim) ** 1.5 # p factor
            # IUF 
            #sim *= inv_user_freq(mat, iuf_id)
            neighbors.append(KNN(usr + 1, sim))

    neighbors.sort(key=lambda x: x.sim, reverse=True) # Choose top k users
    
    return neighbors[:k]


# Get Prediction using Cosine method
def calc_rating_cos(user_id: int, movie_id: int, k: int, user_obj: UMR, mat: np.ndarray, train_avg: dict, movie_avg: dict) -> int:
    sim_list = calc_cos_sim(user_id, k, user_obj, mat, train_avg)
    avg_movie = movie_avg[movie_id]
    avg_user = avg_user_rating_test(test_file, user_id)

    num = 0.0
    denom = 0.0

    for neighbor in sim_list:
        train_user_id = neighbor.get_user_id()
        if mat[train_user_id - 1][movie_id - 1] > 0:
            rating = mat[train_user_id - 1][movie_id - 1]
            num += neighbor.get_sim() * rating
            denom += neighbor.get_sim()

    if denom != 0:
        result = num / denom
        
    elif avg_movie != 0:
        result = avg_movie
    
    else:
        result = avg_user

    return int(round(result))


# Calculate Pearson correlation
def calc_pearson_sim(user_id: int, k: int, user_obj: UMR, mat: np.ndarray, train_avg: dict, user_avg: float) -> list:
    neighbors = []
    movies = user_obj.get_movies()
    ratings = user_obj.get_ratings()

    for usr in range(200):
        num = 0.0
        denom_test = 0.0
        denom_train = 0.0
        #iuf_id = 0
        
        for i in range(len(movies)):
            movie_id = movies[i]
            #iuf_id = movie_id
            
            if ratings[i] != 0 and mat[usr][movie_id - 1] != 0:
                avg_train = train_avg[usr + 1]
                num += (ratings[i] - user_avg) * (mat[usr][movie_id - 1] - avg_train)
                denom_test += (ratings[i] - user_avg) ** 2
                denom_train += (mat[usr][movie_id - 1] - avg_train) ** 2
              

        denom_total = math.sqrt(denom_test) * math.sqrt(denom_train) if denom_test > 0 and denom_train > 0 else 0.0

        if denom_total != 0.0:
            sim = num / denom_total
            #sim *= math.fabs(sim) ** 1.5 # Case Amplification
            #sim *= inv_user_freq(mat, iuf_id) # IUF
            neighbors.append(KNN(usr + 1, sim))

    neighbors.sort(key=lambda x: x.sim, reverse=True)
    
    return neighbors[:k]


# Get Prediction using Pearson
def calc_rating_pearson(user_id: int, movie_id: int, k: int, user_obj: UMR, mat: np.ndarray, train_avg: dict, movie_avg: dict, user_avg: float) -> int:
    sim_list = calc_pearson_sim(user_id, k, user_obj, mat, train_avg, user_avg)
    avg_movie = movie_avg[movie_id]

    num = 0.0
    denom = 0.0

    for neighbor in sim_list:
        train_user_id = neighbor.get_user_id()
        avg_train = train_avg[train_user_id]

        if mat[train_user_id - 1][movie_id - 1] > 0:
            weight = mat[train_user_id - 1][movie_id - 1] - avg_train
            num += weight * neighbor.get_sim()
            denom += math.fabs(neighbor.get_sim())

    if denom != 0:
        result = user_avg + num / denom
        
    elif avg_movie != 0:
        result = avg_movie
        
    else:
        result = user_avg

    result = int(round(result))
    result = min(max(result, 1), 5)  # Edge case: ensure result is between 1 and 5
    
    return result


item_user_movie = {}


# Calculate Item based with adjusted cosine similarity
def item_cos_sim(user_id: int, movie_id: int, k: int, user_obj: UMR, mat: np.ndarray, train_avg: dict) -> list:
    neighbors = []
    movies = user_obj.get_movies()

    for usr in range(len(movies)):
        num = 0.0
        denom_i = 0.0
        denom_j = 0.0
        common = 0
        rated_movie = movies[usr]

        for user in range(200):
            avg_train = train_avg[user + 1]
            if mat[user][rated_movie - 1] != 0 and mat[user][movie_id - 1] != 0:
                num += (mat[user][rated_movie - 1] - avg_train) * (mat[user][movie_id - 1] - avg_train)
                denom_i += mat[user][rated_movie - 1] ** 2
                denom_j += mat[user][movie_id - 1] ** 2
                common += 1

        if common > 1:
            denom_total = math.sqrt(denom_i) * math.sqrt(denom_j)
            sim = num / denom_total
            sim *= math.fabs(sim) ** 1.5 # case amplification
            neighbors.append(KNN(usr + 1, sim))
            #print(f"{sim}")
            item_user_movie[usr + 1] = movie_id

    neighbors.sort(key=lambda x: x.sim, reverse=True)
    
    return neighbors[:k]


# Get Item based prediction with adjusted cosine similarity
def calc_rating_item(user_id: int, movie_id: int, k: int, user_obj: UMR, mat: np.ndarray, movie_avg: dict, user_avg: float, train_avg: dict) -> int:
    sim_list = item_cos_sim(user_id, movie_id, k, user_obj, mat, train_avg)
    avg_movie = movie_avg[movie_id]
    num = 0.0
    denom = 0.0

    movies = user_obj.get_movies()
    ratings = user_obj.get_ratings()

    for i in range(0, len(movies)):
        rating = ratings[i]
        mv_id = movies[i]

        for j in range(0, len(sim_list)):
            neighbor = sim_list[j]
            sim = neighbor.get_sim()
            uid = neighbor.get_user_id()
            sim_mv_id = item_user_movie[uid]
            
            if mv_id == sim_mv_id:
                num += sim * rating
                denom += math.fabs(sim)

    if denom != 0.0:
        result = num / denom
        
    elif avg_movie != 0:
        result = avg_movie
        
    else:
        result = user_avg

    result = int(round(result))
    result = min(max(result, 1), 5)  # Ensure result is between 1 and 5
    return result


def main():
    k = 200 # Choose the top k similar users for prediction
    output_list = pred_movie_list(test_file)
    train_avg = avg_train_rating(train_mat)
    movie_avg = avg_movie_rating(train_mat)
    
    start_time = time.time()  # Start the timer

    try:
        with open(result_file, 'w') as out_file:
            for i in range(len(output_list.get_users())):
                user_id = output_list.get_users()[i]
                movie_id = output_list.get_movies()[i]
                user_obj = user_ratings_in_test(test_file, user_id)
                user_avg = avg_user_rating_test(test_file, user_id)

                rating_cos = calc_rating_cos(user_id, movie_id, k, user_obj, train_mat, train_avg, movie_avg)
                rating_pearson = calc_rating_pearson(user_id, movie_id, k, user_obj, train_mat, train_avg, movie_avg, user_avg)
                rating_item = calc_rating_item(user_id, movie_id, k, user_obj, train_mat, movie_avg, user_avg, train_avg)
                # My own combined method
                rating_combine = int(round(0.65 * rating_cos + 0.35 * rating_pearson))
                # Write results line by line
                line = f"{user_id} {movie_id} {rating_item}" 
                out_file.write(line + '\n')
                
    except FileNotFoundError:
        print("Result file could not be created.")
        
    end_time = time.time()  # End the timer
    elapsed_time = end_time - start_time
    print(f"Execution time: {elapsed_time} seconds")
    
    return "Done!"


main()

Execution time: 46.13840293884277 seconds


'Done!'