In [None]:
from pyspark.sql import SparkSession  
import pyspark.sql.functions as F
from pyspark.sql.window import Window  

import warnings
warnings.filterwarnings("ignore")

In [None]:
# Create (or reuse) the SparkSession for the preprocessing stage.
spark = (
    SparkSession.builder
    .appName("MovieLens Recommendation System")
    .getOrCreate()
)

In [None]:
path_to_users_data = "./data/ml-1m/users.dat"
path_to_movies_data = "./data/ml-1m/movies.dat"
path_to_ratings_data = "./data/ml-1m/ratings.dat"

users_column_names = ["user_id", "gender", "age", "occupation", "zip_code"]
movies_column_names = ["movie_id", "title", "genres"]
ratings_column_names = ["user_id", "movie_id", "rating", "timestamp"]


def _read_dat(path, column_names):
    """Read one '::'-delimited MovieLens .dat file (no header) and name its columns.

    :param path: path to the .dat file.
    :param column_names: column names to assign, in file order.
    :return: pyspark.sql.DataFrame with inferred column types.
    """
    # NOTE: the ml-1m .dat files are Latin-1 (ISO-8859-1) encoded, not UTF-8
    # (some movie titles contain accented characters). Spark's default UTF-8 decoding
    # turns those bytes into U+FFFD, which corrupts titles and can make distinct titles
    # collide; titles are later used as a join key. Verify if switching datasets.
    raw = spark.read.csv(path,
                         sep='::',
                         header=False,
                         inferSchema=True,
                         encoding='ISO-8859-1')
    return raw.toDF(*column_names)


user_df = _read_dat(path_to_users_data, users_column_names)
movie_df = _read_dat(path_to_movies_data, movies_column_names)
rating_df = _read_dat(path_to_ratings_data, ratings_column_names)

In [None]:
# Drop outliers from a numeric column using Tukey's IQR fences.
def remove_outliers(df, column, k=1.5, relative_error=0.05):
    """Return `df` without the rows whose `column` value lies outside the IQR fences.

    The fences are ``[Q1 - k*IQR, Q3 + k*IQR]`` (both ends inclusive). Q1 and Q3 are
    estimated with ``DataFrame.approxQuantile`` at the given relative error. Rows where
    `column` is null are dropped too, because the comparison evaluates to null.

    :param df: pyspark.sql.DataFrame to filter.
    :param column: name of the numeric column to test.
    :param k: fence multiplier (1.5 is Tukey's default).
    :param relative_error: relativeError passed to approxQuantile (0 = exact but slower).
    :return: the filtered DataFrame, or `df` unchanged if `column` has no non-null values.
    """
    quantiles = df.approxQuantile(column, [0.25, 0.75], relative_error)
    if len(quantiles) < 2:
        # approxQuantile returns [] when the column has no non-null values.
        # There is nothing to build fences from, so keep the data as-is.
        return df
    q1, q3 = quantiles
    iqr = q3 - q1
    lower_bound = q1 - k * iqr
    upper_bound = q3 + k * iqr
    # between() is inclusive on both ends, matching >= lower & <= upper.
    return df.filter(F.col(column).between(lower_bound, upper_bound))


# NOTE(review): `rating` is a bounded 1-5 star scale, not a continuous measurement.
# If the estimated quartiles come out as Q1=3, Q3=4 (plausible for MovieLens), the
# fences are [1.5, 5.5] and every 1-star rating is discarded as an "outlier".
# Confirm this is intended.
rating_df = remove_outliers(rating_df, 'rating')

In [None]:
# Attach title/genres to each rating. The left join keeps ratings of unknown movies.
# Then rebuild movie_df from the movies that actually have ratings.
rating_df = rating_df.join(movie_df, on='movie_id', how='left')
movie_df = rating_df.select('movie_id', 'title', 'genres').distinct()

In [None]:
# Bucket a mean rating on the 1-5 star scale into 8 half-star segments (wrapped as a UDF below).
def rating_segment(avg_rating):
    """Map a mean rating to a segment id in 1..8.

    Segments are half-open half-star bins: [1, 1.5) -> 1, [1.5, 2) -> 2, ...,
    [4, 4.5) -> 7. The last bin [4.5, 5] is closed, so 5 -> 8.
    Values outside [1, 5] (including NaN) map to None.
    """
    if not 1 <= avg_rating <= 5:
        # Out of range / invalid rating.
        return None
    if avg_rating == 5:
        # Closed upper edge of the last bin.
        return 8
    # Doubling is exact in binary floating point, so truncating 2 * avg_rating
    # reproduces the half-star boundaries exactly: [1, 1.5) -> 2 ... [4.5, 5) -> 9.
    return int(avg_rating * 2) - 1

