## Final : Frequently Itemset

In [1]:
from pyspark import SparkConf, SparkContext
import math, sys, glob, random
import numpy as np
from nltk import ngrams, word_tokenize
import json
import matplotlib.pyplot as plt
plt.style.use('ggplot')

conf = SparkConf().setMaster("local").setAppName("Final")
sc = SparkContext(conf=conf)

## load articals

In [2]:
## path setting
root = './data'
banned_path = root+'/Bad_id.txt'
board_roots = [root+'/Gossiping', root+'/HatePolitics', root+'/politics']
data_path = []

for board_root in board_roots:
    data_path += glob.glob(board_root+'/*/*.json')

In [3]:
data_path[:2]

['./data/Gossiping/20190629/M.1561740040.A.7FD.json',
 './data/Gossiping/20190629/M.1561740912.A.873.json']

In [4]:
def load_articals(path):
    articals = []
    with open(path, encoding='utf-8') as f:
        try:
            artical = json.load(f)
            articals.append(list(set([artical['author']]+[push_artical['push_userid'] for push_artical in artical['messages']])))
        except:
            pass
    return articals

In [5]:
paths = sc.parallelize(data_path[:50000])

In [6]:
articals = paths.flatMap(load_articals)

In [7]:
%%time
len(articals.collect())

CPU times: user 419 ms, sys: 45.8 ms, total: 465 ms
Wall time: 28.9 s


50000

## load Banned ID

In [8]:
Banned_ID = {}

In [9]:
def load_bannedID(banned_path=None):
    global Banned_ID, all_user
    
    with open(banned_path) as f:
        all_banID = f.read().split('\n')
        all_banID = all_banID[:len(all_banID)-1]
        for _id in [banID for banID in all_banID]:
            if _id not in Banned_ID:
                Banned_ID[_id] = len(Banned_ID)

In [10]:
#load_banned ID
load_bannedID(banned_path=banned_path)

## build bucket
check banned id is in artical

In [11]:
def check_banned_bucket(ids):
    global Banned_ID
    for _id in ids:
        if(_id in Banned_ID):
            return True
    return False

In [12]:
%%time
#只看那些有跟被banned ID互動文章
banned_articals = articals.filter(lambda x: check_banned_bucket(x))
banned_articals.collect()[0]

CPU times: user 286 ms, sys: 45.9 ms, total: 332 ms
Wall time: 17.7 s


['C13H16ClNO',
 'a2016596 (火花)',
 'ufap',
 'souvlaki',
 'puremanly',
 'darkdogoblin',
 'verakaco',
 'flux',
 'forwardgo',
 'zerolin1226',
 'tony84590',
 'derrick1220',
 'gnehgneh',
 'radiotofu',
 'Julian9x9x9']

所有被banned id的articals中，所有人名字對應到的ID(不管有沒有被banned)

In [13]:
%%time
all_user = banned_articals.flatMap(lambda x: [i for i in x]).distinct().zipWithIndex()

#key : user ID & unique number, val : unique number (name encoding) & user ID
tmp = [(key, val) for (key, val) in all_user.collect()] + [(key, val) for (val, key) in all_user.collect()]
all_users = {key:val for (key, val) in tmp}

CPU times: user 492 ms, sys: 23.6 ms, total: 516 ms
Wall time: 16.1 s


<b>user_appears :</b>
    - key : user ID encoding number
    - value : 在哪幾篇artical出現過

In [14]:
user_appears = banned_articals.zipWithIndex().flatMap(lambda x: [(all_users[name], str(x[1])) for name in x[0]])
user_appears = user_appears.groupByKey().mapValues(list)
print('user_appears key :', user_appears.collect()[0][0])
print('lengths of user_appears val :', len(user_appears.collect()[0][1]))

user_appears key : 0
lengths of user_appears val : 35


## Frequently Itemset

## STEP 1. 算每個user的support & 找Threshold
每個user的support就是user_appears出現於多少個articals。<br>

user ID 依據在出現artical頻率由多至少排序，<br>
threshold暫定是出現頻率最多的1/10

In [15]:
%%time
user_appears = user_appears.sortBy(lambda x: len(x[1]), ascending=False)

values = user_appears.map(lambda x: (x[0], len(x[1]))).values().mean()
print("[means 1]", values)
values = user_appears.map(lambda x: (x[0], len(x[1]))).filter(lambda x: x[1] < values).values().mean()
print("[means 2]", values)
values = user_appears.map(lambda x: (x[0], len(x[1]))).filter(lambda x: x[1] < values).values().mean()
print("[means 3]", values)

threshold = values
print('threshold :', threshold)

[means 1] 6.1786094435218395
[means 2] 1.932310405643739
[means 3] 1.0
threshold : 1.0
CPU times: user 49.7 ms, sys: 4.91 ms, total: 54.6 ms
Wall time: 4.48 s


## Step 2. 過濾掉user support < threshold的user id

In [16]:
%%time
filtered_support = user_appears.filter(lambda x: len(x[1]) >= threshold)
print('Length of filtered support : ', len(filtered_support.collect()))

Length of filtered support :  70461
CPU times: user 404 ms, sys: 18.9 ms, total: 423 ms
Wall time: 1.91 s


過濾後的user中，有多少是沒被banned的：

In [17]:
%%time
normal_users = filtered_support.filter(lambda x: all_users[x[0]] not in Banned_ID)
print('How many normal users after filtered :', len(normal_users.collect()))

