## 2020 Massive Data Analysis Term Project
## SimRank
Group32 - 106070038 杜葳葳

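SimRank works on a bipartite graph formed by users and items:<br>
if two users are similar, the items linked to them are also similar,<br>
and if two items are similar, the users linked to them are also similar.<br>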

### Import SparkConf, SparkContext, and the pyspark module
Set environment variables (to resolve a version issue)

In [1]:
# import package
from pyspark import SparkConf,SparkContext
import pyspark

# set environment variables
import os
os.environ["PYSPARK_PYTHON"]="/Library/Frameworks/Python.framework/Versions/3.9/bin/python3.9"
os.environ['PYSPARK_DRIVER_PYTHON'] = '/Library/Frameworks/Python.framework/Versions/3.9/bin/python3.9'

In [2]:
sc.stop()

Create the SparkConf object,
set to local mode with the app name "final"

In [3]:
conf = SparkConf().setMaster("local").setAppName("final")
sc = SparkContext(conf=conf)
sc

### Read the input file

In [535]:
lines = sc.textFile("sample1.txt")
#lines.first
lines.take(1)

['pc,hp.com']

In [536]:
Split_RDD = lines.map(lambda x : x.split(",")).map(lambda x: (str(x[0]),str(x[1])))
total_len = Split_RDD.count()

> total_len: 118
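
As a rough illustration of the parsing step, the sketch below does the same split in plain Python without Spark. The three records are hypothetical stand-ins for lines of `sample1.txt`, and `parse_record` is an illustrative helper name, not part of the notebook. It splits on the first comma only, which assumes the ad field never needs further splitting.

```python
# Hypothetical records in the same "query,ad" format as sample1.txt.
records = ["pc,hp.com", "camera,hp.com", "camera,bestbuy.com"]

def parse_record(line):
    """Split one 'query,ad' line into a (query, ad) tuple."""
    query, ad = line.strip().split(",", 1)  # split on the first comma only
    return (query, ad)

pairs = [parse_record(line) for line in records]
total_len = len(pairs)  # plays the role of Split_RDD.count()
print(pairs[0], total_len)
```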

### Preprocessing

queries_sum: total number of ad records for each query <br>
q_list: names of all queries (deduplicated)<br>
ads_sum: total number of query records for each ad <br>
ads_list: names of all ads (deduplicated)<br>

In [537]:
queries_sum = Split_RDD.map(lambda x: (x[0],1)).reduceByKey(lambda x,y: x+y)
q_list = queries_sum.map(lambda x: (x[0]))
ads_sum = Split_RDD.map(lambda x: (x[1],1)).reduceByKey(lambda x,y: x+y)
ads_list = ads_sum.map(lambda x: (x[0]))

> queries_sum: [('pc', 10), ('camera', 25), ('digital camera', 37), ('tv', 15), ('flower', 31)]<br>
> ads_sum: [('hp.com', 60), ('bestbuy.com', 27), ('teleflora.com', 16), ('orchids.com', 15)]<br>
> q_list: ['pc', 'camera', 'digital camera', 'tv', 'flower'] <br>
> ads_list: ['hp.com', 'bestbuy.com', 'teleflora.com', 'orchids.com']
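
The per-key counting above can be sketched in plain Python with `collections.Counter`, which plays the role of `map(... , 1)` followed by `reduceByKey`. The `pairs` list is a small hypothetical subset, not the real 118 records, so the counts differ from the ones printed above.

```python
from collections import Counter

# Hypothetical (query, ad) records; not the real data set.
pairs = [
    ("pc", "hp.com"),
    ("camera", "hp.com"),
    ("camera", "bestbuy.com"),
    ("tv", "bestbuy.com"),
]

# Count records per query and per ad.
queries_sum = Counter(q for q, _ in pairs)
ads_sum = Counter(a for _, a in pairs)

# Distinct names, in first-seen order.
q_list = list(queries_sum)
ads_list = list(ads_sum)
print(dict(queries_sum), dict(ads_sum))
```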

### Build the graph
Count how many times each query-ad pair occurs.<br>
**query_ad_pair**: (query, ad), times

In [538]:
query_ad_pair = Split_RDD.map(lambda x: ((x[0],x[1]),1)).reduceByKey(lambda x,y: x+y)
query_ad_pair.collect()

[(('pc', 'hp.com'), 10),
 (('camera', 'hp.com'), 20),
 (('camera', 'bestbuy.com'), 5),
 (('digital camera', 'hp.com'), 30),
 (('digital camera', 'bestbuy.com'), 7),
 (('tv', 'bestbuy.com'), 15),
 (('flower', 'teleflora.com'), 16),
 (('flower', 'orchids.com'), 15)]

Map out a #queries x #ads matrix<br>
whose keys are (query, ad) and whose values are 0,<br>
then union this matrix with query_ad_pair<br>
to obtain a new matrix, graph.<br>
**graph**: (query, ad), times

In [539]:
N = q_list.cartesian(ads_list).map(lambda x: (x, 0))
graph = N.union(query_ad_pair)

In [540]:
graph.collect()

[(('pc', 'hp.com'), 0),
 (('pc', 'bestbuy.com'), 0),
 (('pc', 'teleflora.com'), 0),
 (('pc', 'orchids.com'), 0),
 (('camera', 'hp.com'), 0),
 (('digital camera', 'hp.com'), 0),
 (('camera', 'bestbuy.com'), 0),
 (('camera', 'teleflora.com'), 0),
 (('digital camera', 'bestbuy.com'), 0),
 (('digital camera', 'teleflora.com'), 0),
 (('camera', 'orchids.com'), 0),
 (('digital camera', 'orchids.com'), 0),
 (('tv', 'hp.com'), 0),
 (('flower', 'hp.com'), 0),
 (('tv', 'bestbuy.com'), 0),
 (('tv', 'teleflora.com'), 0),
 (('flower', 'bestbuy.com'), 0),
 (('flower', 'teleflora.com'), 0),
 (('tv', 'orchids.com'), 0),
 (('flower', 'orchids.com'), 0),
 (('pc', 'hp.com'), 10),
 (('camera', 'hp.com'), 20),
 (('camera', 'bestbuy.com'), 5),
 (('digital camera', 'hp.com'), 30),
 (('digital camera', 'bestbuy.com'), 7),
 (('tv', 'bestbuy.com'), 15),
 (('flower', 'teleflora.com'), 16),
 (('flower', 'orchids.com'), 15)]
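
In the output above, the union keeps two entries for every observed pair: the zero placeholder and the real count. A minimal plain-Python sketch of the same zero-filled matrix is shown below, using a dictionary so that each (query, ad) cell appears exactly once. The counts are a subset of the `query_ad_pair` output. The single-entry-per-cell layout is an illustrative choice, not what the notebook does.

```python
from itertools import product

# Subset of the query_ad_pair counts shown earlier.
query_ad_pair = {
    ("pc", "hp.com"): 10,
    ("camera", "hp.com"): 20,
    ("camera", "bestbuy.com"): 5,
    ("tv", "bestbuy.com"): 15,
}
q_list = ["pc", "camera", "tv"]
ads_list = ["hp.com", "bestbuy.com"]

# Start with a zero for every (query, ad) cell, then overwrite observed pairs.
graph = {pair: 0 for pair in product(q_list, ads_list)}
graph.update(query_ad_pair)
print(len(graph), graph[("pc", "bestbuy.com")], graph[("camera", "hp.com")])
```

With the RDD version, a similar effect could be obtained by applying `reduceByKey(add)` to the union, since adding the zero placeholder leaves each count unchanged.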

### Function
identity() <br>
Maps each entry to 1 on the diagonal of the matrix and 0 everywhere else, producing the initial similarity RDD

In [541]:
def identity(x):
    if x[0][0] == x[0][1] : return ((x[0][0],x[0][1]),1)
    return ((x[0][0],x[0][1]),0)

In [542]:
query_sim = q_list.cartesian(q_list).map(lambda x: (x, 0)).map(identity)
ad_sim = ads_list.cartesian(ads_list).map(lambda x: (x, 0)).map(identity)
ad_sim.collect()

[(('hp.com', 'hp.com'), 1),
 (('hp.com', 'bestbuy.com'), 0),
 (('hp.com', 'teleflora.com'), 0),
 (('hp.com', 'orchids.com'), 0),
 (('bestbuy.com', 'hp.com'), 0),
 (('teleflora.com', 'hp.com'), 0),
 (('bestbuy.com', 'bestbuy.com'), 1),
 (('bestbuy.com', 'teleflora.com'), 0),
 (('teleflora.com', 'bestbuy.com'), 0),
 (('teleflora.com', 'teleflora.com'), 1),
 (('bestbuy.com', 'orchids.com'), 0),
 (('teleflora.com', 'orchids.com'), 0),
 (('orchids.com', 'hp.com'), 0),
 (('orchids.com', 'bestbuy.com'), 0),
 (('orchids.com', 'teleflora.com'), 0),
 (('orchids.com', 'orchids.com'), 1)]
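
For comparison, the same identity initialization can be sketched in plain Python. `identity_matrix` is a hypothetical helper name used only for this illustration.

```python
from itertools import product

def identity_matrix(names):
    """Return {(a, b): 1 if a == b else 0} for every ordered pair of names."""
    return {(a, b): 1 if a == b else 0 for a, b in product(names, repeat=2)}

ads_list = ["hp.com", "bestbuy.com", "teleflora.com", "orchids.com"]
ad_sim = identity_matrix(ads_list)
print(ad_sim[("hp.com", "hp.com")], ad_sim[("hp.com", "bestbuy.com")])
```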

In [543]:
from operator import add
graph_q = graph.map(lambda x: (x[0][0], x[1]))
q_sum = graph_q.reduceByKey(add).map(lambda x: (x[0],x[1]))
graph_ads = graph.map(lambda x: (x[0][1], x[1]))
ads_sum = graph_ads.reduceByKey(add).map(lambda x: (x[0],x[1]))

> q_sum: <br>
> [('pc', 10), <br>
>  ('flower', 31), <br>
>  ('camera', 25), <br>
>  ('digital camera', 37), <br>
>  ('tv', 15)]<br><br>
> ads_sum: <br>
> [('hp.com', 60),<br>
> ('teleflora.com', 16),<br>
> ('bestbuy.com', 27),<br>
> ('orchids.com', 15)]

### Parameter settings
C: constant (the SimRank decay factor) <br>

In [556]:
C = 0.8

### SimRank algorithm
- if a=b => s(a,b)=1
- else if I(a) or I(b), the sets of nodes pointing to a and to b, is empty => s(a,b)=0
- otherwise => s(a,b) = $\frac{C}{|I(a)||I(b)|} \sum_{i=1}^{|I(a)|} \sum_{j=1}^{|I(b)|} s(I_i(a), I_j(b))$
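
A worked instance of the third case may help. It uses the edges from the `query_ad_pair` output, so I(pc) = {hp.com} and I(camera) = {hp.com, bestbuy.com}. The assumptions here are that |I(a)| counts distinct neighbors, that all similarities start from the identity matrix, and that C = 0.8:

$$
s(\text{pc},\text{camera}) = \frac{0.8}{1 \cdot 2}\,\bigl[\,s(\text{hp.com},\text{hp.com}) + s(\text{hp.com},\text{bestbuy.com})\,\bigr] = 0.4\,(1 + 0) = 0.4
$$

The implementation in this notebook divides by the click totals in q_sum rather than by neighbor counts, so its printed values are on a different scale.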


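### SimRank implementation

The computation has two parts:<br>
queries SimRank and ads SimRank.<br>
#### Build query_sim and ad_sim
Initialize the SimRank matrices for queries and for ads separately, <br>
with 1 on the diagonal and 0 elsewhere.
#### Queries SimRank
Use filter to handle the diagonal entries (q1=q2) and the off-diagonal entries (q1!=q2) separately.<br>
- On the diagonal (q1=q2): set the value to 1
- Off the diagonal (q1!=q2): split the formula into a Prefix and a Postfix
    - Prefix: $\frac{C}{|I(a)||I(b)|}$
        - Use a and b in turn as the key to look up their values in q_sum
        - Multiply the two values, take the reciprocal, then multiply by the constant C
    - Postfix: $\sum\sum s(I(a),I(b))$
        - Reuse the prefix keys as the new keys, map them to the values in graph, and drop the entries whose value is 0
        - Re-key by ads and remove the duplicates
        - Use cartesian to build a new RDD, look up the values in ad_sim, and sum all the values
    - Finally, multiply Prefix by Postfix and union the result with the (q1=q2) entries

#### Ads SimRank
Follow the same steps as the queries SimRank, with queries replaced by ads and ad_sim replaced by query_sim.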
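
The Prefix and Postfix steps can be sketched in plain Python for a single update of the query similarities. This is a minimal sketch under stated assumptions, not the notebook's Spark code: it takes |I(q)| as the number of distinct ads linked to q and computes the Postfix separately for each query pair. The notebook instead joins against the click totals in q_sum and uses one scalar `postfix_q` for all pairs, so the numbers here do not match its printed output. `query_simrank_step` is a hypothetical helper name.

```python
from itertools import product

C = 0.8  # same constant as in the notebook

# Non-zero (query, ad) edges taken from the query_ad_pair output.
edges = [
    ("pc", "hp.com"), ("camera", "hp.com"), ("camera", "bestbuy.com"),
    ("digital camera", "hp.com"), ("digital camera", "bestbuy.com"),
    ("tv", "bestbuy.com"), ("flower", "teleflora.com"), ("flower", "orchids.com"),
]

# I(q): the set of ads linked to each query.
neighbors = {}
for q, ad in edges:
    neighbors.setdefault(q, set()).add(ad)

# Initial ad similarities: 1 on the diagonal, 0 elsewhere.
ads = sorted({ad for _, ad in edges})
ad_sim = {(a, b): 1.0 if a == b else 0.0 for a, b in product(ads, repeat=2)}

def query_simrank_step(neighbors, ad_sim, C):
    """One SimRank update for every ordered query pair, using the current ad_sim."""
    new_sim = {}
    for q1, q2 in product(neighbors, repeat=2):
        if q1 == q2:
            new_sim[(q1, q2)] = 1.0  # diagonal entries stay at 1
            continue
        # Prefix: C / (|I(q1)| * |I(q2)|)
        prefix = C / (len(neighbors[q1]) * len(neighbors[q2]))
        # Postfix: sum of ad similarities over all neighbor pairs of q1 and q2
        postfix = sum(ad_sim[(a, b)] for a in neighbors[q1] for b in neighbors[q2])
        new_sim[(q1, q2)] = prefix * postfix
    return new_sim

query_sim = query_simrank_step(neighbors, ad_sim, C)
print(round(query_sim[("pc", "camera")], 3))   # 0.4
print(round(query_sim[("pc", "flower")], 3))   # 0.0
```

Running the symmetric step for ads with the updated `query_sim`, and repeating both steps, would give further iterations.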

In [557]:
query_sim = q_list.cartesian(q_list).map(lambda x: (x, 0)).map(identity)
ad_sim = ads_list.cartesian(ads_list).map(lambda x: (x, 0)).map(identity)
    
# queries simrank
new_query_sim = q_list.cartesian(q_list).map(lambda x: (x, 0)).map(identity)

## q1 == q2
same_new_query_sim = new_query_sim.filter(lambda r: r[0][0]==r[0][1]).map(lambda x: ((x[0][0],x[0][1]),x[1]))

## q1 != q2 
### Prefix
#### q1, ((q2,new_query_sim_val),q1_q_sum)
not_same_new_query_sim =  new_query_sim.filter(lambda r: r[0][0]!=r[0][1]).map(lambda x: (x[0][0],(x[0][1],x[1]))).join(q_sum)
#### q2, (q1,new_query_sim_val,q1_q_sum,q2_q_sum)
not_same_new_query_sim =  not_same_new_query_sim.map(lambda x: (x[1][0][0],(x[0],x[1][0][1],x[1][1]))).join(q_sum)
#### (q1,q2),C/(q1_q_sum*q2_q_sum)
prefix_q = not_same_new_query_sim.map(lambda x: ((x[1][0][0],x[0]),C/(x[1][0][2]*x[1][1])))


### Postfix
q1q2 = prefix_q.map(lambda x: ((x[0][0]),x[0][1]))
q2q1 = prefix_q.map(lambda x: ((x[0][1]),x[0][0]))
#### q,(ad,val)
graph_q = graph.map(lambda x: ((x[0][0]),(x[0][1],x[1])))
#### join => q1,(q2,(ad,val)) if val!=0
#### q1,(q2,ad,val)->ad, (q1,q2,val)
get_ads_q1 = q1q2.join(graph_q).filter(lambda r: r[1][1][1]!=0).map(lambda x:((x[1][1][0]),(x[0],x[1][0],x[1][1][1])))
get_ads_q2 = q2q1.join(graph_q).filter(lambda r: r[1][1][1]!=0).map(lambda x:((x[1][1][0]),(x[0],x[1][0],x[1][1][1])))

### get all ads and drop duplicates (the result stays an RDD)
merge_ad_q1_rdd = get_ads_q1.keys().distinct()
merge_ad_q2_rdd = get_ads_q2.keys().distinct()

postfix_index = merge_ad_q1_rdd.cartesian(merge_ad_q2_rdd).map(lambda x: (x, 0))
### (ad1,ad2),(0,ad_sim) -> (ad1,ad2),(ad_sim) 
postfix_q = postfix_index.join(ad_sim).map(lambda x:((x[0][0],x[0][1]),x[1][1])).values().sum()

pp = prefix_q.map(lambda x : ((x[0][0],x[0][1]),x[1]*postfix_q))
## merge same and not same
pp = pp.union(same_new_query_sim)


# ads simrank
new_ad_sim = ads_list.cartesian(ads_list).map(lambda x: (x, 0)).map(identity)

## ads1 == ads2
same_new_ad_sim = new_ad_sim.filter(lambda r: r[0][0]==r[0][1]).map(lambda x: ((x[0][0],x[0][1]),x[1]))

## ads1 != ads2 prefix
### ads1, ((ads2,new_ad_sim_val),ads1_ads_sum)
not_same_new_ad_sim =  new_ad_sim.filter(lambda r: r[0][0]!=r[0][1]).map(lambda x: (x[0][0],(x[0][1],x[1]))).join(ads_sum)
### ads2, (ads1,new_ad_sim_val,ads1_ads_sum,ads2_ads_sum)
not_same_new_ad_sim =  not_same_new_ad_sim.map(lambda x: (x[1][0][0],(x[0],x[1][0][1],x[1][1]))).join(ads_sum)

### (ads1,ads2),C/(ads1_ads_sum*ads2_ads_sum)
prefix_a = not_same_new_ad_sim.map(lambda x: ((x[1][0][0],x[0]),C/(x[1][0][2]*x[1][1])))

## ads1 != ads2 postfix
### ads1,ads2
a1a2 = prefix_a.map(lambda x: ((x[0][0]),x[0][1]))
a2a1 = prefix_a.map(lambda x: ((x[0][1]),x[0][0]))
### ad,(q,val)
graph_a = graph.map(lambda x: ((x[0][1]),(x[0][0],x[1])))
### join => ads1,(ads2,(q,val))
### ads1,(ads2,(q,val)) -> q,(ads1,ads2,val)
get_q_a1 = a1a2.join(graph_a).filter(lambda r: r[1][1][1]!=0).map(lambda x:((x[1][1][0]),(x[0],x[1][0],x[1][1][1])))
get_q_a2 = a2a1.join(graph_a).filter(lambda r: r[1][1][1]!=0).map(lambda x:((x[1][1][0]),(x[0],x[1][0],x[1][1][1])))

### get all queries and drop duplicates (the result stays an RDD)
merge_q_ad1_rdd = get_q_a1.keys().distinct()
merge_q_ad2_rdd = get_q_a2.keys().distinct()

postfix_index = merge_q_ad1_rdd.cartesian(merge_q_ad2_rdd).map(lambda x: (x, 0))

### (q1,q2),(0,query_sim) -> (q1,q2),(query_sim)
postfix_a = postfix_index.join(query_sim).map(lambda x:((x[0][0],x[0][1]),x[1][1])).values().sum()

aa = prefix_a.map(lambda x : ((x[0][0],x[0][1]),x[1]*postfix_a))
aa = aa.union(same_new_ad_sim)

query_sim = pp.map(lambda x : ((x[0][0],x[0][1]),x[1]))
ad_sim = aa.map(lambda x : ((x[0][0],x[0][1]),x[1]))

### Reformat the output

In [558]:
q = q_list.collect()
a = ads_list.collect()
output_query_sim = query_sim.map(lambda x: (x[0][0],x[0][1],x[1])).collect()
output_ad_sim = ad_sim.map(lambda x: (x[0][0],x[0][1],x[1])).collect()
output_query = []
output_ad = []
for i in range(len(q)):
    for j in range(len(q)):
        for k in range(len(output_query_sim)):
            if q[i]==output_query_sim[k][0] and q[j]==output_query_sim[k][1]:
                output_query.append(output_query_sim[k][2])
for i in range(len(a)):
    for j in range(len(a)):        
        for k in range(len(output_ad_sim)):
            if a[i]==output_ad_sim[k][0] and a[j]==output_ad_sim[k][1]:            
                output_ad.append(output_ad_sim[k][2])
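
The triple loop above scans the whole result list for every matrix cell. One alternative, sketched here on hypothetical data, is to index the (name1, name2, score) triples in a dictionary once and then read the cells in row order. The `triples` and `names` values are made up for illustration and only mimic the shape of `output_query_sim` and `q`.

```python
# Hypothetical (name1, name2, score) triples, shaped like output_query_sim.
triples = [
    ("pc", "pc", 1),
    ("pc", "tv", 0.021),
    ("tv", "pc", 0.021),
    ("tv", "tv", 1),
]
names = ["pc", "tv"]  # row and column order, like q

# Index the scores by (row, column) so each cell is a single lookup.
lookup = {(a, b): score for a, b, score in triples}
rows = [[round(lookup[(a, b)], 3) for b in names] for a in names]

print(names)
for row in rows:
    print(row)
```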

### Output: Query SimRank
Similarity between each pair of queries

In [559]:
## output query sim
print(q)
output_row = []
for i in range(len(output_query)):
    output_row.append(round(output_query[i],3))
    if (i+1)%len(q)==0:
        print(output_row)
        output_row = []

['pc', 'camera', 'digital camera', 'tv', 'flower']
[1, 0.013, 0.009, 0.021, 0.01]
[0.013, 1, 0.003, 0.009, 0.004]
[0.009, 0.003, 1, 0.006, 0.003]
[0.021, 0.009, 0.006, 1, 0.007]
[0.01, 0.004, 0.003, 0.007, 1]


> 'pc', 'camera', 'digital camera', 'tv', 'flower'<br>
> [1, 0.013, 0.009, 0.022, 0.011]<br>
> [0.013, 1, 0.004, 0.009, 0.004]<br>
> [0.009, 0.004, 1, 0.006, 0.003]<br>
> [0.022, 0.009, 0.006, 1, 0.007]<br>
> [0.011, 0.004, 0.003, 0.007, 1]

### Output: Ads SimRank
Similarity between each pair of ads

In [560]:
## output ad sim
print(a)
output_row = []
for i in range(len(output_ad)):
    output_row.append(round(output_ad[i],3))
    if (i+1)%len(a)==0:
        print(output_row)
        output_row = []

['hp.com', 'bestbuy.com', 'teleflora.com', 'orchids.com']
[1, 0.002, 0.004, 0.004]
[0.002, 1, 0.009, 0.01]
[0.004, 0.009, 1, 0.017]
[0.004, 0.01, 0.017, 1]


> 'hp.com', 'bestbuy.com', 'teleflora.com', 'orchids.com'<br>
> [1, 0.003, 0.004, 0.005]<br>
> [0.003, 1, 0.01, 0.01]<br>
> [0.004, 0.01, 1, 0.017]<br>
> [0.005, 0.01, 0.017, 1]