# Wrap rating_segment as a column function for DataFrame expressions. F.udf does not
# register it in the session catalog (spark.udf.register would do that). With no explicit
# type PySpark defaults to StringType, so segment ids come back as the strings '1'..'8'.
# The type is spelled out here to make that visible.
segmented_rating_udf = F.udf(rating_segment, returnType="string")

# Per-user and per-movie mean ratings, bucketed into half-star segments by the UDF.
window_user = Window.partitionBy("user_id")
window_movie = Window.partitionBy("movie_id")

rating_df = (
    rating_df
    # Raw per-user mean; used unbucketed for the label below.
    .withColumn('user_avg_rating', F.avg('rating').over(window_user))
    # Label is 1 when the rating is at or above the user's own mean, else 0.
    .withColumn('label', F.when(F.col('rating') >= F.col('user_avg_rating'), 1).otherwise(0))
    # Replace the raw mean with its segment id.
    .withColumn('user_avg_rating', segmented_rating_udf(F.col('user_avg_rating')))
    .withColumn('movie_avg_rating', F.avg('rating').over(window_movie))
    .withColumn('movie_avg_rating', segmented_rating_udf(F.col('movie_avg_rating')))
)



In [None]:
# Ratings whose user average fell outside the 1-5 segment range (UDF returned None).
rating_df.where(F.col('user_avg_rating').isNull()).count()

In [None]:
# One row per rated movie, together with its segmented average rating.
movie_df = rating_df.select('movie_id', 'title', 'genres', 'movie_avg_rating').distinct()
# Global (un-partitioned) window: Spark moves every row into a single partition to
# number them, which is acceptable only for a small movie table.
windowSpec = Window.orderBy(F.col("movie_id").asc())
# Replace the original movie_id with a dense 1..N id assigned in movie_id order.
# The new id is the last column.
movie_df = movie_df.select('title', 'genres', 'movie_avg_rating',
                           F.row_number().over(windowSpec).alias('movie_id'))


In [None]:
# Attach each user's segmented average rating to the user table.
# The left join keeps users without ratings; their user_avg_rating is null.
# zip_code is not carried over.
u_df = rating_df.select('user_id', 'user_avg_rating').distinct()
user_df = (user_df.join(u_df, on='user_id', how='left')
                  .select('user_id', 'gender', 'age', 'occupation', 'user_avg_rating'))

In [None]:
# Re-attach the movie attributes, including the re-numbered movie_id, to every rating.
# movie_df no longer carries the original movie_id, so `title` is the only join key.
rating_df = rating_df.drop('movie_id', 'genres', 'movie_avg_rating')
# Guard: a duplicated title would fan out this inner join and silently duplicate
# ratings (and give one rating several movie ids).
_n_movie_rows = movie_df.count()
_n_movie_titles = movie_df.select('title').distinct().count()
assert _n_movie_rows == _n_movie_titles, (
    f"movie titles are not unique ({_n_movie_rows} rows vs {_n_movie_titles} titles); "
    "joining ratings on title would duplicate rows"
)
rating_df = rating_df.join(movie_df, 'title')
rating_df = rating_df.drop('title')
movie_df = movie_df.drop('title')

In [None]:
# Movies whose segmented average rating is missing.
null_num = movie_df.filter(F.col('movie_avg_rating').isNull()).count()
null_num

In [None]:
# Persist the cleaned tables for the feature-encoding stage.
for frame, out_path in ((user_df, './user_info_dcn.parquet'),
                        (movie_df, './movie_info_dcn.parquet'),
                        (rating_df, './rating_info_dcn.parquet')):
    frame.write.mode('overwrite').parquet(out_path)

In [None]:
# Stop the preprocessing SparkSession. Cells below that use Spark DataFrames
# need a live session again.
spark.stop()

重复值检测

