In [1]:
import csv
import json
import pickle
from collections import Counter, defaultdict
from itertools import islice

import numpy as np
from pyspark import SparkConf, SparkContext

In [2]:
# sc.stop()
# Local Spark context using every available core.
# NOTE(review): the memory sizes below are hard-coded — presumably tuned for the
# original machine; confirm before running elsewhere.
conf = SparkConf().setMaster("local[*]").setAppName("App_Name").set('spark.executor.memory', '4G') \
    .set('spark.driver.memory', '45G').set('spark.driver.maxResultSize', '10G')
# getOrCreate reuses a context that is already running, so re-executing this cell
# does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

### Read Input file

In [7]:
# score_dict[product_id][user_id] = score  (product is the OUTER key, user the inner key)
score_dict = defaultdict(dict)
users = set() # len = 256059
products = set() # len = 74258
with open('Demo_data.csv', newline='') as csvfile: # dataset size: 568454 rows
    rows = csv.reader(csvfile)
    # Columns: 'Id', 'ProductId', 'UserId', 'ProfileName', 'HelpfulnessNumerator',
    # 'HelpfulnessDenominator', 'Score', 'Time', 'Summary', 'Text'
    next(rows, None)  # skip the header row (no-op on an empty file)
    for row in rows:
        user_id = row[2]
        product_id = row[1]
        score = int(row[6])
        # A later row for the same (product, user) overwrites the earlier score.
        score_dict[product_id][user_id] = score
        users.add(user_id)
        products.add(product_id)

### 建立 item-item collaborative filtering
#### Step1: 建立 matrix
建立一個 matrix，存 user 對 product 的 score，格式：[(product, [(user, score), (user, score), ...])]

#### Step2: 計算相似度
三組 map-reduce<br>
第一組：把每個 product score 做標準化（原本的值 - mean）<br>
第二組：利用第一組的結果，計算兩兩 product 的 dot product 並加總 -> 存到 dot_value_dict[(product1, product2)] = value<br>
第三組：利用第一組的結果，計算每個 product 的平方和 -> 存到 score_sqr_dict[product] = value

計算相似度 -> similarity(product_1, product_2)</br>

#### Step3: 預測分數
由其他人的推薦結果，預測該 (user, product) 的 pair 預測分數是否達到 4 分，達到的留下，沒達到的刪除<br>

## Step 1: 建立 matrix

In [8]:
def build_matrix(score_dict):
    """Flatten the nested rating dict into ``[(product, [(user, score), ...]), ...]``.

    Rows follow the iteration order of ``score_dict``; each inner list follows the
    iteration order of that product's ``{user: score}`` mapping.
    """
    return [
        (product, list(ratings.items()))
        for product, ratings in score_dict.items()
    ]

# Materialise the rating matrix once; a later cell parallelizes it into Spark.
score_matrix = build_matrix(score_dict)

In [9]:
# score_matrix

## Step2: 計算相似度

#### 將每個 product 分數做標準化

In [10]:
def mapper_product_standardization(line): # line = ('product_1', [('user_1', 1), ...])
    """Mean-centre one product's scores and re-key the result by user.

    ``line[0]`` is the product id and ``line[1]`` its list of ``(user, score)``
    pairs. Returns one ``(user, [(product, score - mean)])`` entry per rating,
    where ``mean`` is the average of that product's scores.
    """
    product, ratings = line[0], line[1]
    avg = np.mean([score for (_user, score) in ratings])
    return [(user, [(product, score - avg)]) for user, score in ratings]

In [11]:
# Distribute the (product, ratings) rows and mean-centre every product's scores.
lines = (
    sc.parallelize(score_matrix)
    .flatMap(mapper_product_standardization)
)
# Concatenate the single-element lists so each user ends up with one list of
# centred scores: [(user_id, [(product_id, score), (product_id, score), ...])]
lines_r = lines.reduceByKey(lambda left, right: left + right)
# lines_r.collect()

#### 計算 product 之間的 dot product
方法：</br>
前一步把同一個 column(user) 的資訊 group 在一起了，所以就把每個 product 兩兩配成一對把 score 相乘，在 reduce 時再把 dot 乘積相加起來

In [12]:
def mapper_dot_product(line): # line = ('user_1', [('product_1', -2.6), ('product_3', -1.0), ('product_6', -1.6)])
    """Emit one dot-product contribution per unordered product pair rated by a user.

    ``line[1]`` is the user's list of ``(product, centred_score)`` pairs. For every
    pair of distinct list positions the function returns
    ``((product_a, product_b), score_a * score_b)``.

    The key is always ordered so that ``product_a <= product_b``. The order of
    products inside a user's list is not fixed (it comes from list concatenation
    in ``reduceByKey``), so without a canonical key the same pair could be emitted
    as both ``(a, b)`` and ``(b, a)`` and its sum would be split across two keys.
    """
    ratings = line[1]
    maplist = []
    for i, (product_1, score_1) in enumerate(ratings):
        for product_2, score_2 in ratings[i + 1:]:
            if product_1 <= product_2:
                key = (product_1, product_2)
            else:
                key = (product_2, product_1)
            maplist.append((key, score_1 * score_2))
    return maplist

把 product pair 的 dot product value 存進 dictionary 裡面

In [13]:
# dot_value_dict[(product_id, product_id)] = summed dot product over all users
lines_dot = lines_r.flatMap(mapper_dot_product) # sc.parallelize(w)
lines_dot_r = lines_dot.reduceByKey(lambda x, y: x + y)
# Keep only pairs with a strictly positive dot product (a non-positive value means
# similarity <= 0). Filtering inside Spark keeps the discarded pairs from ever
# being shipped to the driver, which limits the size of the collected result.
dot_value_dict = lines_dot_r.filter(lambda pair_value: pair_value[1] > 0).collectAsMap()

#### 計算 product 的 score 平方和

In [14]:
def mapper_score_square(line): # line = ('user_1', [('product_1', -2.6), ('product_3', -1.0), ('product_6', -1.6)])
    """Return ``(product, score ** 2)`` for every ``(product, score)`` in ``line[1]``."""
    return [(product, score**2) for product, score in line[1]]

In [15]:
# Square every centred score, then add the squares up per product.
lines_sqr = lines_r.flatMap(mapper_score_square)
lines_sqr_r = lines_sqr.reduceByKey(lambda total, value: total + value)
# lines_sqr_r.collect() # per-product sum of squared scores

把 product 的 score 平方和存進 dictionary 裡面

In [16]:
# score_sqr_dict[product] = sum of squared centred scores, pulled back to the driver
score_sqr_dict = defaultdict()
score_sqr_dict.update(lines_sqr_r.collect())

#### 計算兩兩 product 的 相似度
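$$\mathrm{sim}(x, y) = \frac{\sum_{u} \tilde r_{u,x}\,\tilde r_{u,y}}{\sqrt{\sum_{u} \tilde r_{u,x}^{2}}\;\sqrt{\sum_{u} \tilde r_{u,y}^{2}}}$$
也就是 sum(dot 乘積) / ( sqrt(product x 的平方和) * sqrt(product y 的平方和) )，其中 $\tilde r_{u,x}$ 為 user $u$ 對 product $x$ 的分數減去該 product 的平均分數。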

In [17]:
def similarity(product_1, product_2):
    """Similarity of two products from the precomputed dot products and square sums.

    Looks the pair up in ``dot_value_dict`` in either key order and divides the
    stored dot product by the product of the square roots of both products'
    entries in ``score_sqr_dict``. Returns 0 when neither key order is present
    (the pair has no stored dot product).
    """
    for pair in ((product_1, product_2), (product_2, product_1)):
        if pair in dot_value_dict:
            norm_product = (score_sqr_dict[product_1]**0.5)*(score_sqr_dict[product_2]**0.5)
            return dot_value_dict[pair]/norm_product
    return 0

In [18]:
# product_sim[product_1][product_2] = similarity, stored only when it is > 0
# (non-positive similarities are not used).
#
# similarity() returns 0 for any pair that has no entry in dot_value_dict, so it is
# enough to visit the stored pairs instead of every product combination.
product_sim = defaultdict(Counter)
# Position of each product in score_matrix: the product that appears first there
# becomes the outer key, which is the orientation the all-pairs loop produced.
matrix_position = {row[0]: idx for idx, row in enumerate(score_matrix)}
for pair in dot_value_dict:
    product_1, product_2 = sorted(pair, key=matrix_position.__getitem__)
    sim = similarity(product_1, product_2)
    if sim > 0:
        product_sim[product_1][product_2] = sim

# Free the large lookup tables; note this cell cannot be re-run without rebuilding them.
del score_sqr_dict, dot_value_dict

#### 把 product similarity 寫進檔案

In [19]:
# Persist the similarities as tab-separated "product_1 product_2 similarity" rows,
# preceded by one '#' header line.
with open("product_similarity.txt", 'w') as outputfile:
    print("# product_1", "product_2", "similarity", sep="\t", file=outputfile)
    for product_1, neighbours in product_sim.items():
        for product_2, sim in neighbours.items():
            print(product_1, product_2, sim, sep="\t", file=outputfile)

#### Read product similarity file

In [20]:
# Reload the similarities and store them symmetrically, so that
# product_sim[a][b] == product_sim[b][a] for every pair in the file.
product_sim = defaultdict(Counter)

with open("product_similarity.txt") as file:
    next(file, None)  # skip the header line
    for line in file:
        fields = line.split()  # split once per line instead of once per field access
        if not fields:
            continue  # tolerate blank lines (e.g. a trailing newline)
        left, right, value = fields[0], fields[1], float(fields[2])
        product_sim[left][right] = value
        product_sim[right][left] = value

## Step3: 預測分數

先找出該 user 有評分的 product，再從中挑出與目標 product p 相似度最高的兩個（p_1、p_2），計算：<br>
r = (sim(p, p_1) * score_dict_reverse[user][p_1] + sim(p, p_2) * score_dict_reverse[user][p_2]) / (sim(p, p_1) + sim(p, p_2))<br>

In [21]:
# 建立以 user 為第一層的 dict，為預測分數的計算方便
score_dict_reverse = defaultdict(Counter)
for product in score_dict:
    for user, score in score_dict[product].items():
        score_dict_reverse[user][product] = score

In [22]:
def predict_score(user, product, top_k=2):
    """Predict ``user``'s score for ``product`` from the most similar products they rated.

    Walks ``product_sim[product]`` from the most to the least similar neighbour,
    keeps the first ``top_k`` neighbours that ``user`` has rated, and returns the
    similarity-weighted average of the user's scores for them.

    Args:
        user: user id, looked up in ``score_dict_reverse``.
        product: product id, looked up in ``product_sim``.
        top_k: number of rated neighbours to average over (default 2).

    Returns:
        The weighted average, or 0 when the user or the product is unknown or no
        rated neighbour is found.
    """
    # .get() instead of indexing: indexing a defaultdict would insert an empty
    # entry for every unknown user/product that is ever queried.
    user_scores = score_dict_reverse.get(user)
    neighbours = product_sim.get(product)
    if not user_scores or not neighbours:
        return 0

    weighted_score = 0
    sim_sum = 0
    count = 0
    for product_2, sim in neighbours.most_common():
        if sim < 0: # ignore negative similarities (they could push the prediction above 5)
            break
        if product_2 in user_scores:  # membership test only; does not insert a key
            weighted_score += sim * user_scores[product_2]
            sim_sum += sim
            count += 1
            if count >= top_k:
                break
    return weighted_score/sim_sum if sim_sum > 0 else 0

計算所有（user, product）pair 找出預測分數高的

In [23]:
# Score every (user, product) pair the user has not rated yet and keep the
# predictions that reach 4.
high_score_pairs = []
for product in products:
    for user in users:
        already_rated = product in score_dict and user in score_dict[product]
        if not already_rated:
            score = predict_score(user, product)
            if score >= 4:
                high_score_pairs.append(((user, product), score))

In [24]:
# Order by user_id ascending and, within a user, by predicted score descending.
# A single stable sort on the composite key gives the same order as sorting by
# score first and by user_id second.
high_score_pairs = sorted(high_score_pairs, key=lambda pair: (pair[0][0], -pair[1]))

In [25]:
# Number of (user, product) pairs that were kept
len(high_score_pairs)

17

In [26]:
# Display the kept pairs as ((user_id, product_id), predicted_score)
high_score_pairs

[(('A1B3SY3J47EXZE', 'B002CJARMI'), 5.0),
 (('A1BJHZE41QWBX6', 'B002CJARMI'), 5.0),
 (('A1KPKQZ1GMKAKO', 'B002CJARMI'), 5.0),
 (('A1ODDK3N8XC7UP', 'B002CJARMI'), 5.0),
 (('A1Z54EM24Y40LL', 'B00004RYGX'), 4.0),
 (('A1Z54EM24Y40LL', 'B00004CI84'), 4.0),
 (('A20EEWWSFMZ1PN', 'B007PA30TG'), 5.0),
 (('A20EEWWSFMZ1PN', 'B0047RQ9M0'), 5.0),
 (('A20EEWWSFMZ1PN', 'B003JA5KKS'), 5.0),
 (('A20EEWWSFMZ1PN', 'B007TJGZ54'), 5.0),
 (('A20EEWWSFMZ1PN', 'B001EYUE5M'), 5.0),
 (('A2AAA0U5QD5TGB', 'B002CJARMI'), 5.0),
 (('A2C5VLIJMDPWHI', 'B002CJARMI'), 5.0),
 (('A32207GKRIJYDI', 'B002CJARMI'), 5.0),
 (('A3L5V40F14R2GP', 'B002CJARMI'), 4.0),
 (('ALEABNMSVO1JI', 'B002CJARMI'), 5.0),
 (('AN71KYKFED796', 'B002CJARMI'), 5.0)]

只計算從 topic tag 以及 frequent item set 產生出的推薦名單：topic tag 名單篩選掉預測分數 < 4 分的 product，frequent item set 名單則篩選掉預測分數 < 3.5 分的 product<br>
(因為直接計算所有沒有評過分的 user-product pair 運算量過大，會跑不完)

In [21]:
# Candidate recommendation lists produced elsewhere.
# file1_dict is consumed below as {user: {tag: [product, ...]}} and
# file2_dict as {user: [product, ...]}.
with open("user_foods_weichin.json") as file_1:
    file1_dict = json.load(file_1)
with open("user_possibuy_jamie.json") as file_2:
    file2_dict = json.load(file_2)

KeyboardInterrupt: 

In [41]:
# tag_filt_dict[user][tag] = [(product, predicted_score), ...] with score >= 4,
# ordered by descending predicted score.
tag_filt_dict = defaultdict(lambda: defaultdict(list))

# The loop variables deliberately avoid the name `products`, which is the global
# set of all product ids built while reading the CSV.
for user, tagged_products in file1_dict.items():
    for tag, candidates in tagged_products.items():
        # Predict each candidate once, then filter on the stored score.
        scored = [(candidate, predict_score(user, candidate)) for candidate in candidates]
        filt = [pair for pair in scored if pair[1] >= 4]
        filt = sorted(filt, key=lambda k: -k[1]) # sort by score, highest first
        tag_filt_dict[user][tag] = filt

In [None]:
# NOTE(review): this cell rebuilds tag_filt_dict exactly like the preceding cell —
# presumably a leftover copy; confirm whether it can be removed.
# tag_filt_dict[user][tag] = [(product, predicted_score), ...] with score >= 4,
# ordered by descending predicted score.
tag_filt_dict = defaultdict(lambda: defaultdict(list))

# The loop variables deliberately avoid the name `products`, which is the global
# set of all product ids built while reading the CSV.
for user, tagged_products in file1_dict.items():
    for tag, candidates in tagged_products.items():
        # Predict each candidate once, then filter on the stored score.
        scored = [(candidate, predict_score(user, candidate)) for candidate in candidates]
        filt = [pair for pair in scored if pair[1] >= 4]
        filt = sorted(filt, key=lambda k: -k[1]) # sort by score, highest first
        tag_filt_dict[user][tag] = filt

In [25]:
# `pickle` is imported in the notebook's top import cell.
# NOTE(review): pickle.load can execute arbitrary code while unpickling — only
# load 'user_foods.pkl' if it comes from a trusted source.
with open('user_foods.pkl', 'rb') as f:
    info = pickle.load(f)
print(len(info))

256059


In [28]:
# info['A1Z54EM24Y40LL']

In [48]:
# freqItem_filt_dict[user] = candidate products whose predicted score is >= 3.5.
# NOTE(review): the tag-based filter uses a threshold of 4 — confirm that 3.5 is
# intended here.
freqItem_filt_dict = defaultdict(list)

# `candidates` (not `products`) so the global set of all product ids is not overwritten.
for user, candidates in file2_dict.items():
    freqItem_filt_dict[user] = [
        candidate for candidate in candidates
        if predict_score(user, candidate) >= 3.5
    ]

#### 產生中間檔案 -> for MDA_merge.ipynb

In [50]:
# Intermediate files consumed by MDA_merge.ipynb.
with open("freqItem_filt.json", 'w') as output:
    output.write(json.dumps(freqItem_filt_dict))

with open("tag_filt.json", 'w') as output:
    output.write(json.dumps(tag_filt_dict))