In [1]:
import sys
import time
import csv
import math
import numpy as np
import pyspark
from pyspark import SparkContext, SparkConf
import json
from operator import add
import pandas as pd
import xgboost as xgb
from sklearn import preprocessing
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import mean_squared_error
from node2vec import Node2Vec as n2v
import networkx as nx
from hashlib import md5
import pickle
import os

pd.set_option('display.max_rows', None)

folder_path = "../data/"
test_filepath = "../yelp_true.csv"
output_filepath = ""

from utils import create_category_md5_mapping, integrate_mapping_user_bus_cat_data, dataframe_to_rdd_dict, analyze_top_business_categories, analyze_top_categories

In [2]:
def initialize_spark_context(APP_NAME="Train: XGBModel"):
    # Spark配置项列表
    SPARK_CONF = [
        ("spark.dynamicAllocation.enabled", "true"),  # 启用动态资源分配
        ("spark.dynamicAllocation.maxExecutors", "10"),  # 最大执行器数量
        ("spark.executor.memory", "3g"),  # 每个执行器的内存
        ("spark.executor.cores", "6"),  # 每个执行器的CPU核心数
        ("spark.executor.memoryOverhead", "3000"),  # 执行器内存开销
        ("spark.driver.memory", "4g"),  # 驱动程序的内存
        ("spark.driver.maxResultSize", "2g"),  # 驱动程序的最大结果大小
        ("spark.python.worker.memory", "2g"),  # Python工作进程的内存
        ("spark.sql.shuffle.partitions", "25"),  # Shuffle操作的分区数
        ("spark.sql.sources.partitionOverWriteMode", "dynamic"),  # 分区覆写模式
        ("spark.network.timeout", "600s"),  # 网络超时设置
        ("spark.executor.heartbeatInterval", "120s"),  # 执行器心跳间隔
    ]

    # 创建Spark配置
    spark_conf = pyspark.SparkConf()
    spark_conf.setAppName(APP_NAME)
    spark_conf.setAll(SPARK_CONF)

    # 创建SparkContext
    sc = pyspark.SparkContext(conf=spark_conf)
    sc.setLogLevel("ERROR")  # 设置日志级别

    return sc

sc = initialize_spark_context()

24/04/19 13:02:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 32968)
Traceback (most recent call last):
  File "/usr/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.8/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.8/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/local/lib/p

# Initialize

In [3]:
# test
from better_features import FeatureProcessor, read_json_data, transform_user_data, transform_business_data,extract_review_data,extract_business_data
from KMeans_user_cluster import KMeans_process_user_clusters

# folder_path = '../data/'
# user_parsed_df = pd.read_csv('../well-trained/cache/user_df.csv')
# business_parsed_df = pd.read_csv('../well-trained/cache/business_df.csv')
# review_data = read_json_data(folder_path + '/review_train.json', extract_review_data, sc).collect()
# feature_processor = FeatureProcessor(user_parsed_df, business_parsed_df, review_data)

# from utils import integrate_mapping_user_bus_cat_data
# # category: category md5 mapping
# category_md5_df = create_category_md5_mapping(feature_processor.business_df)

# # historical reviews 
# final_mapped_review_df = integrate_mapping_user_bus_cat_data(feature_processor.df_conn, category_md5_df, review_data)

                                                                                

# Begin

### Process a User-Biz Matrix using tip.json and train ALS

In [3]:
from pyspark import SparkContext, SparkConf
from collections import defaultdict

# 读取数据
# testing
rdd = sc.textFile('../data/tip.json')

# 解析JSON数据，每一行是一个字符串化的JSON对象
def parse_json(line):
    try:
        data = json.loads(line)
        return ((data['user_id'], data['business_id']), 1)
    except json.JSONDecodeError:
        return None

In [4]:
# 将数据解析成((user_id, business_id), 1)的元组
interaction_counts = rdd.map(parse_json).filter(lambda x: x is not None)

# 对所有交互进行计数, 后续使用；
user_business_interaction_counts = interaction_counts.reduceByKey(lambda a, b: a + b)

# 现在我们有了((user_id, business_id), count)的RDD
# 转换为(user_id, (business_id, count))
user_business_pairs = user_business_interaction_counts.map(lambda x: (x[0][0], (x[0][1], x[1])))

# 为了构建交互矩阵，我们将对每个用户的所有交互进行分组
user_grouped_interactions = user_business_pairs.groupByKey()

# 最终，我们将为每个用户生成一个包含所有商户ID和交互次数的列表
user_interaction_lists = user_grouped_interactions.mapValues(list)

In [5]:
from pyspark.mllib.recommendation import ALS
from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import MatrixFactorizationModel

# id转换为数值, id: string->int
user_ids = user_business_interaction_counts.keys().map(lambda x: x[0]).distinct().zipWithUniqueId().collectAsMap()
business_ids = user_business_interaction_counts.keys().map(lambda x: x[1]).distinct().zipWithUniqueId().collectAsMap()
user_ids_bcast = sc.broadcast(user_ids)
business_ids_bcast = sc.broadcast(business_ids)

def to_rating(line):
    user_id, business_id = line[0]
    count = line[1]
    return (user_ids_bcast.value[user_id], business_ids_bcast.value[business_id], count)

ratings = user_business_interaction_counts.map(to_rating)

                                                                                

In [6]:
# # if necessary (optional)
# num_partitions = 200  # 例如，根据您的集群大小可以调整此值
# partitioned_interactions = user_grouped_interactions.partitionBy(num_partitions)

In [None]:
# ratings.take(20)

In [7]:
# user_interaction_lists.take(1)

                                                                                

[('ulQ8Nyj7jCUR8M83SUMoRQ',
  [('KNpcPGqDORDdvtekXd348w', 3),
   ('Tl_jT2a0bRMac5_YW65GPg', 2),
   ('c5NHHW0sNm7eaQBwvQJkkw', 1),
   ('ESzO3Av0b1_TzKOiqzbQYQ', 1),
   ('dAa0hB2yrnHzVmsCkN4YvQ', 1),
   ('k7WRPbDd7rztjHcGGkEjlw', 2),
   ('zJwm3DThV4WUIkyNv68Ahw', 1),
   ('ofXtrtsaKzPpVXFa8fHgiw', 1),
   ('SqW3igh1_Png336VIb5DUA', 1),
   ('8qNOI6Q1-rJrvWWD5Btz6w', 2),
   ('A73Dp0lo9s_Ci9NFDqbhqw', 1),
   ('FCUVjQf762no86Uzcbv1Tg', 1),
   ('--ujyvoQlwVoBgMYtADiLA', 1)])]

In [None]:
# ALS fine-tuned
rank=10
RegParam=0.5
alpha=1520.0

model = ALS.trainImplicit(ratings, rank, alpha=alpha, lambda_=reg_param)

### use trained ALS to decomposed U, V vectors, but we need to fine-tune params first

In [None]:
# use trained ALS to decomposed U, V vectors
user_features_rdd = model.userFeatures()
product_features_rdd = model.productFeatures()
# .collectAsMap()

In [None]:
## matrix size=120000 , not that big, super fast
start_time = time.time()

user_features = user_features_rdd.collectAsMap()
product_features = product_features_rdd.collectAsMap()

print(time.time() - start_time)

In [None]:
# dot product of U*V
def affinity_score(user_id, business_id):
    user_feature = user_features[user_ids_bcast.value[user_id]]
    business_feature = product_features[business_ids_bcast.value[business_id]]
    return sum(x * y for x, y in zip(user_feature, business_feature))

In [None]:
# example_user_id = 'ulQ8Nyj7jCUR8M83SUMoRQ'
# example_business_id = 'Y9LN295xg9dQGBgbtu9Oaw'
# score = affinity_score(example_user_id, example_business_id)
# print("Affinity score for user {} and business {}: {}".format(example_user_id, example_business_id, score))

In [None]:
product_features[121030][5]

In [None]:
# product_features

In [None]:
# TODO

# Fine-Tune ALS: GridSearch to Find Best HyperParameters for ALS model

In [7]:
from pyspark.mllib.recommendation import ALS, Rating
from pyspark import SparkContext
from math import sqrt
import itertools

In [8]:
user_to_num_dict = user_ids_bcast.value
business_to_num_dict = business_ids_bcast.value
user_to_num_bc = sc.broadcast(user_to_num_dict)
business_to_num_bc = sc.broadcast(business_to_num_dict)

## LOCAL GRID SEARCH

In [26]:
# Create Train, Val Set: ((user_id, business_id), count) format
training_rdd, validations_rdd = user_business_interaction_counts.randomSplit([0.8, 0.2], seed=42)

from pyspark.mllib.recommendation import ALS
from math import sqrt

# 创建Rating对象的RDD
ratings_rdd = user_business_interaction_counts.map(
    lambda p: Rating(user_to_num_bc.value[p[0][0]], business_to_num_bc.value[p[0][1]], float(p[1]))
)

# ALS模型参数网格搜索
ranks = [15, 20, 25]
reg_params = [0.01, 0.1, 1.0]
alphas = [0.01, 0.1, 1.0]
best_rmse = float('inf')
best_model = None
best_rank = 0
best_reg_param = 0
best_alpha = 0

for rank in ranks:
    for reg_param in reg_params:
        for alpha in alphas:
            for iteration in iterations:
                # 训练ALS模型
                model = ALS.trainImplicit(ratings_rdd, rank, seed=42, iterations=5, lambda_=reg_param, alpha=alpha)
                
                # 在验证集上评估模型
                # 我们需要将验证集的交互数据转换为(user_id_num, business_id_num)格式
                # 然后使用训练好的模型对其进行评分预测，并计算RMSE
                # 这里只是代码的框架，需要根据实际的验证集来实现评分预测和RMSE的计算
    
                # 假设validations_rdd是验证集的RDD，其格式为((user_id, business_id), count)
                validation_for_predict_rdd = validations_rdd.map(
                    lambda p: (user_to_num_bc.value[p[0][0]], business_to_num_bc.value[p[0][1]])
                )
                predictions = model.predictAll(validation_for_predict_rdd).map(
                    lambda r: ((r[0], r[1]), r[2])
                )
                rates_and_preds = validations_rdd.map(
                    lambda p: ((user_to_num_bc.value[p[0][0]], business_to_num_bc.value[p[0][1]]), float(p[1]))
                ).join(predictions)
                rmse = sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
                
                # 比较并保存最佳模型
                if rmse < best_rmse:
                    best_rmse = rmse
                    best_model = model
                    best_rank = rank
                    best_reg_param = reg_param
                    best_alpha = alpha

# 打印最佳模型的参数
print(f"Best RMSE: {best_rmse}")
print(f"Best Rank: {best_rank}")
print(f"Best Regularization Parameter: {best_reg_param}")
print(f"Best Alpha: {best_alpha}")