How many normal users after filtered : 69745
CPU times: user 2.01 s, sys: 72.6 ms, total: 2.08 s
Wall time: 4.94 s


過濾後的user中，有多少是被banned的：

In [18]:
%%time
banned_users = filtered_support.filter(lambda x: all_users[x[0]] in Banned_ID)
print('How many banned users in all banned articals:', len(banned_users.collect()))

How many banned users in all banned articals: 716
CPU times: user 2.44 s, sys: 33.7 ms, total: 2.47 s
Wall time: 4.78 s


## Step 3. Association Rules
    conf(banned ID -> not banned ID) : 有banned ID的bucket中，也有出現not banned ID的機率
    - 誰常常跟Banned ID在同篇文章中出現

## support for (banned 交集 not banned)
產生被banned ID跟沒被banned ID的pairs，看每個normal ID跟每個banned ID一起出現在同個artical中多少次。

In [19]:
def mapper(x):
    normal, banned = x[0], x[1]
    result = list(set(normal[1]).intersection(set(banned[1])))
    return ((normal[0], banned[0]), len(result))

In [20]:
%%time
#key : (normal user ID, banned user ID), val : 這兩個ID落在同個artical的次數
pairs = normal_users.cartesian(banned_users).map(mapper).filter(lambda x: x[1] != 0)

CPU times: user 1.96 ms, sys: 215 µs, total: 2.18 ms
Wall time: 10.7 ms


In [21]:
print(pairs.collect()[0])

((1705, 1906), 21)


In [22]:
%%time
dict_user_appears = user_appears.collectAsMap()
conf = pairs.map(lambda x: (x[0], x[1] / len(dict_user_appears[x[0][1]]))).sortBy(lambda x: x[1], ascending=False)

CPU times: user 729 ms, sys: 35.4 ms, total: 764 ms
Wall time: 2.85 s


In [23]:
conf.collect()[:3]

[((1705, 5612), 1.0), ((1705, 45514), 1.0), ((1705, 45533), 1.0)]

## interesting Association Rules

In [24]:
NUM_BUCKETS = len(banned_articals.collect())

In [25]:
interest = conf.map(lambda x: (x[0], x[1]-(len(dict_user_appears[x[0][0]]) / NUM_BUCKETS)))
interest = interest.filter(lambda x: x[1] > 0.5).sortBy(lambda x: x[1], ascending=False)

In [26]:
%%time
#(normal id, banned id), how interesting[0, 1]
interest.collect()[:10]

CPU times: user 5.19 s, sys: 137 ms, total: 5.32 s
Wall time: 4min 37s


[((3263, 3274), 0.9997343251859724),
 ((4057, 4069), 0.9997343251859724),
 ((4058, 4069), 0.9997343251859724),
 ((4059, 4069), 0.9997343251859724),
 ((4068, 4069), 0.9997343251859724),
 ((4780, 4804), 0.9997343251859724),
 ((4780, 4856), 0.9997343251859724),
 ((4802, 4804), 0.9997343251859724),
 ((4802, 4856), 0.9997343251859724),
 ((5145, 5135), 0.9997343251859724)]

In [27]:
interest_username = interest.map(lambda x: ((all_users[x[0][0]], all_users[x[0][1]]), x[1]))

## top 10 interest Associateion Rule
被banned ID在的artical，沒有被banned的ID也常出現在那個aritcal上。

In [28]:
%%time
result = interest_username.collect()[:10]

print("banned ID -> normal ID :", "interest associate value")
print("")
for val in result:
    print(val[0][1]+" -> "+val[0][0], ": "+str(val[1]))

banned ID -> normal ID : interest associate value

ruchkunt -> rencher : 0.9997343251859724
undersky825 -> timeflying : 0.9997343251859724
undersky825 -> wxes9050605 : 0.9997343251859724
undersky825 -> chunhung : 0.9997343251859724
undersky825 -> wwwaa1217 (Rwei) : 0.9997343251859724
greatbibi -> hwaien : 0.9997343251859724
zogane -> hwaien : 0.9997343251859724
greatbibi -> a61192909 : 0.9997343251859724
zogane -> a61192909 : 0.9997343251859724
bjzx5 -> newmp4 : 0.9997343251859724
CPU times: user 3.89 s, sys: 88.3 ms, total: 3.98 s
Wall time: 2min 50s


## 可能潛在的網軍

In [29]:
army = interest_username.map(lambda x: (x[0][0], x[1])).groupByKey().sortBy(lambda x: len(x[1]), ascending=False).mapValues(list)

In [30]:
%%time
print(len(army.collect()))

18200
CPU times: user 149 ms, sys: 431 µs, total: 150 ms
Wall time: 2min 43s


In [31]:
army.collect()[:5]

[('Cersei',
  [0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.9375664187035069,
   0.8775664187035068,
   0.86064334178043,
   0.8542330853701735,
   0.8429718241089122,
   0.8125664187035069,
   0.8125664187035069,
   0.8071316360948112,
   0.8071316360948112,
   0.794709275846364,
   0.7875664187035069,
   0.783720264857353,
   0.783720264857353,
   0.7796716818614016,
   0.7796716818614016,
   0.7708997520368402,
   0.7708997520368402,
   0.7708997520368402,
   0.7708997520368402,
   0

## 寫檔

In [32]:
army.map(lambda x: (x[0], len(x[1]))).saveAsTextFile('army_freq_sortByLen')

In [34]:
army.map(lambda x: (x[0], max(x[1]))).sortBy(lambda x: x[1], ascending=False).saveAsTextFile('army_freq_sortByRank')