整体流程：
1. 读取 col_matrix.csv
2. 划分数据：
    - 用户相似度部分：所有用户对前2700列的评分
    - 验证集：随机1/10用户（不在测试区间），对[2700: ]列的评分
    - 测试集：col_matrix[4100:, 2700:]
3. 计算用户之间的相似度矩阵（带掩码 + 惩罚）
4. 在验证集/测试集上做评分预测：
    - 对每个目标位置 (i,j)，选出对 j 打过分的 top-k 相似用户，以相似度为权重取其评分的加权平均（无可用邻居时默认为 3）
5. 生成 test_prediction.csv

In [5]:
import numpy as np
import pandas as pd
from tqdm import tqdm

In [6]:
def penalized_masked_cosine_similarity(x, y, beta=1.0):
    """Cosine similarity over co-rated entries, shrunk by rating overlap.

    Zero entries are treated as "not rated". The cosine is computed only over
    the positions where both vectors are non-zero and is then multiplied by
    ``(n_shared / n_either) ** beta``, where ``n_shared`` counts positions
    that are non-zero in both vectors and ``n_either`` counts positions that
    are non-zero in at least one of them.

    Args:
        x: Array-like of ratings.
        y: Array-like of ratings, expected to have the same shape as ``x``.
        beta: Exponent applied to the overlap ratio.

    Returns:
        ``0.0`` when no position is non-zero in both vectors, otherwise the
        penalized cosine similarity.
    """
    first = np.array(x)
    second = np.array(y)
    first_rated = first != 0
    second_rated = second != 0
    both_rated = first_rated & second_rated

    n_shared = np.sum(both_rated)
    if n_shared == 0:
        return 0.0

    n_either = np.sum(first_rated | second_rated)

    # Restrict both vectors to the co-rated positions before taking the cosine.
    shared_first = first[both_rated]
    shared_second = second[both_rated]
    norm_product = np.linalg.norm(shared_first) * np.linalg.norm(shared_second)
    cosine = np.dot(shared_first, shared_second) / norm_product

    overlap_ratio = n_shared / n_either
    return cosine * overlap_ratio ** beta

In [15]:
# Hyper-parameters used by the cells below.
RANDOM_SEED = 42        # seed for the validation-user sampling
VALIDATION_RATIO = 0.1  # fraction of candidate users held out for validation
K = 40                  # number of most-similar raters used per prediction

In [8]:
# ---------------------------
# 1. Load the rating matrix
# ---------------------------
# Rows are users and columns are items; downstream code treats 0 as "no rating".
col_matrix = np.loadtxt("/home/csu/recommend/col_matrix.csv", delimiter=",")
n_users, n_items = col_matrix.shape

# ---------------------------
# 2. Choose validation users
# ---------------------------
# Fixed seed so the sampled validation users are reproducible.
np.random.seed(RANDOM_SEED)
all_users = np.arange(4100)  # users outside the test range (test rows start at 4100)
n_val_users = int(len(all_users) * VALIDATION_RATIO)
val_users = np.random.choice(all_users, n_val_users, replace=False)

# Boolean mask of the validation cells: validation users x columns from 2700 on.
val_mask = np.zeros(col_matrix.shape, dtype=bool)
val_mask[val_users, 2700:] = True

In [9]:
# ---------------------------
# 3. Pairwise user similarity (uses only columns [:2700])
# ---------------------------
print("计算用户相似度矩阵...")
user_vectors = col_matrix[:, :2700]
similarity_matrix = np.zeros((n_users, n_users))

# The measure is symmetric, so only the upper triangle (diagonal included)
# is computed and each value is mirrored into the lower triangle.
for i in tqdm(range(n_users)):
    row_i = user_vectors[i]
    for j in range(i, n_users):
        sim = penalized_masked_cosine_similarity(row_i, user_vectors[j])
        similarity_matrix[i, j] = similarity_matrix[j, i] = sim

计算用户相似度矩阵...


  0%|          | 0/6040 [00:00<?, ?it/s]

100%|██████████| 6040/6040 [13:13<00:00,  7.62it/s] 