24/04/17 09:33:03 WARN BlockManager: Task 263082 already completed, not releasing lock for rdd_31_0
24/04/17 09:33:19 WARN BlockManager: Task 264159 already completed, not releasing lock for rdd_31_0
24/04/17 09:34:20 WARN BlockManager: Task 268350 already completed, not releasing lock for rdd_31_0
24/04/17 09:35:08 WARN BlockManager: Task 271084 already completed, not releasing lock for rdd_31_0

Best RMSE: 1.5809843015468776
Best Rank: 15
Best Regularization Parameter: 0.1
Best Alpha: 1.0


                                                                                

### BEST RMSE (Unprocessesd): 


### EVALUATE: Log_transform score, user_business_interaction_counts and Determine Performance Metric (Cosine Dist. MAE)

In [23]:
test_df = user_business_interaction_counts.collect()

                                                                                

In [28]:
unique_users = user_business_interaction_counts.map(lambda x: x[0][0]).distinct().count()
unique_businesses = user_business_interaction_counts.map(lambda x: x[0][1]).distinct().count()
print(f"Unique Users: {unique_users}")
print(f"Unique Businesses: {unique_businesses}")

interaction_stats = user_business_interaction_counts.map(lambda x: x[1]).stats()
print(f"Interaction Counts Stats: {interaction_stats}")

Total interactions: 999945
Unique Users: 304870
Unique Businesses: 121526
Interaction Counts Stats: (count: 999945, mean: 1.185413197725872, stdev: 1.2011436999553304, max: 284.0, min: 1.0)


In [29]:
# 使用histogram查看分布情况
interaction_histogram = user_business_interaction_counts.map(lambda x: x[1]).histogram(10)
for i, count in zip(interaction_histogram[0][:-1], interaction_histogram[1]):
    print(f"Interaction count range: {i} to {interaction_histogram[0][interaction_histogram[0].index(i)+1]}, Count: {count}")


Interaction count range: 1.0 to 29.3, Count: 999780
Interaction count range: 29.3 to 57.6, Count: 118
Interaction count range: 57.6 to 85.9, Count: 22
Interaction count range: 85.9 to 114.2, Count: 14
Interaction count range: 114.2 to 142.5, Count: 2
Interaction count range: 142.5 to 170.8, Count: 5
Interaction count range: 170.8 to 199.1, Count: 2
Interaction count range: 199.1 to 227.4, Count: 1
Interaction count range: 227.4 to 255.70000000000002, Count: 0
Interaction count range: 255.70000000000002 to 284, Count: 1


In [31]:
from math import log

# 计算对数转换需要的基础值
min_interaction = 1  # 已知最小值
max_interaction = 284  # 已知最大值
log_min = log(min_interaction + 1)
log_max = log(max_interaction + 1)

def normalize_interaction(interaction):
    """ 将原始互动次数转换为1到5的评分 """
    normalized_rating = 1 + 4 * (log(interaction + 1) - log_min) / (log_max - log_min)
    return normalized_rating

# 应用转换
normalized_ratings_rdd = user_business_interaction_counts.map(
    lambda p: Rating(user_to_num_bc.value[p[0][0]], business_to_num_bc.value[p[0][1]], normalize_interaction(p[1]))
)


In [71]:
normalized_ratings_rdd

PythonRDD[41] at RDD at PythonRDD.scala:53

In [30]:
# # 查找异常高的交互次数
# high_interactions = user_business_interaction_counts.filter(lambda x: x[1] > 10).collect()
# print(f"High Interactions: {high_interactions[:10]}")  # 打印前10个例子

High Interactions: [(('blrWvPePSv87aU9hV1Zd8Q', 'Bf-yMQiSmDZyqfGyFKQ0bA'), 14), (('wNMnTY2D3hMTZSzSaU4EMw', 'Pn1f4xiZoynF0nDb-vtY0A'), 17), (('UsmTxWbobLsI6WR1Db0W7A', 'ESzO3Av0b1_TzKOiqzbQYQ'), 20), (('UsmTxWbobLsI6WR1Db0W7A', 'X-b4-QvZLENnf3yFwhpSXQ'), 12), (('6tbXpUIU6upoeqWNDo9k_A', 'ZR_UyVncA3R3pQu79MGr_A'), 11), (('CbmNBkKa9QKNxPiN_whFUw', 'Z7UryKxT7drcxbDjPSn4hA'), 12), (('jlpapIPWURjBUg3V31AjKw', '1Z4_zSITNVQ_Bt027R0S1g'), 11), (('B5FTjNWSgkAQ9UjOKVjGhw', 'j37Z4LIXTH9j6KOq9aX8DQ'), 11), (('rh_p4FqrWJcWGjcUA-oj0A', 'qNSZpcyoF7OM6g49sAFUcQ'), 14), (('SlgpAnj2gQd44EM_Uq6DkQ', 'IGxxnH4Q2S2y_ZdFFW469w'), 34)]


### SPARK GRID SEARCH WORKING!!! Update: replace metric: (MAE and cosine)

In [20]:
# ratings_rdd = user_business_interaction_counts.map(
#     lambda p: Rating(user_to_num_bc.value[p[0][0]], business_to_num_bc.value[p[0][1]], float(p[1]))
# )
# ratings_rdd.collect()

                                                                                

