In [11]:
import pandas as pd

# Toy interaction log in long format: one (user, item, rating) triple per row.
data = {
    'userID': [1, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4],
    'itemID': ['A', 'B', 'C', 'D', 'A', 'C', 'E', 'B', 'C', 'D', 'C', 'D'],
    'rating': [5, 3, 0, 1, 4, 0, 2, 1, 0, 5, 5, 4],
}
df = pd.DataFrame(data)

# Reshape to a wide user x item matrix; (user, item) pairs that never occur
# in the log come out of the pivot as NaN and are filled with 0.
user_item_matrix = df.pivot(index='userID', columns='itemID', values='rating')
user_item_matrix = user_item_matrix.fillna(0)

# DataFrame.corr correlates columns, so transpose first to compare the
# user rows with each other.
pearson_corr = user_item_matrix.transpose().corr(method='pearson')

print("用户-物品矩阵行向量之间的皮尔逊相关系数:")
print(pearson_corr)

用户-物品矩阵行向量之间的皮尔逊相关系数:
userID         1         2         3         4
userID                                        
1       1.000000  0.593067 -0.148936 -0.565009
2       0.593067  1.000000 -0.464140 -0.606168
3      -0.148936 -0.464140  1.000000  0.426072
4      -0.565009 -0.606168  0.426072  1.000000


# 皮尔逊计算公式
皮尔逊相关系数的计算公式如下（其形式与余弦相似度相同，只是先将每个向量减去各自的均值）：

$
\mathrm{pearson}(u, v)=\frac{\sum_{i \in I}(r_{ui}-\bar{r}_{u})(r_{vi}-\bar{r}_{v})}{\sqrt{\sum_{i \in I}(r_{ui}-\bar{r}_{u})^2}\sqrt{\sum_{i \in I}(r_{vi}-\bar{r}_{v})^2}}
$

其中：
- $I$ 表示参与计算的物品集合；
- $r_{ui}$、$r_{vi}$ 分别表示用户 $u$ 和用户 $v$ 对物品 $i$ 是否有交互（或具体评分值）；
- $\bar{r}_{u}$、$\bar{r}_{v}$ 分别表示用户 $u$ 和用户 $v$ 在物品集合 $I$ 上的交互值（或评分）的平均值。

In [43]:
import numpy as np
import pandas as pd

def calculate_pearson_corr(row1, row2):
    """
    Compute the Pearson correlation coefficient between two row vectors.

    Each vector is centred on its own mean; the result is the sum of the
    element-wise products of the centred vectors divided by the product of
    their Euclidean norms.

    Parameters:
    row1 (pd.Series): first row vector
    row2 (pd.Series): second row vector

    Returns:
    float: the Pearson correlation coefficient, or NaN when either vector
        has zero variance (the coefficient is undefined in that case).
    """
    # Centre each vector once and reuse it for numerator and denominator.
    centered1 = row1 - np.mean(row1)
    centered2 = row2 - np.mean(row2)

    # Numerator: sum of products of the centred values.
    numerator = np.sum(centered1 * centered2)

    # Denominator: product of the norms of the centred vectors.
    denominator = np.sqrt(np.sum(centered1 ** 2)) * np.sqrt(np.sum(centered2 ** 2))

    # A constant vector makes the denominator 0. Return NaN explicitly rather
    # than relying on a 0/0 division that only yields NaN via a RuntimeWarning.
    if denominator == 0:
        return np.nan

    return numerator / denominator