In [None]:
def has_duplicates(df, cols=None):
    """Tell whether `df` contains at least two rows that agree on `cols`.

    :param df: pyspark.sql.DataFrame
        The DataFrame to inspect.
    :param cols: list of str, optional
        Columns that define a duplicate. None means all columns of `df`.
    :return: bool
        True if dropping duplicates on `cols` removes any row, otherwise False.
    """
    subset = df.columns if cols is None else cols
    deduplicated = df.dropDuplicates(subset)
    # Duplicates exist exactly when deduplication shrinks the row count.
    total_rows = df.count()
    unique_rows = deduplicated.count()
    return unique_rows != total_rows


# Check the ratings table for fully duplicated rows.
# (The previous call passed `df`, which is not defined anywhere in the notebook.)
print(has_duplicates(rating_df))
# To check for duplicates on specific columns only, e.g. one rating per (user, movie, time):
# print(has_duplicates(rating_df, ["user_id", "movie_id", "timestamp"]))

缺失值检测

In [None]:
def find_missing_values(df):
    """Print, for every column of `df`, whether it contains null values.

    All null counts are computed in a single aggregation over `df` (one Spark job)
    instead of one filter + count job per column. A DataFrame with no columns is
    handled without error.

    Parameters:
    df (DataFrame): The input DataFrame to check for missing values.

    Returns:
    dict: column name -> number of null values in that column.
    """
    per_column = []
    if df.columns:
        # F.count ignores nulls, so counting `when(isNull, 1)` counts exactly the null cells.
        counts_row = df.select([
            F.count(F.when(F.col(c).isNull(), 1)).alias(f"_null_count_{i}")
            for i, c in enumerate(df.columns)
        ]).first()
        per_column = list(zip(df.columns, counts_row))

    for column_name, count_missing in per_column:
        if count_missing > 0:
            print(f"Column '{column_name}' has missing values.")
        else:
            print(f"Column '{column_name}' does not have missing values.")
    print("All columns checked.")
    return dict(per_column)


# Null report for each of the three tables.
for frame in (user_df, movie_df, rating_df):
    find_missing_values(frame)

In [None]:
from pyspark.sql import SparkSession  
import pyspark.sql.functions as F
from pyspark.sql.window import Window  
from pyspark.ml.feature import VectorAssembler
import random


import warnings
warnings.filterwarnings("ignore")

In [None]:
# Create (or reuse) the SparkSession for the feature-encoding stage.
spark = (
    SparkSession.builder
    .appName("MovieLens Recommendation System")
    .getOrCreate()
)

In [None]:
# Read the tables written by the preprocessing stage. The writer cell uses the `_dcn`
# suffix; './user_info.parquet', './movie_info.parquet' and './rating_info.parquet'
# are not written anywhere in this notebook.
user_df = spark.read.parquet('./user_info_dcn.parquet')
movie_df = spark.read.parquet('./movie_info_dcn.parquet')
rating_df = spark.read.parquet('./rating_info_dcn.parquet')

In [None]:
# Row counts of the three tables, then how many distinct users / movies have ratings.
print(user_df.count(), movie_df.count(), rating_df.count())
n_rated_users = rating_df.select('user_id').distinct().count()
n_rated_movies = rating_df.select('movie_id').distinct().count()
print(n_rated_users, n_rated_movies)

In [None]:
# Number of movies whose segmented average rating is missing.
null_count = movie_df.where(F.col('movie_avg_rating').isNull()).count()
null_count

编码

In [None]:
def OneHotEncoder(df, col_name):
    """One-hot encode `col_name` into a single vector column named ``col_name + 'Vector'``.

    The categories are the distinct non-null values of `col_name`, sorted, so the
    vector layout is the same on every run. A row whose value is null gets an
    all-zero vector. The source column is dropped from the result.

    :param df: pyspark.sql.DataFrame containing `col_name`.
    :param col_name: name of the categorical column to encode.
    :return: `df` without `col_name`, plus the ``<col_name>Vector`` column.
    """
    # Nulls are excluded from the categories for two reasons: they cannot be sorted
    # together with other values (sort() would raise TypeError), and `col == None`
    # never matches any row anyway.
    categories = [value
                  for value in df.select(col_name).distinct().rdd.flatMap(lambda x: x).collect()
                  if value is not None]
    categories.sort()
    # The temporary indicator columns get private, index-based names. This way a
    # category value can never coincide with, overwrite and then drop an existing column.
    indicator_cols = [f"__{col_name}_onehot_{i}" for i in range(len(categories))]
    for indicator, value in zip(indicator_cols, categories):
        df = df.withColumn(indicator, F.when(F.col(col_name) == value, 1).otherwise(0))
    # Merge the binary indicator columns into one vector column.
    assembler = VectorAssembler(inputCols=indicator_cols,
                                outputCol=col_name + 'Vector')
    df = assembler.transform(df)
    # Remove the intermediate indicators and the source column.
    return df.drop(*indicator_cols, col_name)