[Rating(user=4, product=3, rating=1.0),
 Rating(user=0, product=5, rating=3.0),
 Rating(user=11, product=10, rating=1.0),
 Rating(user=11, product=6, rating=2.0),
 Rating(user=11, product=1, rating=1.0),
 Rating(user=11, product=2, rating=1.0),
 Rating(user=18, product=9, rating=1.0),
 Rating(user=18, product=8, rating=1.0),
 Rating(user=18, product=16, rating=1.0),
 Rating(user=6, product=12, rating=1.0),
 Rating(user=6, product=0, rating=1.0),
 Rating(user=6, product=15, rating=2.0),
 Rating(user=2, product=4, rating=2.0),
 Rating(user=3, product=17, rating=1.0),
 Rating(user=3, product=11, rating=1.0),
 Rating(user=3, product=22, rating=1.0),
 Rating(user=3, product=29, rating=1.0),
 Rating(user=3, product=5, rating=2.0),
 Rating(user=3, product=18, rating=2.0),
 Rating(user=3, product=19, rating=3.0),
 Rating(user=7, product=26, rating=1.0),
 Rating(user=10, product=36, rating=1.0),
 Rating(user=10, product=7, rating=1.0),
 Rating(user=13, product=25, rating=3.0),
 Rating(user=13, 

In [28]:
# ranks = list(range(15,20))  # 从5到20的范围
ranks = [15, 20, 25]
reg_params = [0.1, 0.5, 1.0]  # 更细粒度的调整
alphas = [0.01, 0.5, 1.5]  # 保持在0.5到1.5之间的范围

# 修改后的参数列表
params = [(rank, reg_param, alpha) for rank in ranks for reg_param in reg_params for alpha in alphas]

In [29]:
from pyspark.mllib.recommendation import ALS, Rating
from pyspark import SparkContext
from math import sqrt

def train_model(train_data, val_data, params, user_dict, business_dict):
    rank, reg_param, alpha = params
    
    # 将映射字典转换为局部变量，以避免序列化全局环境
    user_to_num = sc.broadcast(user_dict)
    business_to_num = sc.broadcast(business_dict)

    def rating_mapper(p):
        user_id = user_to_num.value.get(p[0][0], -1)
        business_id = business_to_num.value.get(p[0][1], -1)
        return Rating(user_id, business_id, float(p[1]))

    # 使用局部的训练数据RDD
    local_ratings_rdd = train_data.map(rating_mapper)
    model = ALS.trainImplicit(local_ratings_rdd, rank, iterations=10, lambda_=reg_param, alpha=alpha)
    
    def validation_mapper(p):
        return (user_to_num.value.get(p[0][0], -1), business_to_num.value.get(p[0][1], -1))

    # 使用局部的验证数据RDD
    local_validation_rdd = val_data.map(validation_mapper)
    predictions = model.predictAll(local_validation_rdd).map(lambda r: ((r.user, r.product), r.rating))
    rates_and_preds = val_data.map(lambda p: ((user_to_num.value.get(p[0][0], -1), business_to_num.value.get(p[0][1], -1)), p[1])).join(predictions)
    rmse = sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

    return (rmse, rank, reg_param, alpha)

sc = SparkContext.getOrCreate()
full_rdd = user_business_interaction_counts.cache()
training_rdd, validation_rdd = full_rdd.randomSplit([0.8, 0.2], seed=42)

# 提前准备所有需要的数据和映射
user_dict = full_rdd.map(lambda p: p[0][0]).distinct().zipWithIndex().collectAsMap()
business_dict = full_rdd.map(lambda p: p[0][1]).distinct().zipWithIndex().collectAsMap()

results = [train_model(training_rdd, validation_rdd, param, user_dict, business_dict) for param in params]
best_params = min(results, key=lambda x: x[0])
print("Best parameters:", best_params)

24/04/17 09:41:25 WARN BlockManager: Task 274206 already completed, not releasing lock for rdd_31_0
24/04/17 09:41:58 WARN BlockManager: Task 276690 already completed, not releasing lock for rdd_31_0

Best parameters: (1.6605276164617149, 25, 0.1, 1.5)


                                                                                

### using normalized ratings rdd to further explore performace

In [45]:
from pyspark.mllib.recommendation import ALS
import numpy as np
from math import sqrt
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, Rating

def log_transform_interaction(interaction_count):
    """对交互次数应用对数转换并标准化到1到5的评分范围"""
    return 1 + 4 * (np.log(interaction_count + 1) - np.log(1 + 1)) / (np.log(284 + 1) - np.log(1 + 1))


In [46]:
# ranks = list(range(15,20))  # 从5到20的范围
ranks = [25]
reg_params = [0.01]  # 更细粒度的调整
alphas = [1.0]  # 保持在0.5到1.5之间的范围

# 修改后的参数列表
params = [(rank, reg_param, alpha) for rank in ranks for reg_param in reg_params for alpha in alphas]

In [47]:
start_time = time.time()

def train_model(train_data, val_data, params, user_dict, business_dict):
    rank, reg_param, alpha = params
    
    # 将映射字典转换为局部变量，以避免序列化全局环境
    user_to_num = sc.broadcast(user_dict)
    business_to_num = sc.broadcast(business_dict)

    def rating_mapper(p):
        user_id = user_to_num.value.get(p[0][0], -1)
        business_id = business_to_num.value.get(p[0][1], -1)
        log_transformed_rating = log_transform_interaction(float(p[1]))  # 应用对数转换
        return Rating(user_id, business_id, log_transformed_rating)


    # 使用局部的训练数据RDD
    local_ratings_rdd = train_data.map(rating_mapper)
    model = ALS.trainImplicit(local_ratings_rdd, rank, iterations=10, lambda_=reg_param, alpha=alpha)
    
    def validation_mapper(p):
        return (user_to_num.value.get(p[0][0], -1), business_to_num.value.get(p[0][1], -1))

    # 使用局部的验证数据RDD
    local_validation_rdd = val_data.map(validation_mapper)
    predictions = model.predictAll(local_validation_rdd).map(lambda r: ((r.user, r.product), r.rating))
    rates_and_preds = val_data.map(lambda p: ((user_to_num.value.get(p[0][0], -1), business_to_num.value.get(p[0][1], -1)), p[1])).join(predictions)
    rmse = sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

    return (rmse, rank, reg_param, alpha)

sc = SparkContext.getOrCreate()
full_rdd = user_business_interaction_counts.cache()
training_rdd, validation_rdd = full_rdd.randomSplit([0.8, 0.2], seed=42)

# 提前准备所有需要的数据和映射
user_dict = full_rdd.map(lambda p: p[0][0]).distinct().zipWithIndex().collectAsMap()
business_dict = full_rdd.map(lambda p: p[0][1]).distinct().zipWithIndex().collectAsMap()

results = [train_model(training_rdd, validation_rdd, param, user_dict, business_dict) for param in params]
best_params = min(results, key=lambda x: x[0])
print("Best parameters:", best_params)

print(f"Duration: {time.time() - start_time}")



Best parameters: (1.6724080106076653, 25, 0.01, 1.0)
Duration: 10.629076957702637


                                                                                

#### Parallized

In [48]:
'''
Parallelized!! No deterministic split
'''
start_time = time.time()

from pyspark.mllib.recommendation import ALS, Rating
from pyspark import SparkContext
from math import sqrt
import numpy as np

def log_transform_interaction(interaction_count):
    """对交互次数应用对数转换并标准化到1到5的评分范围"""
    return 1 + 4 * (np.log(interaction_count + 1) - np.log(1 + 1)) / (np.log(284 + 1) - np.log(1 + 1))

def train_model(params, user_dict, business_dict, data_samples):
    sc = SparkContext.getOrCreate()
    full_rdd = sc.parallelize(data_samples)
    user_to_num_bc = sc.broadcast(user_dict)
    business_to_num_bc = sc.broadcast(business_dict)

    rank, reg_param, alpha = params
    train_data, val_data = full_rdd.randomSplit([0.8, 0.2], seed=42)
    
    def rating_mapper(p):
        user_id = user_to_num_bc.value.get(p[0][0], -1)
        business_id = business_to_num_bc.value.get(p[0][1], -1)
        log_transformed_rating = log_transform_interaction(float(p[1]))
        return Rating(user_id, business_id, log_transformed_rating)
        
    local_ratings_rdd = train_data.map(rating_mapper)
    model = ALS.trainImplicit(local_ratings_rdd, rank, iterations=5, lambda_=reg_param, alpha=alpha)

    def validation_mapper(p):
        return (user_to_num_bc.value.get(p[0][0], -1), business_to_num_bc.value.get(p[0][1], -1))
    
    local_validation_rdd = val_data.map(validation_mapper)
    predictions = model.predictAll(local_validation_rdd).map(lambda r: ((r.user, r.product), r.rating))
    rates_and_preds = val_data.map(lambda p: ((user_to_num_bc.value.get(p[0][0], -1), business_to_num_bc.value.get(p[0][1], -1)), p[1])).join(predictions)
    rmse = sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

    return (rmse, rank, reg_param, alpha)

sc = SparkContext.getOrCreate()
data_samples = user_business_interaction_counts.collect()

user_dict = sc.parallelize(data_samples).map(lambda p: p[0][0]).distinct().zipWithIndex().collectAsMap()
business_dict = sc.parallelize(data_samples).map(lambda p: p[0][1]).distinct().zipWithIndex().collectAsMap()

param_list = [(20, 0.02, 1.0), (25, 0.01, 1.0), (25, 0.05, 1.0), (25, 0.01, 0.5), (30, 0.01, 1.0)]
results = [train_model(params, user_dict, business_dict, data_samples) for params in param_list]
best_params = min(results, key=lambda x: x[0])
print("Best parameters:", best_params)

print(f"Duration: {time.time() - start_time}")



Best parameters: (1.857118000784612, 25, 0.05, 1.0)


                                                                                

In [12]:
'''
Parallelized!! deterministic split
'''
from pyspark.mllib.recommendation import ALS, Rating
from pyspark import SparkContext
from math import sqrt
import numpy as np

def log_transform_interaction(rating):
    return np.log1p(rating)

# def log_transform_interaction(interaction_count):
#     """对交互次数应用对数转换并标准化到1到5的评分范围"""
#     return 1 + 4 * (np.log(interaction_count + 1) - np.log(1 + 1)) / (np.log(284 + 1) - np.log(1 + 1))

sc = SparkContext.getOrCreate()

data_samples = user_business_interaction_counts.collect()
full_rdd = sc.parallelize(data_samples)
train_data, val_data = full_rdd.randomSplit([0.8, 0.2], seed=42)

user_dict = full_rdd.map(lambda p: p[0][0]).distinct().zipWithIndex().collectAsMap()
business_dict = full_rdd.map(lambda p: p[0][1]).distinct().zipWithIndex().collectAsMap()
user_to_num_bc = sc.broadcast(user_dict)
business_to_num_bc = sc.broadcast(business_dict)

def train_model(params, training_rdd, validation_rdd, user_to_num_bcast, business_to_num_bcast):
    rank, reg_param, alpha = params
    
    user_to_num = user_to_num_bcast.value
    business_to_num = business_to_num_bcast.value

    ratings_rdd = training_rdd.map(lambda p: Rating(user_to_num[p[0][0]], business_to_num[p[0][1]], log_transform_interaction(p[1])))
    model = ALS.trainImplicit(ratings_rdd, rank, iterations=5, lambda_=reg_param, alpha=alpha)

    validation_for_predict_rdd = validation_rdd.map(lambda p: (user_to_num[p[0][0]], business_to_num[p[0][1]]))
    predictions = model.predictAll(validation_for_predict_rdd).map(lambda r: ((r.user, r.product), r.rating))
    rates_and_preds = validation_rdd.map(lambda p: ((user_to_num[p[0][0]], business_to_num[p[0][1]]), p[1])).join(predictions)
    rmse = sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

    return (rmse, rank, reg_param, alpha)

ranks = [10, 15, 20, 25, 30, 35, 40]
reg_params = [0.001, 0.005, 0.01, 0.02, 0.05, 0.1]
alphas = [0.2, 0.5, 1.0, 2.0, 3.0]

import itertools

# Generating all combinations of parameters
param_grid = list(itertools.product(ranks, reg_params, alphas))

results = [train_model(params, train_data, val_data, user_to_num_bc, business_to_num_bc) for params in param_grid]
best_params = min(results, key=lambda x: x[0])

print(f"Best RMSE: {best_params[0]}")
print(f"Best Rank: {best_params[1]}")
print(f"Best Regularization Parameter: {best_params[2]}")
print(f"Best Alpha: {best_params[3]}")


                                                                                

Best RMSE: 1.8468406523414755
Best Rank: 40
Best Regularization Parameter: 0.1
Best Alpha: 3.0


### BASIC STANDARD WAY TO DO GRID SEARCH w/ LogTrans

In [15]:
# Create Train, Val Set: ((user_id, business_id), count) format
training_rdd, validations_rdd = user_business_interaction_counts.randomSplit([0.8, 0.2], seed=42)

from pyspark.mllib.recommendation import ALS
from math import sqrt
import numpy as np
from pyspark.mllib.recommendation import Rating

def log_transform_interaction(interaction_count):
    """对交互次数应用对数转换并标准化到1到5的评分范围"""
    return 1 + 4 * (np.log(interaction_count + 1) - np.log(1 + 1)) / (np.log(284 + 1) - np.log(1 + 1))

# 创建经过对数转换的Rating对象的RDD
ratings_rdd = user_business_interaction_counts.map(
    lambda p: Rating(user_to_num_bc.value[p[0][0]], business_to_num_bc.value[p[0][1]], log_transform_interaction(p[1]))
)

# ALS模型参数网格搜索
# ranks = [20, 22, 25, 28, 30]
# reg_params = [0.005, 0.01, 0.02, 0.05]
# alphas = [0.5, 1.0, 2.0, 3.0]

# ranks = [10, 15, 20, 25, 30, 35, 40]
# reg_params = [0.001, 0.005, 0.01, 0.02, 0.05, 0.1]
# alphas = [0.2, 0.5, 1.0, 2.0, 3.0]

# Updated grid parameters for fine-tuning
# ranks = [35, 38, 40, 42, 45]
# reg_params = [0.0005, 0.001, 0.002]
# alphas = [2.5, 3.0, 3.5, 4.0]
ranks = [48, 50, 55]
reg_params = [0.00001, 0.0001, 0.0005]
alphas = [4.5, 5.0, 6.0]

best_rmse = float('inf')
best_model = None
best_rank = 0
best_reg_param = 0
best_alpha = 0

start_time = time.time()

for rank in ranks:
    for reg_param in reg_params:
        for alpha in alphas:
            # 训练ALS模型
            model = ALS.trainImplicit(ratings_rdd, rank, seed=42, iterations=5, lambda_=reg_param, alpha=alpha)
            
            # 在验证集上评估模型
            validation_for_predict_rdd = validations_rdd.map(
                lambda p: (user_to_num_bc.value[p[0][0]], business_to_num_bc.value[p[0][1]])
            )
            predictions = model.predictAll(validation_for_predict_rdd).map(
                lambda r: ((r[0], r[1]), r[2])
            )
            # 将实际评分和预测评分数据基于用户ID和商户ID进行对齐
            rates_and_preds = validations_rdd.map(
                lambda p: ((user_to_num_bc.value[p[0][0]], business_to_num_bc.value[p[0][1]]), log_transform_interaction(p[1]))
            ).join(predictions)
            # [1][0]是通过log_transform_interaction函数转换后的实际评分数据；[1][1]是ALS预测出的评分数据
            rmse = sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
            
            # 比较并保存最佳模型
            if rmse < best_rmse:
                best_rmse = rmse
                best_model = model
                best_rank = rank
                best_reg_param = reg_param
                best_alpha = alpha

# 打印最佳模型的参数
print(f"Best RMSE: {best_rmse}")
print(f"Best Rank: {best_rank}")
print(f"Best Regularization Parameter: {best_reg_param}")
print(f"Best Alpha: {best_alpha}")

print("Duration: ", tiem.time() - start_time)



Best RMSE: 0.9748005892115309
Best Rank: 45
Best Regularization Parameter: 0.0005
Best Alpha: 4.0


                                                                                

## RMSE (Log.Trans) no CV

Best RMSE: 1.0262119633332438
Best Rank: 25
Best Regularization Parameter: 0.01
Best Alpha: 1.0

Best parameters: (1.8574931815882971, 25, 0.05, 1.0)

Best RMSE: 0.9936078540025709
Best Rank: 40
Best Regularization Parameter: 0.001
Best Alpha: 3.0

Best RMSE: 0.9748005892115309
Best Rank: 45
Best Regularization Parameter: 0.0005
Best Alpha: 4.0

In [16]:
''' basic + K-fold'''

from pyspark.mllib.recommendation import ALS
from math import sqrt
from pyspark.mllib.recommendation import Rating

# 创建经过对数转换的Rating对象的RDD
ratings_rdd = user_business_interaction_counts.map(
    lambda p: Rating(user_to_num_bc.value[p[0][0]], business_to_num_bc.value[p[0][1]], log_transform_interaction(p[1]))
)

# 参数设置
ranks = [45, 48, 50]
reg_params = [0.0001, 0.0005, 0.001]
alphas = [4.0, 4.5, 5.0]
numFolds = 5

# 初始化最佳设置记录
best_rmse = float('inf')
best_model = None
best_params = {}

# 创建K折数据集
weights = [1.0] * numFolds
seed = 42
folds = ratings_rdd.randomSplit(weights, seed)

for rank in ranks:
    for reg_param in reg_params:
        for alpha in alphas:
            fold_rmse = []
            for i in range(numFolds):
                train = sc.union([folds[j] for j in range(numFolds) if j != i])
                test = folds[i]
                
                # 训练ALS模型
                model = ALS.trainImplicit(train, rank, seed=seed, iterations=5, lambda_=reg_param, alpha=alpha)
                
                # 在测试集上评估模型
                test_for_predict_rdd = test.map(lambda p: (p.user, p.product))
                predictions = model.predictAll(test_for_predict_rdd).map(lambda r: ((r.user, r.product), r.rating))
                rates_and_preds = test.map(lambda p: ((p.user, p.product), p.rating)).join(predictions)
                rmse = sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
                fold_rmse.append(rmse)
            
            # 计算当前参数组合下的平均RMSE
            avg_rmse = sum(fold_rmse) / numFolds
            print(f"Tested Model: rank={rank}, regParam={reg_param}, alpha={alpha}, Average RMSE={avg_rmse}")
            
            # 比较并保存最佳模型
            if avg_rmse < best_rmse:
                best_rmse = avg_rmse
                best_model = model
                best_params = {'rank': rank, 'regParam': reg_param, 'alpha': alpha}

# 打印最佳模型参数
print(f"Best Model Parameters: {best_params}, with Average RMSE: {best_rmse}")

                                                                                

Tested Model: rank=45, regParam=0.0001, alpha=4.0, Average RMSE=1.020548586310234


                                                                                

Tested Model: rank=45, regParam=0.0001, alpha=4.5, Average RMSE=1.0170479077425632


                                                                                

Tested Model: rank=45, regParam=0.0001, alpha=5.0, Average RMSE=1.0137617029677723


                                                                                

Tested Model: rank=45, regParam=0.0005, alpha=4.0, Average RMSE=1.0234635850847786


                                                                                

Tested Model: rank=45, regParam=0.0005, alpha=4.5, Average RMSE=1.0197158244996927


                                                                                

Tested Model: rank=45, regParam=0.0005, alpha=5.0, Average RMSE=1.0162092081964083


                                                                                

Tested Model: rank=45, regParam=0.001, alpha=4.0, Average RMSE=1.025526182332733


                                                                                

Tested Model: rank=45, regParam=0.001, alpha=4.5, Average RMSE=1.0216714494862111


                                                                                

Tested Model: rank=45, regParam=0.001, alpha=5.0, Average RMSE=1.0180678816464532


                                                                                

Tested Model: rank=48, regParam=0.0001, alpha=4.0, Average RMSE=1.020309202161415


                                                                                

Tested Model: rank=48, regParam=0.0001, alpha=4.5, Average RMSE=1.0168311861671702


                                                                                

Tested Model: rank=48, regParam=0.0001, alpha=5.0, Average RMSE=1.0135636348153345


                                                                                

Tested Model: rank=48, regParam=0.0005, alpha=4.0, Average RMSE=1.0229594274337024


                                                                                

Tested Model: rank=48, regParam=0.0005, alpha=4.5, Average RMSE=1.0192347565823727


                                                                                

Tested Model: rank=48, regParam=0.0005, alpha=5.0, Average RMSE=1.0157566205956243


                                                                                

Tested Model: rank=48, regParam=0.001, alpha=4.0, Average RMSE=1.024923515166234


                                                                                

Tested Model: rank=48, regParam=0.001, alpha=4.5, Average RMSE=1.0210847314566984


                                                                                

Tested Model: rank=48, regParam=0.001, alpha=5.0, Average RMSE=1.0174948685778518




KeyboardInterrupt: 

                                                                                

## Pipelineing GirdSearch w/ logTrans and CV

In [48]:
'''use pyspark pipeline, ml API and sparkDF for simplicity BUT WHY SO SLOW???'''


from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer


# 创建ALS模型实例，设置implicitPrefs为True来指定使用隐式反馈
als = ALS(
    userCol="userIndex",
    itemCol="businessIndex",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,  # 确保非负参数，这通常对于推荐系统是有意义的
    implicitPrefs=True
)

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[als])

paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [48, 50, 55]) \
    .addGrid(als.regParam, [0.00001, 0.0001, 0.0005]) \
    .addGrid(als.alpha, [4.5, 5.0, 6.0]) \
    .build()

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=2
)