# 单元测试
def test_calculate_pearson_corr():
    """
    Check calculate_pearson_corr against numpy.corrcoef on a tiny example.

    Prints both coefficients and raises AssertionError if they are not close.
    """
    # Two users who each rated a different single item.
    df = pd.DataFrame({
        'userID': [1, 3],
        'itemID': ['A', 'B'],
        'rating': [5, 2]
    })

    # Wide user-item matrix; missing (user, item) pairs are filled with 0.
    user_item_matrix = df.pivot(index='userID', columns='itemID', values='rating').fillna(0)
    row1, row2 = user_item_matrix.iloc[0], user_item_matrix.iloc[1]

    # Value produced by the function under test.
    pearson_corr = calculate_pearson_corr(row1, row2)
    print(pearson_corr)

    # Reference value: off-diagonal entry of NumPy's 2x2 correlation matrix.
    expected_corr = np.corrcoef(row1, row2)[0, 1]
    print(expected_corr)

    assert np.isclose(pearson_corr, expected_corr), f"Expected {expected_corr}, but got {pearson_corr}"

# Run the unit test; the message below is printed only if no assertion failed.
test_calculate_pearson_corr()
print("单元测试通过")

-0.9999999999999998
-0.9999999999999999
单元测试通过


In [45]:
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

# Two example vectors to compare.
v1 = np.array([5, 3, 4, 3, 1])
v3 = np.array([3, 1, 3, 3, 5])

# Stack them as the labelled rows of a DataFrame.
df = pd.DataFrame(np.vstack((v1, v3)), index=['v1', 'v3'])

# Pearson correlation between the rows (plain ndarray).
pearson = np.corrcoef(df)

# Pairwise cosine similarity between the rows, then relabelled with the row names.
cosine_sim = cosine_similarity(df)
cosine_sim_df = pd.DataFrame(cosine_sim, index=df.index, columns=df.index)

print("余弦相似度矩阵:")
print(cosine_sim_df)

print("pearson相似度")
print(pearson)

余弦相似度矩阵:
         v1       v3
v1  1.00000  0.78026
v3  0.78026  1.00000
pearson相似度
[[ 1.         -0.47673129]
 [-0.47673129  1.        ]]


In [None]:
import pandas as pd

# Small example frame: three labelled rows, three columns.
data = {'A': [1, 2, 3], 'B': [4, 5, 6], 'C': [7, 8, 9]}
df = pd.DataFrame(data, index=['row1', 'row2', 'row3'])

# Mean across the columns of the row labelled 'row1'.
row_mean = df.loc['row1', :].mean()
print(f"row1的均值: {row_mean}")

# 测试SVD运用到协同过滤

In [1]:
import pandas as pd
import numpy as np
import random
import math
from collections import defaultdict

# Load the dataset
def load_dataset(path="./datas/hetrec2011-lastfm-2k"):
    """Read the two tab-separated tables used by this notebook.

    Args:
        path: Directory containing ``user_artists.dat`` and ``artists.dat``.

    Returns:
        tuple: ``(user_artists, artists)`` as pandas DataFrames, in that order.
    """
    user_artists_file = f"{path}/user_artists.dat"
    artists_file = f"{path}/artists.dat"
    return (
        pd.read_csv(user_artists_file, sep='\t'),
        pd.read_csv(artists_file, sep='\t'),
    )

# Preprocess data for SVD
def preprocess_data_for_svd(user_artists):
    """Turn the raw user-artist table into (user, item, rating) rows.

    The ``weight`` column is divided by its maximum and multiplied by 5 to
    form ``rating``; ``userID`` and ``artistID`` are renamed to ``user`` and
    ``item``. The input frame is not modified.

    Args:
        user_artists: DataFrame with ``userID``, ``artistID`` and ``weight``
            columns.

    Returns:
        DataFrame with exactly the columns ``user``, ``item``, ``rating``.
    """
    peak_weight = user_artists['weight'].max()
    ratings = user_artists.assign(rating=user_artists['weight'] / peak_weight * 5)
    ratings = ratings.rename(columns={'userID': 'user', 'artistID': 'item'})
    return ratings[['user', 'item', 'rating']]