def MultiHotEncoder(df, col_name, sep_pattern="\\|"):
    """Multi-hot encode a delimited string column into ``col_name + 'Vector'``.

    Example input: genres "Action|Comedy". Each distinct non-empty token becomes one
    vector position, and the tokens are sorted so the layout is reproducible across
    runs. A position is 1 when the row's token list contains that exact token. Null
    values give an all-zero vector. The source column is dropped.

    :param df: pyspark.sql.DataFrame containing `col_name`.
    :param col_name: name of the delimited string column.
    :param sep_pattern: regex passed to F.split; the default matches a literal '|'.
    :return: `df` without `col_name`, plus the ``<col_name>Vector`` column.
    """
    tokens = F.split(F.col(col_name), sep_pattern)
    # The vocabulary is sorted: distinct().collect() order depends on partitioning,
    # which would otherwise shuffle vector positions between runs.
    vocabulary = sorted({
        token
        for token in df.select(F.explode(tokens)).distinct().rdd.flatMap(lambda x: x).collect()
        if isinstance(token, str) and token != ""
    })
    # Private temp names avoid clobbering existing columns.
    indicator_cols = [f"__{col_name}_multihot_{i}" for i in range(len(vocabulary))]
    for indicator, token in zip(indicator_cols, vocabulary):
        # Exact token membership. A substring test (.contains) would also fire for any
        # label that merely contains another label as a substring.
        df = df.withColumn(indicator, F.when(F.array_contains(tokens, token), 1).otherwise(0))
    # Merge the binary indicator columns into one vector column.
    assembler = VectorAssembler(inputCols=indicator_cols,
                                outputCol=col_name + 'Vector')
    df = assembler.transform(df)
    # Remove the intermediate indicators and the source column.
    return df.drop(*indicator_cols, col_name)
    

def user_data_processing(df):
    """Encode the user table as model features.

    gender becomes 0/1. occupation, age and user_avg_rating are each replaced by a
    one-hot ``<name>Vector`` column via OneHotEncoder.
    """
    # "F" -> 0; every other value (including null) -> 1.
    encoded = df.withColumn("gender", F.when(F.col("gender") == "F", 0).otherwise(1))
    for categorical in ('occupation', 'age', 'user_avg_rating'):
        encoded = OneHotEncoder(encoded, categorical)
    return encoded

def movie_data_processing(df):
    """Encode the movie table as model features.

    The '|'-separated genres string becomes a multi-hot ``genresVector``, and
    movie_avg_rating becomes a one-hot ``movie_avg_ratingVector``. Both source
    columns are dropped; any other column (e.g. title, if present) is left untouched.
    """
    with_genres = MultiHotEncoder(df, 'genres')
    return OneHotEncoder(with_genres, 'movie_avg_rating')

# Apply the feature encoders to the user and movie tables loaded above.
user_df, movie_df = user_data_processing(user_df), movie_data_processing(movie_df)


In [None]:
# Persist the encoded feature tables.
for frame, out_path in ((user_df, './user_df.parquet'), (movie_df, './movie_df.parquet')):
    frame.write.mode('overwrite').parquet(out_path)

In [None]:
# Number of ratings before the raw columns are swapped for the encoded features.
rating_df.count()

In [None]:
# Replace the raw per-rating attributes with the encoded user and movie features.
# Both are inner joins, so ratings without a matching user/movie row are dropped.
rating_df = (
    rating_df
    .drop('user_avg_rating', 'movie_avg_rating', 'genres')
    .join(user_df, 'user_id')
    .join(movie_df, 'movie_id')
)

In [None]:
# Persist the rating table with all encoded features joined in.
rating_df.write.parquet('./rating_df_dcn.parquet', mode='overwrite')

In [None]:
# Rows with a missing movie_avg_ratingVector after the feature joins.
null_num = rating_df.where(F.col('movie_avg_ratingVector').isNull()).count()
null_num

In [None]:
# Stop the feature-encoding SparkSession. The Spark DataFrames defined above
# cannot be evaluated after this.
spark.stop()

age 与 ageVector（以及其他编码列）的对应关系