# 训练模型
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession.builder \
    .appName("Convert RDD to DataFrame") \
    .getOrCreate()

# 定义RDD的schema
schema = StructType([
    StructField("user", StringType(), True),
    StructField("business", StringType(), True),
    StructField("rating", IntegerType(), True)
])

adjusted_rdd = user_business_interaction_counts.map(
    lambda x: (x[0][0], x[0][1], x[1])
)
df = spark.createDataFrame(adjusted_rdd, schema)

# 使用StringIndexer转换字符串列为数值索引
indexer_user = StringIndexer(inputCol="user", outputCol="userIndex")
indexer_business = StringIndexer(inputCol="business", outputCol="businessIndex")

# Pipeline to apply transformations
tmp_pipeline = Pipeline(stages=[indexer_user, indexer_business])
df_transformed = tmp_pipeline.fit(df).transform(df).select("userIndex", "businessIndex", "rating")

model = crossval.fit(df_transformed)

# 输出最佳模型信息
bestModel = model.bestModel.stages[0]
print(f"Best Model Params: Rank={bestModel.getRank()}, RegParam={bestModel.getRegParam()}, Alpha={bestModel.getAlpha()}")
print(f"Best Model RMSE: {evaluator.evaluate(bestModel.transform(df))}")

[Stage 130353:>                                                   (0 + 10) / 10]

KeyboardInterrupt: 

[Stage 130421:>                                                   (0 + 10) / 10]

In [5]:
'''+ LOG TRANS use pyspark pipeline, ml API and sparkDF for simplicity '''

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer

start_time = time.time()

# 创建ALS模型实例，设置implicitPrefs为True来指定使用隐式反馈
als = ALS(
    userCol="user",
    itemCol="business",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,  # 确保非负参数，这通常对于推荐系统是有意义的
    implicitPrefs=True
)

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[als])

paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [51]) \
    .addGrid(als.regParam, [10, 30, 0.005]) \
    .addGrid(als.alpha, [1530.0]) \
    .build()

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5
)

# 训练模型
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql import SparkSession

# 定义RDD的schema
schema = StructType([
    StructField("user", IntegerType(), True),
    StructField("product", IntegerType(), True),  # Changed from 'business' to 'product'
    StructField("rating", FloatType(), True)
])

from pyspark.mllib.recommendation import Rating
# def log_transform_interaction(interaction_count):
#     """对交互次数应用对数转换并标准化到1到5的评分范围"""
#     return 1 + 4 * (np.log(interaction_count + 1) - np.log(1 + 1)) / (np.log(284 + 1) - np.log(1 + 1))

# # 创建经过对数转换的Rating对象的RDD
# ratings_rdd = user_business_interaction_counts.map(
#     lambda p: Rating(int(user_to_num_bc.value[p[0][0]]), int(business_to_num_bc.value[p[0][1]]), float(np.float64(log_transform_interaction(p[1]))))
#     # lambda p: Rating(int(user_to_num_bc.value[p[0][0]]), int(business_to_num_bc.value[p[0][1]]), float((p[1])))
# )
from MF import test
ratings_rdd = test(sc, '../data/')

spark = SparkSession.builder.appName("ALS Example").getOrCreate()
df = spark.createDataFrame(ratings_rdd, schema)

df = df.withColumnRenamed("product", "business")
model = crossval.fit(df)

# 输出最佳模型信息
bestModel = model.bestModel.stages[0]
print(f"Best Model Params: Rank={bestModel.rank}, RegParam={bestModel._java_obj.parent().getRegParam()}, Alpha={bestModel._java_obj.parent().getAlpha()}")
print(f"Best Model RMSE: {evaluator.evaluate(bestModel.transform(df))}")

print("Duration: ", time.time() - start_time)
## 使用StringIndexer转换字符串列为数值索引
## indexer_user = StringIndexer(inputCol="user", outputCol="userIndex")
## indexer_business = StringIndexer(inputCol="business", outputCol="businessIndex")
## # Pipeline to apply transformations
## tmp_pipeline = Pipeline(stages=[indexer_user, indexer_business])
## df_transformed = tmp_pipeline.fit(df).transform(df).select("userIndex", "businessIndex", "rating")

                                                                                

Best Model Params: Rank=51, RegParam=30.0, Alpha=1530.0




Best Model RMSE: 0.38096950099939697
Duration:  425.4502580165863


                                                                                

In [19]:
df.show(10)

+----+--------+---------+
|user|business|   rating|
+----+--------+---------+
|   4|       3|      1.0|
|   0|       5|1.5590638|
|  11|      10|      1.0|
|  11|       6|1.3270314|
|  11|       1|      1.0|
|  11|       2|      1.0|
|  18|       9|      1.0|
|  18|       8|      1.0|
|  18|      16|      1.0|
|   6|      12|      1.0|
+----+--------+---------+
only showing top 10 rows



In [47]:
# df_transform = df_transformed.select("userIndex", "businessIndex", "rating")
# df_transform.show(5)

+---------+-------------+------+
|userIndex|businessIndex|rating|
+---------+-------------+------+
| 226485.0|     111362.0|     1|
|  11711.0|      43029.0|     3|
|   5681.0|      22845.0|     1|
|   5681.0|      26565.0|     2|
|   5681.0|       8654.0|     1|
+---------+-------------+------+
only showing top 5 rows



## RMSE Log.Trans w/ CV=2 
Best Model Params: Rank=50, RegParam=1e-05, Alpha=6.0
Best Model RMSE: 0.9522162059453022

Best Model Params: Rank=51, RegParam=1e-05, Alpha=12.0
Best Model RMSE: 0.9126936257722957
Duration:  1267.1926851272583

Best Model Params: Rank=51, RegParam=1e-05, Alpha=25.0
Best Model RMSE: 0.8586641084757474
Duration:  1679.2649910449982