class BiasSVD:
    """Biased matrix-factorisation recommender trained with per-sample SGD.

    A rating is predicted as
    ``mu + bu[user] + bi[item] + dot(P[user], Q[item])``.

    Args:
        rating_data: DataFrame with ``user``, ``item`` and ``rating`` columns.
        F: Number of latent factors per user and per item.
        alpha: Initial learning rate used at the start of every ``train`` call.
        lmbda: L2 regularisation strength.
        max_iter: Number of passes over ``rating_data`` per ``train`` call.
    """

    def __init__(self, rating_data, F=10, alpha=0.01, lmbda=0.1, max_iter=50):
        self.F = F
        self.P = dict()  # User factors
        self.Q = dict()  # Item factors
        self.bu = dict()  # User biases
        self.bi = dict()  # Item biases
        self.mu = 0  # Global average
        self.alpha = alpha  # Learning rate
        self.lmbda = lmbda  # Regularization parameter
        self.max_iter = max_iter
        self.rating_data = rating_data

        # Factors start as small random values scaled by 1/sqrt(F); biases start at 0.
        scale = math.sqrt(self.F)
        for user in rating_data['user'].unique():
            self.P[user] = [random.random() / scale for _ in range(F)]
            self.bu[user] = 0
        for item in rating_data['item'].unique():
            self.Q[item] = [random.random() / scale for _ in range(F)]
            self.bi[item] = 0

    def train(self):
        """Run ``max_iter`` SGD passes over ``self.rating_data``.

        Parameters are updated in place and are not re-initialised, so calling
        this again continues from the current factors and biases. The learning
        rate is decayed by 0.9 every 10 passes on a local copy only:
        ``self.alpha`` keeps its configured value, so repeated calls all start
        from the same learning rate.
        """
        # Global mean of the ratings currently attached to the model.
        self.mu = self.rating_data['rating'].mean()

        print(f"Training SVD model with {self.F} factors for {self.max_iter} iterations...")
        alpha = self.alpha  # local copy so the decay below does not leak between calls
        for step in range(self.max_iter):
            # Shuffle data to improve convergence
            self.rating_data = self.rating_data.sample(frac=1).reset_index(drop=True)

            # itertuples keeps each column's own dtype; iterrows would upcast
            # integer ids to float in a frame that also holds float ratings.
            samples = self.rating_data[['user', 'item', 'rating']].itertuples(index=False, name=None)
            for user, item, rating in samples:
                # Prediction error for this sample, shared by all updates below.
                error = rating - self.predict(user, item)

                # Update biases
                self.bu[user] += alpha * (error - self.lmbda * self.bu[user])
                self.bi[item] += alpha * (error - self.lmbda * self.bi[item])

                # Update latent factors; both updates use the pre-update values.
                p_vec = self.P[user]
                q_vec = self.Q[item]
                for f in range(self.F):
                    p_old = p_vec[f]
                    q_old = q_vec[f]

                    p_vec[f] += alpha * (error * q_old - self.lmbda * p_old)
                    q_vec[f] += alpha * (error * p_old - self.lmbda * q_old)

            # Decay learning rate
            if (step + 1) % 10 == 0:
                alpha *= 0.9
                print(f"Iteration {step+1}/{self.max_iter} completed")

    def predict(self, user, item):
        """Return the predicted rating of ``item`` for ``user``.

        Falls back to the global mean ``mu`` when the user or the item has no
        latent factors in the model.
        """
        if user not in self.P or item not in self.Q:
            return self.mu

        user_bias = self.bu.get(user, 0)
        item_bias = self.bi.get(item, 0)

        # Dot product of user and item latent factors
        interaction = sum(self.P[user][f] * self.Q[item][f] for f in range(self.F))

        return self.mu + user_bias + item_bias + interaction

    def recommend_items(self, user_id, n_recommendations=10, exclude_items=None):
        """Return the top ``n_recommendations`` items for ``user_id``.

        Args:
            user_id: User to recommend for.
            n_recommendations: Maximum number of items to return.
            exclude_items: Optional iterable of items to leave out.

        Returns:
            list: ``(item, predicted_rating)`` tuples sorted by predicted
            rating, highest first; empty if the user is not in the model.
        """
        if exclude_items is None:
            exclude_items = set()
        else:
            exclude_items = set(exclude_items)

        if user_id not in self.P:
            return []  # User not in model

        # Score every item known to the model that is not excluded.
        recommendations = [
            (item, self.predict(user_id, item))
            for item in self.Q
            if item not in exclude_items
        ]

        # Sort by predicted rating
        recommendations.sort(key=lambda x: x[1], reverse=True)
        return recommendations[:n_recommendations]