In [None]:
# Distinct (user_id, ageVector) pairs.
mapage = rating_df.select('user_id', 'ageVector').distinct()

In [None]:
# Show which encoded vector each raw category value maps to.
# NOTE(review): as built by the cells above, rating_df no longer has these raw columns.
# OneHotEncoder drops its source column, and user_avg_rating / movie_avg_rating are
# dropped from rating_df before the feature joins. These selects therefore need a frame
# that still carries both the raw and the encoded columns (TODO: e.g. join the
# pre-encoding user/movie tables on their ids). spark.stop() is also called earlier.
# .show() only prints and returns None, so keep the mapping DataFrames and show them.
map_age2vector = rating_df.select(['age', 'ageVector']).distinct()
map_occupation2vector = rating_df.select(['occupation', 'occupationVector']).distinct()
map_user_avg_rating2vector = rating_df.select(['user_avg_rating', 'user_avg_ratingVector']).distinct()
map_movie_avg_rating2vector = rating_df.select(['movie_avg_rating', 'movie_avg_ratingVector']).distinct()
for mapping in (map_age2vector, map_occupation2vector,
                map_user_avg_rating2vector, map_movie_avg_rating2vector):
    mapping.show()

模拟电影冷启动数据

In [None]:
# def select_five_movies(data_df, num_movies=5):
#     # 获取唯一的movie_id列表
#     unique_movie_ids = data_df.select("movie_id").distinct().collect()
#     # 将DataFrame的行转换为Python列表
#     unique_movie_ids_list = [row["movie_id"] for row in unique_movie_ids]
#     # 随机选择num_movies个不同的电影ID
#     sampled_movie_ids = random.sample(unique_movie_ids_list, num_movies)

#     # 对每个选中的电影ID进行分组并计数
#     movie_counts = data_df.groupBy("movie_id").count()
#     # 过滤出选中的电影ID的计数
#     sampled_movie_counts = movie_counts.filter(
#         F.col("movie_id").isin(sampled_movie_ids)).collect()

#     # 打印每个选中的电影的评分记录总数
#     for row in sampled_movie_counts:
#         print(f"Movie ID {row['movie_id']} has {row['count']} ratings.")

#     # 返回选定的movie_ids
#     return sampled_movie_ids


# # 调用函数并打印结果
# sampled_movie_ids = select_five_movies(rating_df)
# print("Selected movie IDs:", sampled_movie_ids)

# # 根据随机选择的movie_id创建备份DataFrame
# cold_movies_df = rating_df.filter(F.col("movie_id").isin(sampled_movie_ids))

# # 可以选择将备份DataFrame写入磁盘或进行其他处理
# cold_movies_df.write.mode('overwrite').parquet(
#     './movies_cold_data.parquet')

# # 现在你可以使用sampled_movie_ids来从data_df中移除对应的评分记录
# filtered_data_df = rating_df.filter(~F.col("movie_id").isin(sampled_movie_ids))

模拟用户冷启动数据

In [None]:
# def select_five_users(data_df, num_users=5):
#     # 获取唯一的movie_id列表
#     unique_user_ids = data_df.select("user_id").distinct().collect()
#     # 将DataFrame的行转换为Python列表
#     unique_user_ids_list = [row["user_id"] for row in unique_user_ids]
#     # 随机选择num_movies个不同的电影ID
#     sampled_user_ids = random.sample(unique_user_ids_list, num_users)

#     # 对每个选中的电影ID进行分组并计数
#     user_counts = data_df.groupBy("user_id").count()
#     # 过滤出选中的电影ID的计数
#     sampled_user_counts = user_counts.filter(
#         F.col("user_id").isin(sampled_user_ids)).collect()

#     # 打印每个选中的电影的评分记录总数
#     for row in sampled_user_counts:
#         print(f"User ID {row['user_id']} has {row['count']} ratings.")

#     # 返回选定的movie_ids
#     return sampled_user_ids


# # 调用函数并打印结果
# sampled_user_ids = select_five_users(filtered_data_df)
# print("Selected user IDs:", sampled_user_ids)

# # 根据随机选择的movie_id创建备份DataFrame
# cold_users_df = filtered_data_df.filter(F.col("user_id").isin(sampled_user_ids))

# # 可以选择将备份DataFrame写入磁盘或进行其他处理
# cold_users_df.write.mode('overwrite').parquet(
#     './users_cold_data.parquet')