Best Model Params: Rank=51, RegParam=5e-05, Alpha=45.0
Best Model RMSE: 0.8056559429426488
Duration:  814.2358434200287

Best Model Params: Rank=51, RegParam=5e-05, Alpha=60.0
Best Model RMSE: 0.7766625014795452
Duration:  537.503279209137

Best Model Params: Rank=51, RegParam=5e-05, Alpha=100.0
Best Model RMSE: 0.720838855207703
Duration:  530.7137999534607

Best Model Params: Rank=51, RegParam=5e-05, Alpha=180.0
Best Model RMSE: 0.6503744859799462
Duration:  364.15210580825806

Best Model Params: Rank=51, RegParam=5e-05, Alpha=300.0
Best Model RMSE: 0.5844215172277072
Duration:  335.40465235710144

Best Model Params: Rank=51, RegParam=5e-05, Alpha=600.0
Best Model RMSE: 0.49035982038807124
Duration:  324.7230396270752

Best Model Params: Rank=51, RegParam=5e-05, Alpha=1000.0
Best Model RMSE: 0.4207796793325426
Duration:  341.753643989563

Best Model Params: Rank=51, RegParam=5e-05, **Alpha=1500.0**
Best Model RMSE: 0.36787906916177776
Duration:  369.1352970600128

Best Model Params: Rank=51, RegParam=0.0005, Alpha=1510.0
Best Model RMSE: 0.3670443406380293
Duration:  2923.155244588852

**Best Model Params: Rank=51, RegParam=0.5, Alpha=1520.0**
*Best Model RMSE: 0.36660512851026805*
Duration:  675.6044607162476

**Best Model Params: Rank=51, RegParam=0.005, Alpha=1530.0**
*Best Model RMSE: 0.36539879987153356*
Duration:  316.333025932312

Best Model Params: Rank=51, RegParam=10.0, Alpha=1520.0
Best Model RMSE: 0.372141467955779
Duration:  151.50563216209412

Best Model Params: Rank=51, RegParam=30.0, Alpha=1530.0
Best Model RMSE: 0.38141422660997226
Duration:  250.79301953315735


# Completed: After we have the best model, then => U,V and calculate affinity score for each user, biz

In [3]:
from MF import matrix_factorization_als
start_time = time.time()
data_path = '../data/'  # Update with args[*]

pair_rdd, user_ids_bcast, business_ids_bcast= matrix_factorization_als(sc, data_path)
print("Duration: ", time.time() - start_time)

                                                                                

Duration:  9.486623525619507


In [5]:
pair_rdd.collect()