In [16]:
# ---------------------------
# 4. Rating prediction (shared by the validation and test runs)
# ---------------------------
def predict_scores(target_users, target_items):
    """Predict ratings for every (user, item) pair with top-K user-based CF.

    For each pair, the candidate neighbours are the users holding a non-zero
    entry in ``col_matrix`` for that item. The ``K`` candidates most similar
    to the target user (according to ``similarity_matrix``) are kept, and the
    prediction is their similarity-weighted mean rating. Pairs with no
    candidate, or whose kept similarities sum to zero, get the neutral
    rating 3.

    Reads the module-level ``col_matrix``, ``similarity_matrix``,
    ``val_users`` and ``K``.

    Args:
        target_users: Sequence of user row indices to predict for.
        target_items: Sequence of item column indices to predict for.

    Returns:
        Float array of shape ``(len(target_users), len(target_items))`` whose
        entries are rounded with ``np.round`` (halves go to the nearest even
        integer) and then clipped to ``[1, 5]``.
    """
    prediction_matrix = np.zeros((len(target_users), len(target_items)))

    # Set membership is O(1); testing `i in val_users` on the array would
    # rescan every validation user for each (user, item) cell.
    validation_users = set(np.asarray(val_users).tolist())

    # The raters of an item do not depend on the target user, so look them
    # up once per item instead of once per (user, item) cell.
    raters_by_item = [np.where(col_matrix[:, j] != 0)[0] for j in target_items]

    for idx_i, i in enumerate(tqdm(target_users)):
        is_validation_user = i in validation_users
        user_sims = similarity_matrix[i]

        for idx_j, j in enumerate(target_items):
            rated_users = raters_by_item[idx_j]

            # A validation user must not see their own held-out rating.
            if is_validation_user and j >= 2700:
                rated_users = rated_users[rated_users != i]

            if len(rated_users) == 0:
                prediction_matrix[idx_i, idx_j] = 3  # nobody rated the item: neutral default
                continue

            # Keep the K raters most similar to the target user, best first.
            sims = user_sims[rated_users]
            top_k_idx = np.argsort(sims)[-K:][::-1]
            top_k_users = rated_users[top_k_idx]
            top_k_sims = sims[top_k_idx]
            top_k_ratings = col_matrix[top_k_users, j]

            weight_total = np.sum(top_k_sims)
            if weight_total == 0:
                # No usable similarity signal: fall back to the neutral default.
                prediction_matrix[idx_i, idx_j] = 3
            else:
                prediction_matrix[idx_i, idx_j] = np.dot(top_k_ratings, top_k_sims) / weight_total

    prediction_matrix = np.clip(np.round(prediction_matrix), 1, 5)
    return prediction_matrix


In [17]:
# ---------------------------
# 5. Predict the validation cells and report the MAE
# ---------------------------
print("预测验证集...")
val_users_sorted = sorted(val_users)
val_items = list(range(2700, n_items))
val_pred = predict_scores(val_users_sorted, val_items)
val_truth = col_matrix[np.ix_(val_users_sorted, val_items)]

# Score only the cells that actually hold a rating (0 means "not rated").
rated_cells = val_truth != 0
abs_errors = np.abs(val_truth[rated_cells] - val_pred[rated_cells])
mae = np.mean(abs_errors)
print(f"验证集 MAE: {mae:.4f}")

预测验证集...


100%|██████████| 410/410 [00:51<00:00,  8.00it/s]

验证集 MAE: 0.7215





In [18]:
# ---------------------------
# 6. Predict the test cells and write them to disk
# ---------------------------
print("预测测试集...")
test_items = list(range(2700, n_items))
test_users = list(range(4100, n_users))
test_pred = predict_scores(test_users, test_items)

# One CSV row per test user and one column per test item, integer-formatted.
np.savetxt("test_prediction.csv", test_pred, delimiter=",", fmt="%d")
print("预测完成，结果已保存为 test_prediction.csv")

预测测试集...


  0%|          | 0/1940 [00:00<?, ?it/s]

100%|██████████| 1940/1940 [03:56<00:00,  8.20it/s]


预测完成，结果已保存为 test_prediction.csv


In [19]:
import numpy as np
# Sanity-check the saved file: expected shape and no NaN or infinite entries.
predictions = np.loadtxt('test_prediction.csv', delimiter=',')
assert predictions.shape == (1940, 1252)
assert not np.any(np.isnan(predictions))
assert not np.any(np.isinf(predictions))
print("结果格式正确")

结果格式正确