# # 现在你可以使用sampled_movie_ids来从data_df中移除对应的评分记录
# filtered_data_df = filtered_data_df.filter(~F.col("user_id").isin(sampled_user_ids))
# # 现在你可以使用sampled_user_ids来从ratings_df中移除评分记录
# filtered_data_df = filtered_data_df.filter(
#     ~filtered_data_df["user_id"].isin(sampled_user_ids))
# filtered_data_df.write.mode('overwrite').parquet('./filtered_data.parquet')

划分数据集

生成负样本用于模型训练

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window


# 初始化 SparkSession
spark = (SparkSession.builder
         .appName("NegativeSampling")
         .getOrCreate())


In [None]:
user_df = spark.read.parquet("./user_df.parquet")
movie_df = spark.read.parquet("./movie_df.parquet")
# Full rating table with the encoded user/movie features. This is the file the
# feature-join cell writes; './rating_df.parquet' is not written by this notebook.
rating_df = spark.read.parquet("./rating_df_dcn.parquet")

In [None]:
# Table sizes, then the number of distinct users / movies present in the ratings.
table_sizes = (user_df.count(), movie_df.count(), rating_df.count())
print(*table_sizes)
print(*(rating_df.select(key).distinct().count() for key in ('user_id', 'movie_id')))

In [None]:
# Preview a few rows of the joined feature table.
rating_df.show(n=5)

In [None]:
# Build labelled samples (positives + sampled negatives) for one split.
def generate_negative_samples(rating_df, df, data_user_positive_movies, sampling_ratio=2, seed=42):
    """Return the positives of `df` (label 1) plus weighted-sampled negatives (label 0).

    Negatives for a user are drawn without replacement from the movies in `rating_df`
    that are not paired with that user in `data_user_positive_movies`. A candidate's
    selection probability grows with ``click_count ** 0.75``, where click_count is its
    number of rows in `rating_df`. Each user gets at most ``sampling_ratio`` negatives
    per positive in `df`; users with no positives in `df` get none.

    Parameters:
    rating_df: full rating table. Provides the movie universe, the click counts and
        the user/movie feature columns joined onto the result.
    df: the split whose (user_id, movie_id) pairs become positives.
    data_user_positive_movies: (user_id, movie_id) pairs never drawn as negatives.
    sampling_ratio: negatives per positive, per user (default 2).
    seed: seed for F.rand used by the sampler.

    Returns:
    DataFrame with user_id, movie_id and label, plus gender, ageVector,
    occupationVector, user_avg_ratingVector, genresVector and movie_avg_ratingVector.
    """
    # Feature tables, re-attached to the samples at the end.
    user_df = rating_df.select("user_id", "gender", "ageVector", "occupationVector", 'user_avg_ratingVector').distinct()
    movie_df = rating_df.select("movie_id", "genresVector", 'movie_avg_ratingVector').distinct()

    # Candidate pairs: every user in rating_df x every movie in rating_df.
    all_users = rating_df.select("user_id").distinct()
    all_movies = rating_df.select("movie_id").distinct()
    user_all_movies = all_users.crossJoin(all_movies)
    # Positives of this split.
    user_positive_movies = df.select("user_id", "movie_id").distinct()

    # Sampling weight per movie: click count ** 0.75 over the whole rating_df.
    movie_click_freq = (
        rating_df.groupBy("movie_id").count()
        .withColumnRenamed("count", "click_count")
        .select('movie_id', 'click_count')
        .withColumn("sampling_weight", F.pow(F.col("click_count"), 0.75))
    )

    # Positives per user in this split; this sets each user's negative budget.
    user_positive_count = df.groupBy("user_id").count().withColumnRenamed("count", "positive_count")

    # Remove every excluded (positive) pair from the candidates.
    user_negative_movies = user_all_movies.join(data_user_positive_movies, on=["user_id", "movie_id"], how="left_anti")
    user_negative_movies = user_negative_movies.join(movie_click_freq, "movie_id", "left")

    # Weighted sampling without replacement (Efraimidis-Spirakis): give each candidate
    # the key u ** (1 / w) with u ~ U[0, 1), then keep the largest keys per user.
    # The previous orderBy(rand, weight) let the (practically unique) random value
    # decide every comparison, so the weights never influenced the sample.
    # NOTE(review): F.rand is non-deterministic in general (it depends on row order
    # within partitions). Re-evaluating the returned lazy DataFrame may therefore draw
    # a different sample; persist or write it once if the exact sample matters.
    sampling_key = F.pow(F.rand(seed), F.lit(1.0) / F.col("sampling_weight"))
    window_spec = Window.partitionBy("user_id").orderBy(sampling_key.desc())
    user_negative_movies = user_negative_movies.withColumn("row_num", F.row_number().over(window_spec))

    # Keep at most sampling_ratio negatives per positive (default 2:1). Users without
    # positives in `df` get a null budget, so the filter drops all their candidates.
    user_negative_movies = user_negative_movies.join(user_positive_count, "user_id", 'left')
    user_negative_movies = user_negative_movies.filter(F.col("row_num") <= sampling_ratio * F.col("positive_count"))

    # Positives are labelled 1, negatives 0.
    user_positive_movies = user_positive_movies.withColumn("label", F.lit(1))
    user_negative_movies = user_negative_movies.withColumn("label", F.lit(0)).select("user_id", "movie_id", "label")

    # union() is positional; both sides are (user_id, movie_id, label).
    user_movie = user_positive_movies.union(user_negative_movies)
    user_movie = user_movie.join(user_df, "user_id", "left")
    user_movie = user_movie.join(movie_df, "movie_id", "left")
    return user_movie