[('wf1GqnKQuvH-V3QN80UOOQ', 'fThrN4tfupIGetkrz18JOg'),
 ('39FT2Ui8KUXwmUt6hnwy-g', 'uW6UHfONAmm8QttPkbMewQ'),
 ('7weuSPSSqYLUFga6IYP4pg', 'IhNASEZ3XnBHmuuVnWdIwA'),
 ('CqaIzLiWaa-lMFYBAsYQxw', 'G859H6xfAmVLxbzQgipuoA'),
 ('yy7shAsNWRbGg-8Y67Dzag', 'rS39YnrhoXmPqHLzCBjeqw'),
 ('Uk1UKBIAwOqhjZdLm3r9zg', '5CJL_2-XwCGBmOav4mFdYg'),
 ('x-8ZMKKNycT3782Kqf9loA', 'jgtWfJCJZty_Nctqpdtp3g'),
 ('0FVcoJko1kfZCrJRfssfIA', 'JVK8szNDoy9MNiYSz_MiAA'),
 ('LcCRMIDz1JgshpPGYfLDcA', 't19vb_4ML2dg5HZ-MF3muA'),
 ('C__1BHWTGBNA5s2ZPH289g', 'h_UvnQfe1cuVICly_kIqHg'),
 ('CLbpPUqP6XpeAfoqScGaJQ', 'QjZFYd5hme7EHegpuJngMQ'),
 ('zDBOdWtl2PsNY38IeoE5cQ', 'gy-HBIeJGlQHs4RRYDLuHw'),
 ('w-_md1Qslb7r6098SHydlg', '9p-cpmHaga-EXyc6ZzYCcQ'),
 ('U4INQZOPSUaj8hMjLlZ3KA', '8tMy7yEeNVYylVVFRlPmdQ'),
 ('CMu9FmdK8xpiawJowJuGQg', '364hhL5st0LV16UcBHRJ3A'),
 ('s2o_JsABvrZVm_T03qrBUw', 'Gen0ei06dGq13OJ8DRXUjg'),
 ('DKolrsBSwMTpTJL22dqJRQ', 'RADloW1UROUn4FJFU4JJNg'),
 ('aOseJnydZYD8Og00vWylqg', 'fNc1WuGwiT7RhqXUIe4S8A'),
 ('I8_iXLc

In [4]:
def map_and_track_ids(line, user_ids_bcast, business_ids_bcast):
    user_id, biz_id = line
    user_num = user_ids_bcast.value.get(user_id.strip(), -1)
    biz_num = business_ids_bcast.value.get(biz_id.strip(), -1)
    return (user_num, biz_num), (user_id, biz_id)

In [5]:
test_mapped_rdd = pair_rdd.map(lambda line: map_and_track_ids(line, user_ids_bcast, business_ids_bcast))

In [7]:
tracked_pairs_rdd = pair_rdd.map(lambda line: map_and_track_ids(line, user_ids_bcast, business_ids_bcast))
valid_pairs_rdd = tracked_pairs_rdd.filter(lambda x: x[0][0] != -1 and x[0][1] != -1)
missing_pairs_rdd = tracked_pairs_rdd.filter(lambda x: x[0][0] == -1 or x[0][1] == -1).map(lambda x: (x[1], -99))

In [12]:
valid_pairs_rdd.collect()

[((7272, 24589), ('wf1GqnKQuvH-V3QN80UOOQ', 'fThrN4tfupIGetkrz18JOg')),
 ((218201, 8381), ('39FT2Ui8KUXwmUt6hnwy-g', 'uW6UHfONAmm8QttPkbMewQ')),
 ((41610, 18468), ('7weuSPSSqYLUFga6IYP4pg', 'IhNASEZ3XnBHmuuVnWdIwA')),
 ((4313, 10492), ('yy7shAsNWRbGg-8Y67Dzag', 'rS39YnrhoXmPqHLzCBjeqw')),
 ((46373, 12933), ('x-8ZMKKNycT3782Kqf9loA', 'jgtWfJCJZty_Nctqpdtp3g')),
 ((224889, 58564), ('0FVcoJko1kfZCrJRfssfIA', 'JVK8szNDoy9MNiYSz_MiAA')),
 ((133824, 71929), ('LcCRMIDz1JgshpPGYfLDcA', 't19vb_4ML2dg5HZ-MF3muA')),
 ((10431, 29433), ('C__1BHWTGBNA5s2ZPH289g', 'h_UvnQfe1cuVICly_kIqHg')),
 ((8389, 7796), ('CLbpPUqP6XpeAfoqScGaJQ', 'QjZFYd5hme7EHegpuJngMQ')),
 ((88945, 27564), ('zDBOdWtl2PsNY38IeoE5cQ', 'gy-HBIeJGlQHs4RRYDLuHw')),
 ((47439, 6486), ('w-_md1Qslb7r6098SHydlg', '9p-cpmHaga-EXyc6ZzYCcQ')),
 ((16148, 50020), ('U4INQZOPSUaj8hMjLlZ3KA', '8tMy7yEeNVYylVVFRlPmdQ')),
 ((99472, 2743), ('CMu9FmdK8xpiawJowJuGQg', '364hhL5st0LV16UcBHRJ3A')),
 ((44414, 8746), ('s2o_JsABvrZVm_T03qrBUw', 'Gen0ei06dG

In [9]:
missing_pairs_rdd.collect()

[(('CqaIzLiWaa-lMFYBAsYQxw', 'G859H6xfAmVLxbzQgipuoA'), -99),
 (('Uk1UKBIAwOqhjZdLm3r9zg', '5CJL_2-XwCGBmOav4mFdYg'), -99),
 (('I8_iXLcpYHAb_xi2vShgOg', 'LR0qF0FEVsCOhYWUOiH26A'), -99),
 (('ay4M5J28kBUf0odOQct0BA', 'b41zjWT4pzcTgNY04eKAvg'), -99),
 (('UmTMCfPlhA6kJLAsLycSfg', 'f5O7v_X_jCg2itqacRfxhg'), -99),
 (('PACS2nRQfwS7fYHNBL4ozg', 'h58IV-LbmpycdzlwVhUhgw'), -99),
 (('swqSj4zuamI_HFfPTj63Hg', 'n7V4cD-KqqE3OXk0irJTyA'), -99),
 (('qd16czwFUVHICKF7A4qWsQ', 't6WY1IrohUecqNjd9bG42Q'), -99),
 (('QyPcC88XQlXruvh5UBmkYw', 'gG9z6zr_49LocyCTvSFg0w'), -99),
 (('lFFIpfkox55FQ-wZD6BBVw', 'vw81ARj4ihGzcmanHw6dUg'), -99),
 (('tL2pS5UOmN6aAOi3Z-qFGg', 'qsJkjQoM3Hdg6Gtqp2S9_w'), -99),
 (('j6Kv_wLQzZMWobLIyijX3A', 'VZbKJBbcxCOvuZyuEOojsg'), -99),
 (('3UY0P5vAPTavFNTqqE2nVw', 'azsyGov8GFJswGxcqIn71g'), -99),
 (('GKy2zJOYcT89OqT_Xg9B2A', 'E-wETrdq1uImd6lX6x8Qxw'), -99),
 (('8c3XuWaEQwzbV9AtiXB8mw', 'S4NHg4GZD35HzzibJAxXDw'), -99),
 (('mswloq-IjRr9yXLhBqBPAw', '8xTuXU5uZ85WF-ZGqDNv_w'), -99),
 (('XOgq

In [13]:
valid_num_id_pairs = valid_pairs_rdd.map(lambda x: x[0])

In [14]:
valid_num_id_pairs.collect()

[(7272, 24589),
 (218201, 8381),
 (41610, 18468),
 (4313, 10492),
 (46373, 12933),
 (224889, 58564),
 (133824, 71929),
 (10431, 29433),
 (8389, 7796),
 (88945, 27564),
 (47439, 6486),
 (16148, 50020),
 (99472, 2743),
 (44414, 8746),
 (135680, 6666),
 (136873, 94087),
 (37904, 18487),
 (41516, 27252),
 (4262, 6221),
 (722, 268),
 (36203, 2265),
 (35678, 18468),
 (36155, 8267),
 (197932, 22672),
 (37433, 7550),
 (122696, 10038),
 (112307, 37066),
 (71185, 35957),
 (135694, 98489),
 (12550, 9402),
 (3600, 2634),
 (34869, 7338),
 (1142, 593),
 (40101, 6800),
 (136957, 82808),
 (23305, 8608),
 (1813, 1015),
 (38550, 4275),
 (133772, 90402),
 (148592, 21242),
 (146620, 23219),
 (71122, 52870),
 (53248, 7114),
 (286325, 65504),
 (193710, 95005),
 (138438, 101649),
 (136048, 58933),
 (133713, 317),
 (111271, 34690),
 (152926, 64234),
 (252830, 8936),
 (136555, 83563),
 (135918, 57689),
 (123763, 28776),
 (44991, 70954),
 (4277, 15306),
 (1696, 3260),
 (10536, 14124),
 (10137, 4454),
 (77306, 5

In [15]:
batch_size = 1000
valid_batches = valid_num_id_pairs.mapPartitions(lambda x: [list(x)]).flatMap(lambda x: [x[i:i + batch_size] for i in range(0, len(x), batch_size)])

In [16]:
valid_batches.collect()

[[(7272, 24589),
  (218201, 8381),
  (41610, 18468),
  (4313, 10492),
  (46373, 12933),
  (224889, 58564),
  (133824, 71929),
  (10431, 29433),
  (8389, 7796),
  (88945, 27564),
  (47439, 6486),
  (16148, 50020),
  (99472, 2743),
  (44414, 8746),
  (135680, 6666),
  (136873, 94087),
  (37904, 18487),
  (41516, 27252),
  (4262, 6221),
  (722, 268),
  (36203, 2265),
  (35678, 18468),
  (36155, 8267),
  (197932, 22672),
  (37433, 7550),
  (122696, 10038),
  (112307, 37066),
  (71185, 35957),
  (135694, 98489),
  (12550, 9402),
  (3600, 2634),
  (34869, 7338),
  (1142, 593),
  (40101, 6800),
  (136957, 82808),
  (23305, 8608),
  (1813, 1015),
  (38550, 4275),
  (133772, 90402),
  (148592, 21242),
  (146620, 23219),
  (71122, 52870),
  (53248, 7114),
  (286325, 65504),
  (193710, 95005),
  (138438, 101649),
  (136048, 58933),
  (133713, 317),
  (111271, 34690),
  (152926, 64234),
  (252830, 8936),
  (136555, 83563),
  (135918, 57689),
  (123763, 28776),
  (44991, 70954),
  (4277, 15306),
  

# Completed: Get affinity score, *final_scores_rdd*,  for each (user, business) in the yelp_train/val.csv
Note: but since we use rdd to get affinity score and dealt w/ cold-start,

we may have to re-order rows to align w/ what's in the original csv file at the end if grading script is stupid

In [3]:
from MF import matrix_factorization_als
start_time = time.time()
data_path = '../data/'  # Update with args[*]

# mapped_scores_rdd, mapped_missing_scores_rdd = matrix_factorization_als(sc, data_path)
final_scores_rdd, user_features_rdd, biz_features_rdd, valid_pairs_rdd, missing_pairs_rdd, reverse_user_ids_bcast, reverse_business_ids_bcast = matrix_factorization_als(sc, data_path)
print("Duration: ", time.time() - start_time)

                                                                                

Duration:  10.706738471984863


In [4]:
final_scores_rdd.collect()

                                                                                

[(('wf1GqnKQuvH-V3QN80UOOQ', 'fThrN4tfupIGetkrz18JOg'), 0.9736873615973883),
 (('39FT2Ui8KUXwmUt6hnwy-g', 'uW6UHfONAmm8QttPkbMewQ'), 0.5345032519363266),
 (('7weuSPSSqYLUFga6IYP4pg', 'IhNASEZ3XnBHmuuVnWdIwA'), 0.8560708887072375),
 (('yy7shAsNWRbGg-8Y67Dzag', 'rS39YnrhoXmPqHLzCBjeqw'), 0.6506799144097046),
 (('x-8ZMKKNycT3782Kqf9loA', 'jgtWfJCJZty_Nctqpdtp3g'), 0.7169183511538717),
 (('0FVcoJko1kfZCrJRfssfIA', 'JVK8szNDoy9MNiYSz_MiAA'), 0.12573703355741928),
 (('LcCRMIDz1JgshpPGYfLDcA', 't19vb_4ML2dg5HZ-MF3muA'), 0.7571762414592769),
 (('C__1BHWTGBNA5s2ZPH289g', 'h_UvnQfe1cuVICly_kIqHg'), 0.46482178458073703),
 (('CLbpPUqP6XpeAfoqScGaJQ', 'QjZFYd5hme7EHegpuJngMQ'), 0.8252747700417111),
 (('zDBOdWtl2PsNY38IeoE5cQ', 'gy-HBIeJGlQHs4RRYDLuHw'), 0.3326967582855405),
 (('w-_md1Qslb7r6098SHydlg', '9p-cpmHaga-EXyc6ZzYCcQ'), 0.759679596906847),
 (('U4INQZOPSUaj8hMjLlZ3KA', '8tMy7yEeNVYylVVFRlPmdQ'), 0.5487539801194623),
 (('CMu9FmdK8xpiawJowJuGQg', '364hhL5st0LV16UcBHRJ3A'), 0.19066509900241052

In [4]:
count = final_scores_rdd.count()
print("Length of final_scores_rdd:", count)



Length of final_scores_rdd: 142044


                                                                                

In [5]:
# 定义一个函数来检查元组中是否存在"Unknown"字符串
def check_unknown(tuple):
    for item in tuple:
        if "Unknown" in item:
            return True
    return False

# 使用filter操作来过滤包含"Unknown"字符串的元组
unknown_tuples = final_scores_rdd.filter(lambda x: check_unknown(x[0]))

# 检查是否有包含"Unknown"字符串的元组
if unknown_tuples.isEmpty():
    print("No tuples contain 'Unknown' strings.")
else:
    print("Tuples containing 'Unknown' strings:", unknown_tuples.collect())

                                                                                

No tuples contain 'Unknown' strings.


                                                                                

## process necessary user, bus vectors embedding from the original, also deal w/ missing (cold start)

In [19]:
# Extract user and business IDs from valid pairs
valid_user_ids = valid_pairs_rdd.map(lambda x: x[0][0]).distinct()
valid_business_ids = valid_pairs_rdd.map(lambda x: x[0][1]).distinct()

# Broadcast these IDs
valid_user_ids_bcast = sc.broadcast(valid_user_ids.collect())
valid_business_ids_bcast = sc.broadcast(valid_business_ids.collect())

# Filter feature RDDs using the valid IDs
valid_user_features_rdd = user_features_rdd.filter(lambda x: x[0] in valid_user_ids_bcast.value)
valid_biz_features_rdd = biz_features_rdd.filter(lambda x: x[0] in valid_business_ids_bcast.value)

default_vector = [0.0] * len(user_features_rdd.first()[1])  # Assuming all vectors have the same length, and assume 0.0 for now
default_vector_bcast = sc.broadcast(default_vector)

                                                                                

In [29]:
# Separate handling for missing user and business features
missing_user_features_rdd = missing_pairs_rdd.map(lambda x: (x[0][0], default_vector_bcast.value))
missing_biz_features_rdd = missing_pairs_rdd.map(lambda x: (x[0][1], default_vector_bcast.value))

In [36]:
# # Final merging and handling
# final_user_features_rdd = valid_user_features_rdd.union(missing_user_features_rdd)
# final_biz_features_rdd = valid_biz_features_rdd.union(missing_biz_features_rdd)

In [37]:
def map_num_to_string_id(features_tuple, reverse_id_bcast):
    """
    Maps a tuple of (numerical_id, features) to (string_id, features) using a broadcasted reverse ID map.
    """
    num_id, features = features_tuple
    str_id = reverse_id_bcast.value.get(num_id, "Unknown")  # Use "Unknown" if no mapping exists
    return (str_id, features)

# Map numerical IDs in valid_user_features_rdd to string IDs
valid_user_features_with_str_ids_rdd = valid_user_features_rdd.map(
    lambda x: map_num_to_string_id(x, reverse_user_ids_bcast))

valid_biz_features_with_str_ids_rdd = valid_biz_features_rdd.map(
    lambda x: map_num_to_string_id(x, reverse_business_ids_bcast))

# Now both RDDs use string IDs, merge them
final_user_features_rdd = valid_user_features_with_str_ids_rdd.union(missing_user_features_rdd)
final_biz_features_rdd = valid_biz_features_with_str_ids_rdd.union(missing_biz_features_rdd)

# ENCAPSULATED

In [3]:
from MF import matrix_factorization_als
start_time = time.time()
data_path = '../data/'  # Update with args[*]

# mapped_scores_rdd, mapped_missing_scores_rdd = matrix_factorization_als(sc, data_path)
final_scores_rdd, final_user_features_rdd, final_biz_features_rdd = matrix_factorization_als(sc, data_path)
print("Duration: ", time.time() - start_time)

                                                                                

Duration:  11.22281265258789


In [5]:
final_scores_rdd.take(5), final_user_features_rdd.take(1)

                                                                                

([(('wf1GqnKQuvH-V3QN80UOOQ', 'fThrN4tfupIGetkrz18JOg'), 0.8052944470981547),
  (('39FT2Ui8KUXwmUt6hnwy-g', 'uW6UHfONAmm8QttPkbMewQ'), 0.701897309016126),
  (('7weuSPSSqYLUFga6IYP4pg', 'IhNASEZ3XnBHmuuVnWdIwA'), 1.0344100346922414),
  (('yy7shAsNWRbGg-8Y67Dzag', 'rS39YnrhoXmPqHLzCBjeqw'), 0.8211280792885481),
  (('x-8ZMKKNycT3782Kqf9loA', 'jgtWfJCJZty_Nctqpdtp3g'), 0.7778656058837244)],
 [('ulQ8Nyj7jCUR8M83SUMoRQ',
   array('d', [-0.341767817735672, -0.6615524291992188, 1.9749647378921509, -1.9038962125778198, -2.1017773151397705, 1.3063570261001587, 1.71941077709198, -2.796144962310791, -1.369841456413269, 0.5570476055145264]))])

In [7]:
final_biz_features_rdd.take(1)

                                                                                

[('azE1DNVQFBU8boVbaJhj7w',
  array('d', [-0.028799572959542274, -0.006738672964274883, 0.05829034745693207, 0.08992240577936172, -0.026271861046552658, 0.10994374006986618, 0.08646126836538315, -0.09953965991735458, -0.05454787611961365, 0.040256477892398834]))]

### SANITY ANALYSIS

In [19]:
# final_user_features_with_strings_rdd.filter(lambda x: x[1] == default_vector).collect()

In [8]:
default_vector = [0.0] * 10  # Example default vector

def check_for_defaults_and_unknowns(features_rdd, default_vector):
    """
    This function checks for rows with default vectors and "Unknown" IDs.
    It returns the count of rows with default vectors and rows with "Unknown" IDs.
    """
    # Count rows with default vectors
    default_count = features_rdd.filter(lambda x: x[1] == default_vector).count()
    
    # Count rows with "Unknown" IDs
    unknown_count = features_rdd.filter(lambda x: x[0] == "Unknown").count()
    
    return default_count, unknown_count

# Perform the checks for user and business feature RDDs
user_defaults, user_unknowns = check_for_defaults_and_unknowns(final_user_features_rdd, default_vector)
biz_defaults, biz_unknowns = check_for_defaults_and_unknowns(final_biz_features_rdd, default_vector)

                                                                                

In [39]:
# Print the results
print(f"User Defaults: {user_defaults}, User Unknowns: {user_unknowns}")
print(f"Business Defaults: {biz_defaults}, Business Unknowns: {biz_unknowns}")

User Defaults: 26044, User Unknowns: 0
Business Defaults: 26044, Business Unknowns: 0


# CONCLUSION:

- 我们非常高效地利用RDD，通过ALS得到tip.json中的用户和商家的特征向量，用分块矩阵乘法(block matrix multiplication)来计算实际需要考虑的用户和商户之间的亲和力得分。这个亲和力得分可以后续作为特征，让XGBRegressor学习。

- 后续，如果直接使用用户和商家的特征向量作为输入特征，您将引入更多的维度到模型中。这可能会增强模型的预测能力，因为这些特征向量编码了关于用户和商户的重要信息，但同时也可能增加过拟合的风险，特别是在特征维数很高时。
  
- 仅使用亲和力得分: 亲和力得分本身是一个从特征向量中派生的特征，反映了用户和商户之间的潜在关系。仅使用这一个得分可以减少模型的复杂性，降低过拟合的风险，并可能加快训练速度。


## 理论依据
- 特征向量中可能包含亲和力得分未能完全捕捉的信息。如果您的场景下，用户的个性化需求和商户的独特属性对预测任务尤为关键，那么提供完整的特征向量可能会有助于模型捕捉到这些复杂的关系。

- 信息覆盖: 特征向量中可能包含亲和力得分未能完全捕捉的信息。如果在现在的场景下，用户的个性化需求和商户的独特属性对预测任务尤为关键，那么提供完整的特征向量可能会有助于模型捕捉到这些复杂的关系。

In [5]:
# # numpy.core._exceptions._ArrayMemoryError: Unable to allocate 23.0 GiB for an array with shape (25406, 121526) and data type float64
# print("Affinity scores example:", affinity_scores_rdd.take(5))

In [92]:
# print("Affinity scores example:", affinity_scores_rdd.take(5))
# print("User ID mapping example:", list(user_ids_bcast.value.items())[:5])
# print("Business ID mapping example:", list(business_ids_bcast.value.items())[:5])

Affinity scores example: [((0, 0), -2.214342565985518e-05), ((0, 12), 0.2920612750297914), ((0, 24), 0.20821277066745858), ((0, 36), 0.7875571581378863), ((0, 48), 0.71932584997959)]
User ID mapping example: [('ulQ8Nyj7jCUR8M83SUMoRQ', 0), ('0l1I3fu22Aec8S5VG6NiqQ', 7), ('rGuevqz7WCka0f4UfkHurQ', 14), ('Xe-Dlatnkz9A9l8oyDgU9Q', 21), ('okOV6LqE3bTy5ZIMVm3VOQ', 28)]
Business ID mapping example: [('tJRDll5yqpZwehenzE2cSg', 0), ('k7WRPbDd7rztjHcGGkEjlw', 7), ('8qNOI6Q1-rJrvWWD5Btz6w', 14), ('c5NHHW0sNm7eaQBwvQJkkw', 21), ('JzB7NITHQ7gVHGVZ1ntgIQ', 28)]


# Archived: pre- sores_rdd Processing and Analysis

In [96]:
from pyspark import SparkContext
import numpy as np

# 计算商户总数
total_business_count = len(business_ids_bcast.value)

# 统计每个用户的得分记录数量
user_scores_count = affinity_scores_rdd.map(lambda x: (x[0][0], 1)).reduceByKey(lambda a, b: a + b)

# 检查每个用户的得分记录是否等于商户总数
consistent_users = user_scores_count.filter(lambda x: x[1] == total_business_count)

# 查看不一致的用户（即得分记录数不等于商户总数的用户）
inconsistent_users = user_scores_count.filter(lambda x: x[1] != total_business_count)

# 打印结果
print("Total business count: ", total_business_count)
print("Consistent users (example):", consistent_users.take(5))
print("Inconsistent users (example):", inconsistent_users.take(5))

Total business count:  121526


                                                                                

Consistent users (example): [(0, 121526), (1, 121526), (2, 121526), (3, 121526), (4, 121526)]
Inconsistent users (example): []


In [12]:
print("Affinity scores example:", affinity_scores_rdd.take(10))

Affinity scores example: [((0, 0), -2.8818499995775303e-05), ((0, 12), 0.2931362566499758), ((0, 24), 0.004529009503575848), ((0, 36), 0.7364946269746667), ((0, 48), 0.9833278893913743), ((0, 60), 0.13164626736620358), ((0, 72), 0.48885588272566216), ((0, 84), 0.12957128602783802), ((0, 96), 0.12957128602783802), ((0, 108), 0.3366717157502644)]


In [16]:
# Example: Calculate basic statistics for affinity scores
stats = affinity_scores_rdd.map(lambda x: x[1]).stats()
print("Mean score:", stats.mean())
print("Standard deviation:", stats.stdev())
print("Max score:", stats.max())
print("Min score:", stats.min())

[Stage 690:>                                                      (0 + 12) / 12]

Mean score: 0.013045328180013544
Standard deviation: 0.13186665950782575
Max score: 5.212019810222868
Min score: -3.6185841196981885


                                                                                

In [18]:
# Example: Recommend Top 3 affnitive businesses for each user
top3_recommendations = affinity_scores_rdd.map(lambda x: (x[0][0], (x[0][1], x[1]))).groupByKey().mapValues(lambda x: sorted(list(x), key=lambda y: y[1], reverse=True)[:3])
print("Top 3 recommendations for 3 users (example):", top3_recommendations.take(3))

                                                                                

Top 3 recommendations for each user (example): [(0, [(4283, 2.243898320028813), (21471, 2.0370684428248973), (9960, 2.0364196777220305)]), (1, [(4778, 7.99976580004422e-05), (3693, 6.048461886035935e-05), (17157, 5.739607357349828e-05)]), (2, [(3654, 0.24115612690854257), (9960, 0.2343048873974668), (26571, 0.21922514224579626)])]


In [97]:
val_df = pd.read_csv('../yelp_val.csv')
affinity_dict = affinity_scores_rdd.collectAsMap()

                                                                                

In [105]:
user_to_num_dict = user_ids_bcast.value
biz_to_num_dict = business_ids_bcast.value

num_to_user_dict = {v: k for k, v in user_to_num_dict.items()}
num_to_biz_dict = {v: k for k, v in biz_to_num_dict.items()}

In [111]:
num_to_user_dict

{0: 'ulQ8Nyj7jCUR8M83SUMoRQ',
 7: '0l1I3fu22Aec8S5VG6NiqQ',
 14: 'rGuevqz7WCka0f4UfkHurQ',
 21: 'Xe-Dlatnkz9A9l8oyDgU9Q',
 28: 'okOV6LqE3bTy5ZIMVm3VOQ',
 35: 'Xc2afntnM3flsP-pPZxXPg',
 42: 'rp007n5XVLGJwIOTh9U0-g',
 49: 'YrDbyRFyan6nx5aXb_N8Hg',
 56: 'rr_r5cE5Mz44TIPjOHk1Qg',
 63: 'nx_htQFv1Ck4iCCyfJl7_g',
 70: 'PimD6QJQpDk_pn0zR1GwZQ',
 77: 'BH4c0UxaFy4j7tER57qi6Q',
 84: '3FjdHwnoz-vfw6hBBtckqg',
 91: 'L1VaWJjSZELhqyaMoDU5tw',
 98: 'wFL_ALur5nfnG12QXAgSEQ',
 105: 'ChvMiLRa2T4gaI0k5nbTyw',
 112: '8ijJRp3DjaaaT7TfB3pFxA',
 119: 'zlTw3kIe7UcFk831WBRIXw',
 126: 'dpZguGGV9C4zC0KgEIfaTQ',
 133: 'mNy3YPlyaOsD6FDUtU7mxA',
 140: '4yG4J05aKzE2zov0Jr37kg',
 147: '4Qu99ylIiwdNUu10P65inA',
 154: 'AM3pzipo_ssKE4FWNkALuA',
 161: 'yUmEBt_NayUxGhKBnH094Q',
 168: 'JiPMk9WmbJu-VfTRAKpZpw',
 175: '0l9V2VrhdWDgNR-7TpSAdg',
 182: 'H5d_nFqzwrREE-YduK2ABg',
 189: 'B0N2Ynjr9yHMMlgSVDGOfg',
 196: 'n3opKJcy8lYSxR5E1ppWmA',
 203: '4GPJCaHHAzPgmT1QGIVHSw',
 210: 'KV92qxvltVlZ_vHDUqO3sg',
 217: 'wTI-obUmpg8MGqTzVp

In [110]:
num_to_biz_dict

{0: 'tJRDll5yqpZwehenzE2cSg',
 7: 'k7WRPbDd7rztjHcGGkEjlw',
 14: '8qNOI6Q1-rJrvWWD5Btz6w',
 21: 'c5NHHW0sNm7eaQBwvQJkkw',
 28: 'JzB7NITHQ7gVHGVZ1ntgIQ',
 35: '3pQ4KSnOX9Y2IEMJvGFxUg',
 42: '-VruCC0mPOyZXW1MfQLkQA',
 49: 'N_IytPmdrTc9yn3tWPFtQg',
 56: 'QFPS9JBNdOOHVP0C3pSslw',
 63: 'GAmd3nHx5TInVN26PLZj7g',
 70: '4FFhq0e4bxAG4uXqgT14jQ',
 77: 'EbxaQO95C7PUNTmokE70XA',
 84: 'eDusRHhrFW_Y555LjQrlzw',
 91: 'T3syhQWse9WtFMoWk-oN6g',
 98: 'I-D4qEtJJ4okSLrLKxMMRw',
 105: 'Tzp6kdtA7coJgnY3-hFHtw',
 112: '_R1jBQQieKpNGMBqmrLRyA',
 119: 'DBtmJXpwS31E0cDZo7ypGw',
 126: 'gh6__q2WXFuyN8gt6VAnWw',
 133: 'PrNdvlxp2E-PKqQTy6IGfA',
 140: 'uSVSllAbQ71Aenmdy0r3Rw',
 147: 'w4T8CLarUWVjjRuOK9wKpw',
 154: 'JxSmlL_MckX0dvS5XBg0Cg',
 161: 'QqpzPFFHeHqyWSPZI-DkvQ',
 168: 'CS7DuVN2YkWDU7aoeFj-og',
 175: 'nlVjdQq9FzdQ3bfy-8y80g',
 182: 'PJcOOjebn86geLXG2qPB_Q',
 189: '2eVpkjDioFSgp2C27mSdHQ',
 196: 'sHB4PVLgugU9TGfOdBvgqw',
 203: 'VNOosvcE6HN_dqSTqRicXQ',
 210: 'sJQszX3VVOpkQAwfIcjqzQ',
 217: 'WnuRPyTEvILZKCAPk2

In [123]:
affinity_dict[14, 56]

KeyError: (14, 56)

In [106]:
def get_affinity_score(row):
    user_index = user_to_num_dict.get(row['user_id'], None)
    biz_index = biz_to_num_dict.get(row['business_id'], None)
    if user_index is not None and biz_index is not None:
        return affinity_dict.get((user_index, biz_index), None)
    return None

In [107]:
val_df['affinity_score'] = val_df.apply(get_affinity_score, axis=1)

In [108]:
val_df.head()

Unnamed: 0,user_id,business_id,stars,affinity_score
0,wf1GqnKQuvH-V3QN80UOOQ,fThrN4tfupIGetkrz18JOg,5.0,
1,39FT2Ui8KUXwmUt6hnwy-g,uW6UHfONAmm8QttPkbMewQ,5.0,
2,7weuSPSSqYLUFga6IYP4pg,IhNASEZ3XnBHmuuVnWdIwA,4.0,
3,CqaIzLiWaa-lMFYBAsYQxw,G859H6xfAmVLxbzQgipuoA,5.0,
4,yy7shAsNWRbGg-8Y67Dzag,rS39YnrhoXmPqHLzCBjeqw,3.0,


In [98]:
affinity_dict[(0,60)]

0.18571836330326622

## Archived

In [None]:
# unknown_user_entries = processed_scores_rdd.filter(lambda x: x[0][0] == "Unknown User").collect()
# count_unknown_user = len(unknown_user_entries)

# # 查找包含"Unknown Business"的条目
# unknown_business_entries = processed_scores_rdd.filter(lambda x: x[0][1] == "Unknown Business").collect()
# count_unknown_business = len(unknown_business_entries)

# # 打印结果
# print("Number of entries with 'Unknown User':", count_unknown_user)
# print("Number of entries with 'Unknown Business':", count_unknown_business)

In [None]:
# def calculate_affinity_scores(sc, user_features, business_features_bcast, user_ids_bcast, business_ids_bcast):
#     """
#     计算每个用户与所有商户之间的亲和力得分，并使用广播变量映射用户和商户ID, Avoid CrossJoin。

#     :param user_features: 用户特征向量的字典
#     :param business_features_bcast: 商户特征向量的广播变量
#     :param user_ids_bcast: 用户ID映射表的广播变量
#     :param business_ids_bcast: 商户ID映射表的广播变量
#     :return: RDD，包含((用户ID字符串, 商户ID字符串), 亲和力得分)
#     """

#     if business_features_bcast.value is None:
#         raise Exception("Broadcast variable not loaded correctly")

#     # 转换用户特征字典为RDD
#     users_rdd = sc.parallelize(user_features.items(), numSlices=100).cache()
    
#     # 计算亲和力得分
#     def calculate_scores(user_item):
#         user_id, user_vec = user_item
#         results = []
#         user_id_str = user_ids_bcast.value.get(user_id, "Unknown User")
#         for biz_id, biz_vec in business_features_bcast.value.items():
#             biz_id_str = business_ids_bcast.value.get(biz_id, "Unknown Business")
#             score = np.dot(user_vec, biz_vec)
#             results.append(((user_id_str, biz_id_str), score))
#         return results

#     # 对每个用户进行计算
#     scores_rdd = users_rdd.flatMap(calculate_scores)
    
#     return scores_rdd

# # Usage: 
# # user_features, business_features_bcast, user_ids_bcast, business_ids_bcast = matrix_factorization_als(data_path)
# # processed_scores_rdd = calculate_affinity_scores(sc, user_features, business_features_bcast, user_ids_bcast, business_ids_bcast)

In [None]:
# # 计算亲和力分数
# user_features = best_model.userFeatures()
# item_features = best_model.productFeatures()

# # 将特征转换为字典形式，方便查找
# user_feature_dict = {k: v for k, v in user_features}
# item_feature_dict = {k: v for k, v in item_features}

# # 计算点积
# def affinity_score(user, item):
#     """计算用户和商户之间的亲和力分数"""
#     user_vector = np.array(user_feature_dict[user])
#     item_vector = np.array(item_feature_dict[item])
#     return np.dot(user_vector, item_vector)

# # 示例：计算验证集中某个用户-商户对的亲和力分数
# example_user_item_pair = (example_user_id, example_item_id)
# score = affinity_score(*example_user_item_pair)
# print(f"Affinity Score for user {example_user_id} and item {example_item_id}: {score}")


# FAILURE ANALYSIS
代码出现问题主要是因为在使用Apache Spark进行分布式计算时，涉及到函数的序列化和网络传输。特别是当在Spark的转换函数（如map、filter等）中使用外部变量或数据结构（如RDD）时，如果这些外部引用不能被有效序列化，就会导致运行错误。

- 原因分析
  
> 非法引用外部RDD：在您的初始代码中，函数train_model尝试直接使用外部定义的RDD（training_rdd和validation_rdd），这在Spark中是不允许的。因为Spark的设计不允许在一个节点上的执行函数中直接引用另一个节点上的数据结构，这样的操作会引起序列化问题，因为RDD本身并不支持序列化。

- 广播变量的错误使用：尽管广播变量用来解决类似问题很有效，但在您的初始尝试中，可能存在对这些广播变量使用不当的情况，导致了序列化错误。

- PicklingError：这是Python的一个序列化错误，常在尝试将函数（尤其是包含不可序列化对象引用的函数）进行序列化时出现。在您的代码中，尝试将含有RDD操作的函数进行分布式传输时遇到了这个问题。

为了解决这些问题，我进行了以下几个关键的修改：

1. 函数封装和参数化：

> 重新封装了train_model函数，使其不直接引用任何外部RDD。相反，通过函数参数将必要的数据（例如映射字典）传入，确保了函数的独立性和可序列化性。
这样做不仅避免了序列化问题，也使得函数更加通用和可重用。

2. 使用广播变量传递小数据集：

> 将用户和商家的ID映射转换为广播变量，这样每个节点可以独立访问这些数据而无需通过网络传输。
广播变量是Spark专门为了高效分布式处理而设计的一种数据结构，可以避免数据的重复传输，提升计算效率。

3. 本地化RDD操作：

> 在train_model中，创建局部的RDD处理逻辑，确保所有RDD操作都在函数内部定义和执行，不依赖外部的RDD状态。
这个改动有助于提升函数的封闭性和网络传输的安全性，因为所有的数据流都在预定义的操作范围内完成


通过上述修改，代码现在可以在分布式环境中安全有效地运行，同时避免了常见的序列化和RDD操作错误。

Note:考虑使用更高级的调优工具：如使用Apache Spark的MLlib中的CrossValidator或TrainValidationSplit来进行更系统的模型评估和参数调优。

In [None]:
# from pyspark.mllib.recommendation import ALS, Rating
# from pyspark import SparkContext
# from math import sqrt
# import itertools

# user_to_num_dict = user_ids_bcast.value
# business_to_num_dict = business_ids_bcast.value
# user_to_num_bc = sc.broadcast(user_to_num_dict)
# business_to_num_bc = sc.broadcast(business_to_num_dict)

# # Create Train, Val Set: ((user_id, business_id), count) format
# training_rdd, validation_rdd = user_business_interaction_counts.randomSplit([0.8, 0.2], seed=42)

# # 改为从全部数据集构建映射，而不只是训练集
# full_rdd = user_business_interaction_counts
# user_to_num = full_rdd.map(lambda p: p[0][0]).distinct().zipWithIndex().collectAsMap()
# business_to_num = full_rdd.map(lambda p: p[0][1]).distinct().zipWithIndex().collectAsMap()

# # 广播映射字典
# user_to_num_bc = sc.broadcast(user_to_num)
# business_to_num_bc = sc.broadcast(business_to_num)

# # 准备验证集的RDD，增加错误处理
# validation_for_predict_rdd = validation_rdd.map(
#     lambda p: (
#         user_to_num_bc.value.get(p[0][0], -1),  # 使用get方法，如果ID不存在返回-1
#         business_to_num_bc.value.get(p[0][1], -1)
#     )
# )

# try:
#     collected_data = validation_for_predict_rdd.collect()
#     print("Data collected successfully:", collected_data[:5])  # 打印前5条数据
# except Exception as e:
#     print("Error in collecting data:", e)

# validation_for_predict_rdd.filter(lambda x: x[1] == -1).count()

# def train_model(params):
#     rank, reg_param, alpha = params
#     # Explicitly extract broadcast variables to avoid any hidden serialization issues.
#     user_to_num = user_to_num_bc.value
#     business_to_num = business_to_num_bc.value
    
#     # Construct the ratings and validation RDDs within the function to avoid any serialization issues
#     def rating_mapper(p):
#         user_id = user_to_num.get(p[0][0], -1)
#         business_id = business_to_num.get(p[0][1], -1)
#         return Rating(user_id, business_id, float(p[1]))
    
#     def validation_mapper(p):
#         return (user_to_num.get(p[0][0], -1), business_to_num.get(p[0][1], -1))

#     local_ratings_rdd = training_rdd.map(rating_mapper)
#     local_validation_for_predict_rdd = validation_rdd.map(validation_mapper)

#     # Train model using the locally created RDDs
#     model = ALS.trainImplicit(local_ratings_rdd, rank, iterations=10, lambda_=reg_param, alpha=alpha)

#     # Predict and calculate RMSE
#     predictions = model.predictAll(local_validation_for_predict_rdd).map(lambda r: ((r.user, r.product), r.rating))
#     rates_and_preds = validation_rdd.map(
#         lambda p: ((user_to_num.get(p[0][0], -1), business_to_num.get(p[0][1], -1)), p[1])
#     ).join(predictions)
#     rmse = sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

#     return (rmse, rank, reg_param, alpha)

# # 并行执行网格搜索
# params_rdd = sc.parallelize([(5, 0.01, 1.0), (10, 0.1, 0.1)])  # Example parameter set
# results = params_rdd.map(train_model).collect()
# best_params = min(results, key=lambda x: x[0])

# Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

# During handling of the above exception, another exception occurred:

# PicklingError                             Traceback (most recent call last)
# Cell In[32], line 5
#       2 sc = SparkContext.getOrCreate()
#       4 params_rdd = sc.parallelize([(5, 0.01, 1.0), (10, 0.1, 0.1)])  # Example parameter set
# ----> 5 results = params_rdd.map(train_model).collect()
#       7 best_params = min(results, key=lambda x: x[0])