# Evaluate model using hit rate
def evaluate_hr_k(svd_model, user_item_df, test_ratio=0.2, n_recommendations=10):
    """Estimate hit rate@k of ``svd_model`` with a random per-user hold-out.

    For every user with more than 5 positively rated items, a ``test_ratio``
    fraction of those items is held out. All held-out rows are removed from
    the training data, the model's ``rating_data`` is replaced by the reduced
    frame and ``svd_model.train()`` is called. At most 100 randomly chosen
    held-out (user, item) pairs are then scored: a pair is a hit when the item
    appears in the user's top ``n_recommendations`` (items the user has in the
    training data are excluded from the candidates).

    NOTE(review): this only calls ``svd_model.train()``. Whether that starts
    from fresh parameters depends on the model, so passing in an already
    trained model may leak the held-out interactions. Confirm with the caller.

    Args:
        svd_model: Object exposing ``rating_data``, ``train()`` and
            ``recommend_items(user, n, exclude_items=...)``.
        user_item_df: DataFrame with ``user``, ``item`` and ``rating`` columns.
        test_ratio: Fraction of each eligible user's rated items to hold out.
        n_recommendations: Size of the recommendation list (the k in HR@k).

    Returns:
        Hit rate in [0, 1], or 0 if no test pair could be formed.
    """
    # Create a user-item matrix
    user_item_matrix = user_item_df.pivot(index='user', columns='item', values='rating').fillna(0)

    # Choose the held-out (user, item, rating) triples.
    test_set = []
    for user in user_item_matrix.index:
        user_items = user_item_matrix.loc[user]
        rated_items = user_items[user_items > 0].index.tolist()

        if len(rated_items) > 5:  # Only test users with enough interactions
            test_items = np.random.choice(rated_items, size=int(len(rated_items) * test_ratio), replace=False)
            for item in test_items:
                test_set.append((user, item, user_item_matrix.loc[user, item]))

    # Drop every held-out pair from the training data in one pass, instead of
    # re-filtering the whole frame once per held-out item.
    held_out = {(user, item) for user, item, _ in test_set}
    keep = np.fromiter(
        (pair not in held_out for pair in zip(user_item_df['user'], user_item_df['item'])),
        dtype=bool,
        count=len(user_item_df),
    )
    train_data = user_item_df[keep]

    # Sample a smaller test set for efficiency
    test_set = random.sample(test_set, min(100, len(test_set)))

    # Train model on reduced dataset
    svd_model.rating_data = train_data
    svd_model.train()

    # Evaluate
    hit_count = 0
    train_items_by_user = {}  # each user's training items, looked up once
    for user, item, _ in test_set:
        if user not in train_items_by_user:
            train_items_by_user[user] = train_data.loc[train_data['user'] == user, 'item'].tolist()
        recommendations = svd_model.recommend_items(
            user, n_recommendations, exclude_items=train_items_by_user[user]
        )
        recommended_items = [rec_item for rec_item, _ in recommendations]

        if item in recommended_items:
            hit_count += 1

    hit_rate = hit_count / len(test_set) if test_set else 0
    return hit_rate