# Recency split per user: rank each user's ratings newest-first.
window_spec = Window.partitionBy("user_id").orderBy(F.col("timestamp").desc())
data_spark_df = rating_df.withColumn("row_num", F.row_number().over(window_spec))

recency = F.col("row_num")
test_df = data_spark_df.filter(recency <= 5)            # 5 most recent ratings
val_df = data_spark_df.filter(recency.between(6, 10))   # ratings 6-10 back
train_df = data_spark_df.filter(recency > 10)           # everything older

# Pairs excluded from negative sampling grow cumulatively:
# train excludes train, val excludes train+val, test excludes train+val+test.
train_positive_movies = train_df.select('user_id', 'movie_id').distinct()
val_positive_movies = train_positive_movies.union(val_df.select('user_id', 'movie_id').distinct())
test_positive_movies = val_positive_movies.union(test_df.select('user_id', 'movie_id').distinct())

# Labelled sample sets (positives + sampled negatives) for each split.
train_samples = generate_negative_samples(rating_df, train_df, train_positive_movies)
val_samples = generate_negative_samples(rating_df, val_df, val_positive_movies)
test_samples = generate_negative_samples(rating_df, test_df, test_positive_movies)


In [None]:
# Sizes of the three labelled sample sets.
sample_counts = [frame.count() for frame in (train_samples, val_samples, test_samples)]
print(*sample_counts)

In [None]:
# Persist the labelled sample sets.
for frame, out_path in ((train_samples, './train_df.parquet'),
                        (val_samples, './val_df.parquet'),
                        (test_samples, './test_df.parquet')):
    frame.write.mode('overwrite').parquet(out_path)

In [None]:
# Coverage of the splits: distinct users/movies in train and val, val size, table sizes.
split_stats = (
    train_df.select('user_id').distinct().count(),
    train_df.select('movie_id').distinct().count(),
    val_df.select('user_id').distinct().count(),
    val_df.select('movie_id').distinct().count(),
    val_df.count(),
    user_df.count(),
    movie_df.count(),
)
print(*split_stats)

In [None]:
# Sizes of the three labelled sample sets (re-check).
print(*(samples.count() for samples in (train_samples, val_samples, test_samples)))

In [None]:
# Preview each labelled sample set.
for samples in (train_samples, val_samples, test_samples):
    samples.show(5)

验证集中的电影数更少，是因为验证集只取每个用户按 timestamp 倒序排列后的第 6–10 条评分，覆盖的电影比较有限。

In [None]:
# Stop the negative-sampling SparkSession. The export stage below creates a new one.
spark.stop()

In [None]:
from pyspark.sql import SparkSession
# 初始化 SparkSession
# SparkSession for exporting the two-tower training data.
# NOTE(review): spark.driver.memory only takes effect if the driver JVM has not been
# started yet. After an earlier session in this kernel it is likely ignored; set it
# in spark-defaults or via spark-submit instead.
spark = (SparkSession.builder
         .appName("TowTowerModel")
         .config("spark.driver.memory", "8g")
         .getOrCreate())

