In [1]:
import findspark
findspark.init()
import pandas as pd
from pyspark import SparkConf, SparkContext
import os
import re
import json

conf = SparkConf()
sc = SparkContext.getOrCreate(conf=conf)

In [2]:
K = 160
post_feature_path = './data_{}_cluster/post_feature_{}.json'.format(K, K)
save_path = './data_{}_cluster/user_feature_{}.json'.format(K, K)

In [3]:
if not os.path.isfile('./user_clicks_train'):
    !gsutil -m cp gs://dataproc-8a63ee66-417e-4e10-b748-8ad7b3f18436-us/user_clicks_combine/user_clicks_train ./user_clicks_train
user_clicks_train = sc.textFile('./user_clicks_train')\
                .map(lambda line: tuple(re.findall(r"'(.*?)'", line)))\
                .map(lambda info: (info[0], (info[1:])))

if not os.path.isfile('./user_clicks_test'):
    !gsutil -m cp gs://dataproc-8a63ee66-417e-4e10-b748-8ad7b3f18436-us/user_clicks_combine/user_clicks_test ./user_clicks_test
user_clicks_test = sc.textFile('./user_clicks_test')\
                .map(lambda line: tuple(re.findall(r"'(.*?)'", line)))\
                .map(lambda info: (info[0], (info[1:])))

In [4]:
print(user_clicks_train.take(1))
print(user_clicks_train.count())

print(user_clicks_test.take(1))
print(user_clicks_test.count())

[('04196427-266b-451e-a1c9-5dd2bd773cee|US', ('1277822', '1283810', '1274411', '1274540', '1289122', '1295648', '1293764', '1213982', '1294362', '1279457', '980205', '1283543', '1290213', '1295639', '865315', '1258494', '1290537', '300456', '1270340', '1263466', '1275188', '1287334', '1281781', '1255927'))]
155673
[('084d094c-e35a-45b1-b90d-7f6b76136b07|CN', ('2199055', '1242003', '2218172', '1042840', '300456', '2215537', '1020180', '2217746', '2216359', '2216160', '1356340'))]
24198


In [5]:
# a user with at least 10 clicks in both train and test
user_clicks_grouped_train = user_clicks_train.reduceByKey(lambda l1, l2: l1 + l2)\
                        .filter(lambda info: len(info[1]) > 10)\
                        .filter(lambda info: info[0].split('|')[0] != '' and info[0].split('|')[1] != '')

user_clicks_grouped_test = user_clicks_test.reduceByKey(lambda l1, l2: l1 + l2)\
                        .filter(lambda info: len(info[1]) > 10)\
                        .filter(lambda info: info[0].split('|')[0] != '' and info[0].split('|')[1] != '')

print(user_clicks_grouped_train.take(1))
print(user_clicks_grouped_train.count())

print(user_clicks_grouped_test.take(1))
print(user_clicks_grouped_test.count())

[('04196427-266b-451e-a1c9-5dd2bd773cee|US', ('1277822', '1283810', '1274411', '1274540', '1289122', '1295648', '1293764', '1213982', '1294362', '1279457', '980205', '1283543', '1290213', '1295639', '865315', '1258494', '1290537', '300456', '1270340', '1263466', '1275188', '1287334', '1281781', '1255927'))]
95247
[('e0a52dd1-2a8d-414d-b1e7-d53eae1fd9af|ES', ('981765', '14214', '107143', '135623', '66229', '82205', '978047', '137106', '117993', '36178', '134663', '80305', '981758', '109564', '981731', '109361', '106368', '96336', '980134', '109696', '976949', '39870', '981958', '108446', '982574', '978045', '136964', '976380', '104994'))]
22297


In [6]:
# filter out users in train unpresent in test
users_test = user_clicks_grouped_test.map(lambda info: (info[0], len(info) - 1)).collectAsMap()
print(len(users_test))

user_clicks_grouped_train_present_in_test = user_clicks_grouped_train.filter(lambda info: users_test.get(info[0]) is not None)

users_train = user_clicks_grouped_train_present_in_test.map(lambda info: (info[0], len(info) - 1)).collectAsMap()
user_clicks_grouped_test_present_in_train = user_clicks_grouped_test.filter(lambda info: users_train.get(info[0]) is not None)


print(user_clicks_grouped_train_present_in_test.count())
print(user_clicks_grouped_train_present_in_test.take(1))

print(user_clicks_grouped_test_present_in_train.count())
print(user_clicks_grouped_test_present_in_train.take(1))