def main():
    """Train a BiasSVD model, print sample recommendations and report HR@10.

    Two separate models are built with the same hyper-parameters. The first is
    trained on all ratings and used for the example recommendations. The
    second is handed to ``evaluate_hr_k`` untrained, so the evaluation does not
    reuse a model that has already seen the interactions it holds out.
    """
    # Load and preprocess data
    user_artists, artists = load_dataset()

    # Prepare data for SVD
    rating_data = preprocess_data_for_svd(user_artists)

    # Hyper-parameters shared by the demo model and the evaluation model.
    model_params = dict(F=20, alpha=0.01, lmbda=0.1, max_iter=20)

    # Create and train SVD model
    svd_model = BiasSVD(rating_data, **model_params)
    svd_model.train()

    # Example: Get recommendations for a specific user
    user_id = rating_data['user'].iloc[0]  # First user in the dataset
    print(f"\nSVD recommendations for user {user_id}:")

    # Get items user has already rated
    user_rated_items = rating_data[rating_data['user'] == user_id]['item'].tolist()
    recommendations = svd_model.recommend_items(user_id, n_recommendations=10, exclude_items=user_rated_items)

    # Build the id -> name lookup once; on duplicate ids the first row wins.
    artist_names = artists.drop_duplicates(subset='id').set_index('id')['name'].to_dict()
    for item_id, score in recommendations:
        artist_name = artist_names.get(item_id, "Unknown")
        print(f"Artist ID: {item_id}, Score: {score:.2f}, Name: {artist_name}")

    # Evaluate with a fresh model: evaluate_hr_k trains it on the reduced data,
    # so the held-out interactions are never seen before scoring.
    eval_model = BiasSVD(rating_data, **model_params)
    hit_rate = evaluate_hr_k(eval_model, rating_data)
    print("\nEvaluation results:")
    print(f"SVD hit rate@10: {hit_rate:.4f}")

# Run the pipeline only when this code is executed directly, not when imported.
if __name__ == "__main__":
    main()

Training SVD model with 20 factors for 20 iterations...
Iteration 10/20 completed
Iteration 20/20 completed

SVD recommendations for user 2:
Artist ID: 792, Score: 0.18, Name: Thalía
Artist ID: 8388, Score: 0.13, Name: Viking Quest
Artist ID: 18121, Score: 0.12, Name: Rytmus
Artist ID: 8308, Score: 0.12, Name: Johnny Hallyday
Artist ID: 15075, Score: 0.12, Name: 80kidz
Artist ID: 14987, Score: 0.12, Name: RICHARD DIXON-COMPOSER
Artist ID: 10349, Score: 0.12, Name: Peter Thomas Sound Orchestra
Artist ID: 2044, Score: 0.11, Name: Sarah Brightman
Artist ID: 14986, Score: 0.11, Name: Dicky Dixon
Artist ID: 6696, Score: 0.11, Name: Mara Maravilha
Training SVD model with 20 factors for 20 iterations...
Iteration 10/20 completed
Iteration 20/20 completed

Evaluation results:
SVD hit rate@10: 0.0000


In [None]:
import os

def list_files(startpath, output_path='project_structure.txt'):
    """Write an indented listing of the directory tree under ``startpath``.

    Each directory is written as ``name/`` and each file as ``name``, indented
    by four spaces per nesting level below ``startpath``. Within a directory,
    files are listed before sub-directories, both in sorted order.

    Args:
        startpath: Root directory to walk. If it does not exist, nothing is
            walked and the output file is left empty.
        output_path: File to write the listing to (UTF-8). Defaults to
            ``project_structure.txt`` in the current working directory.
    """
    # UTF-8 so file names outside the platform's default encoding can be written.
    with open(output_path, 'w', encoding='utf-8') as f:
        for root, dirs, files in os.walk(startpath):
            dirs.sort()  # in-place sort also fixes the order os.walk descends in

            # Depth from the relative path rather than str.replace, which
            # miscounts when startpath ends with a separator or recurs in root.
            rel = os.path.relpath(root, startpath)
            level = 0 if rel == os.curdir else rel.count(os.sep) + 1

            indent = ' ' * 4 * level
            # normpath strips a trailing separator so the root name is not empty.
            f.write(f'{indent}{os.path.basename(os.path.normpath(root))}/\n')

            subindent = ' ' * 4 * (level + 1)
            for file in sorted(files):
                f.write(f'{subindent}{file}\n')

# Replace 'path_to_project' below with the actual project path.
list_files('path_to_project')