# Labelled sample sets and encoded feature tables.
train_df = spark.read.parquet("./train_df.parquet")
val_df = spark.read.parquet("./val_df.parquet")
user_df = spark.read.parquet("./user_df.parquet")
movie_df = spark.read.parquet("./movie_df.parquet")

In [None]:
import torch
import os

def _as_tensor(value):
    """Convert a scalar, a sequence or a Spark ML vector into a torch tensor."""
    # pyspark.ml.linalg DenseVector and SparseVector both provide toArray().
    # VectorAssembler may emit either kind, and a SparseVector is not array-like.
    if hasattr(value, "toArray"):
        value = value.toArray()
    return torch.tensor(value)


def _row_to_tensors(row):
    """Build the (user_features, movie_features, label) tensors for one sample row.

    The feature tensors have shape (1, D) and the label tensor has shape (1,).
    """
    user_id = _as_tensor(row['user_id']).unsqueeze(0).unsqueeze(1)
    gender = _as_tensor(row['gender']).unsqueeze(0).unsqueeze(1)
    age_vector = _as_tensor(row['ageVector']).unsqueeze(0)
    occupation_vector = _as_tensor(row['occupationVector']).unsqueeze(0)
    user_avg_rating_vector = _as_tensor(row['user_avg_ratingVector']).unsqueeze(0)
    movie_id = _as_tensor(row['movie_id']).unsqueeze(0).unsqueeze(1)
    genres_vector = _as_tensor(row['genresVector']).unsqueeze(0)
    movie_avg_rating_vector = _as_tensor(row['movie_avg_ratingVector']).unsqueeze(0)
    label = _as_tensor(row['label']).unsqueeze(0)

    user_features = torch.cat([user_id, gender, age_vector, occupation_vector, user_avg_rating_vector], dim=1)
    movie_features = torch.cat([movie_id, genres_vector, movie_avg_rating_vector], dim=1)
    return user_features, movie_features, label


def _save_batch(rows, batch_number, num_batches, output_dir, base_filename):
    """Stack the tensors for `rows` and write them to one numbered .pt file."""
    user_features, movie_features, labels = zip(*(_row_to_tensors(r) for r in rows))
    filename = f"{base_filename}_batch_{batch_number}.pt"
    filepath = os.path.join(output_dir, filename)
    torch.save((torch.stack(user_features), torch.stack(movie_features), torch.stack(labels)), filepath)
    print(f"Saved batch {batch_number}/{num_batches} to {filepath}")


def save_large_data_to_multiple_files(rdd, batch_size, output_dir, base_filename):
    """Stream an RDD of sample rows to disk as numbered ``.pt`` batch files.

    Each row must expose user_id, gender, ageVector, occupationVector,
    user_avg_ratingVector, movie_id, genresVector, movie_avg_ratingVector and label.
    Batch ``i`` (1-based) holds rows ``[(i-1)*batch_size, i*batch_size)`` in RDD order.
    It is saved to ``<output_dir>/<base_filename>_batch_<i>.pt`` as the tuple
    ``(user_features, movie_features, labels)`` with shapes ``(B, 1, D_user)``,
    ``(B, 1, D_movie)`` and ``(B, 1)``.

    Raises ValueError if batch_size is not positive.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    num_rows = rdd.count()
    num_batches = (num_rows + batch_size - 1) // batch_size
    os.makedirs(output_dir, exist_ok=True)

    # toLocalIterator streams partitions to the driver in RDD order, so every row is
    # read once. The old per-batch filter rescanned the whole (cached, never unpersisted)
    # RDD for each of the num_batches batches.
    batch_rows = []
    batch_number = 0
    for row in rdd.toLocalIterator():
        batch_rows.append(row)
        if len(batch_rows) == batch_size:
            batch_number += 1
            _save_batch(batch_rows, batch_number, num_batches, output_dir, base_filename)
            batch_rows = []
    if batch_rows:
        batch_number += 1
        _save_batch(batch_rows, batch_number, num_batches, output_dir, base_filename)

# Output location and rows per .pt file.
output_dir = './new_two_tower_data'
batch_size = 100000

# Row RDDs of the labelled sample sets.
train_rdd = train_df.rdd
val_rdd = val_df.rdd

# Write the train split, then the validation split, in batches.
for base_filename, split_rdd in (('train_data', train_rdd), ('val_data', val_rdd)):
    save_large_data_to_multiple_files(split_rdd, batch_size, output_dir, base_filename)