22297
8425
[('122b7665-6fac-4278-97e8-b72f7ccbb5dc|CA', ('1204234', '1279326', '1303559', '1303095', '1294468', '1291345', '1294630', '1300744', '1306010', '1297366', '1306464', '1300534', '1292887', '1304703', '1252819', '1304914', '1302796', '835561', '1306111', '1303479', '1294018', '1306502', '571314', '1230406', '1297326', '1266919', '1319968', '1321695', '822793', '1320028', '1324974', '1316488', '1311850', '1316835', '1231743', '1320001', '1316364', '1252819', '1315364', '1316381', '1310291', '1313806', '1314249', '1311329', '1311003', '1325428', '1316363', '716627', '1253223', '1319976', '1328614', '1248217', '1350109', '1346601', '559815', '1213333', '59859', '1350476', '1327102', '722391', '1327941', '1307546', '1334790', '1306535', '1327545', '1327641', '1326167', '1349870', '1234020', '1341232', '1350470', '1342615', '1348048', '1347829', '739076', '1344368', '1343083', '715619', '1344061', '1341736', '1341574', '1339778', '1047306', '1351678', '1339322', '1183115', '108444

In [7]:
# extract user country feature

# filter countries with less than 1000 users
user_country = user_clicks_grouped_train_present_in_test.map(lambda info: tuple(info[0].split('|')) + info[1])\
                .map(lambda info: (info[1], 1))\
                .reduceByKey(lambda x1, x2: x1 + x2)\
                .filter(lambda info: info[1] > 50)

countries = user_country.keys().distinct()
print(user_country.collect())
print(user_country.count())

# generate country dictionary
country_dict = {}
key = 0
for country in countries.collect():
    country_dict[country] = key
    key += 1
print(country_dict)

[('CA', 550), ('NL', 52), ('ES', 162), ('NZ', 57), ('DE', 152), ('ZA', 67), ('MX', 129), ('IE', 57), ('GB', 673), ('US', 4729), ('IN', 133), ('AU', 206), ('SG', 90), ('PH', 66)]
14
{'CA': 0, 'NL': 1, 'ES': 2, 'NZ': 3, 'DE': 4, 'ZA': 5, 'MX': 6, 'IE': 7, 'GB': 8, 'US': 9, 'IN': 10, 'AU': 11, 'SG': 12, 'PH': 13}


In [8]:
# generate country vector for each user
def create_country_feature(user, number):
    id = user[0]
    country = user[1]
    clicks = user[2:]
    vec = [0.0] * number
    vec[int(country_dict[country])] = 1.0
    return (id, vec, clicks)

number = key
user_country_clicks = user_clicks_grouped_train_present_in_test.map(lambda info: tuple(info[0].split('|')) + info[1])\
                        .filter(lambda info: info[1] in country_dict)\
                        .map(lambda user: create_country_feature(user, number))
print(user_country_clicks.take(2))
print(user_country_clicks.count())

[('122b7665-6fac-4278-97e8-b72f7ccbb5dc', [1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], ('1204234', '1279326', '1303559', '1303095', '1294468', '1291345', '1294630', '1300744', '1306010', '1297366', '1306464', '1300534', '1292887', '1304703', '1252819', '1304914', '1302796', '835561', '1306111', '1303479', '1294018', '1306502', '571314', '1230406', '1297326', '1266919', '1319968', '1321695', '822793', '1320028', '1324974', '1316488', '1311850', '1316835', '1231743', '1320001', '1316364', '1252819', '1315364', '1316381', '1310291', '1313806', '1314249', '1311329', '1311003', '1325428', '1316363', '716627', '1253223', '1319976', '1328614', '1248217', '1350109', '1346601', '559815', '1213333', '59859', '1350476', '1327102', '722391', '1327941', '1307546', '1334790', '1306535', '1327545', '1327641', '1326167', '1349870', '1234020', '1341232', '1350470', '1342615', '1348048', '1347829', '739076', '1344368', '1343083', '715619', '1344061', '1341736', '1341574', '133

In [9]:
# extract post feature: (id, vector)
with open(post_feature_path, 'r') as inputfile:  
    post_feature_dict = json.load(inputfile)

print(post_feature_dict.get('1002075'))

[0.281270855, 0.109975921, 0.124338905, -0.221134741, 0.107404406, -0.061329004, 0.336072625, -0.103838169, -0.080011655, 0.095860341, 0.102366746, -0.173454508, 0.221690306, 0.124662899, -0.004446651, -0.202376869, 0.024064676, 0.230010116, 0.094039662, 0.237077338, -0.111818731, 0.281354555, 0.077846064, 0.062100764, -0.235334693, 0.069466248, 0.142621747, -0.017744301, 0.162133874, 0.455479974, -0.049766763, 0.079899357, 7.0, 1554249600.0]


In [10]:
post_feature = post_feature_dict.get('1002075')
print(int(post_feature[-2]))

7


In [14]:
# generate user clicks feature vector from clustering result
def extract_clicks_distribution(info):
    id = info[0]
    country = info[1]
    clicks = [0] * K
    count = 0
    for click in info[2]:
        post_feature = post_feature_dict.get(click)
        if post_feature is not None:
            print(post_feature[-2])
            c = int(post_feature[-2])
            clicks[c] += 1
            count += 1
    if count == 0:
        return (id, None)
    for i in range(K):
        clicks[i] = clicks[i] / count
    return (id, country + clicks)
    
    
user_feature = user_country_clicks.map(lambda info: extract_clicks_distribution(info)).filter(lambda info: info[1] is not None)
print(user_feature.take(1))

[('122b7665-6fac-4278-97e8-b72f7ccbb5dc', [1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.023809523809523808, 0.0, 0.0, 0.0, 0.0, 0.047619047619047616, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.047619047619047616, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.023809523809523808, 0.0, 0.023809523809523808, 0.0, 0.0, 0.0, 0.0, 0.047619047619047616, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.023809523809523808, 0.023809523809523808, 0.0, 0.0, 0.11904761904761904, 0.0, 0.0, 0.09523809523809523, 0.07142857142857142, 0.0, 0.0, 0.0, 0.0, 0.023809523809523808, 0.0, 0.0, 0.023809523809523808, 0.023809523809523808, 0.0, 0.0, 0.0, 0.19047619047619047, 0.0, 0.0, 0.19047619047619047, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 

In [15]:
#print(user_feature.count())

with open(save_path, 'w') as outfile:  
    json.dump(user_feature.collectAsMap(), outfile)

TypeError: cannot convert dictionary update sequence element #22 to a sequence

In [13]:
# save user clicklist for training and testing
user_clicklist_train_output = user_clicks_grouped_train_present_in_test.map(lambda info: tuple(info[0].split('|')) + info[1])\
                        .filter(lambda info: info[1] in country_dict).map(lambda info: (info[0], list(info[2:])))

print(user_clicklist_train_output.take(2))
print(user_clicklist_train_output.count())

user_clicklist_test_output = user_clicks_grouped_test_present_in_train.map(lambda info: tuple(info[0].split('|')) + info[1])\
                        .filter(lambda info: info[1] in country_dict).map(lambda info: (info[0], list(info[2:])))

print(user_clicklist_test_output.take(2))
print(user_clicklist_test_output.count())


[('122b7665-6fac-4278-97e8-b72f7ccbb5dc', ['1204234', '1279326', '1303559', '1303095', '1294468', '1291345', '1294630', '1300744', '1306010', '1297366', '1306464', '1300534', '1292887', '1304703', '1252819', '1304914', '1302796', '835561', '1306111', '1303479', '1294018', '1306502', '571314', '1230406', '1297326', '1266919', '1319968', '1321695', '822793', '1320028', '1324974', '1316488', '1311850', '1316835', '1231743', '1320001', '1316364', '1252819', '1315364', '1316381', '1310291', '1313806', '1314249', '1311329', '1311003', '1325428', '1316363', '716627', '1253223', '1319976', '1328614', '1248217', '1350109', '1346601', '559815', '1213333', '59859', '1350476', '1327102', '722391', '1327941', '1307546', '1334790', '1306535', '1327545', '1327641', '1326167', '1349870', '1234020', '1341232', '1350470', '1342615', '1348048', '1347829', '739076', '1344368', '1343083', '715619', '1344061', '1341736', '1341574', '1339778', '1047306', '1351678', '1339322', '1183115', '1084445', '1355593',

In [14]:
with open('./data_{}_cluster/user_clicklist_train.json'.format(K), 'w') as outfile:  
    json.dump(user_clicklist_train_output.collectAsMap(), outfile)
    
with open('./data_{}_cluster/user_clicklist_test.json'.format(K), 'w') as outfile:  
    json.dump(user_clicklist_test_output.collectAsMap(), outfile)