In [1]:
import ast
import operator
import os
import statistics
import time
import timeit
from collections import Counter
from datetime import datetime

import numpy as np
import pandas as pd
import pyspark
import s3fs

In [2]:
# Reuse the already-running SparkContext / SparkSession if there is one,
# otherwise create them. `sc` drives every RDD built below.
sc = pyspark.SparkContext.getOrCreate()
ss = pyspark.sql.SparkSession.builder.getOrCreate()

## Set up the data address book

In [3]:
# Map every dataset name to the S3 location of its CSV file.
s3_address = "s3a://msds630-kaggle-competition/"
dataset_name_list = ["events", "messages", "attributes", "sessions"]
dataset_addr_book = {}
for name in dataset_name_list:
    # <bucket prefix><dataset name>.csv
    dataset_addr_book[name] = "{}{}.csv".format(s3_address, name)

dataset_addr_book

{'attributes': 's3a://msds630-kaggle-competition/attributes.csv',
 'events': 's3a://msds630-kaggle-competition/events.csv',
 'messages': 's3a://msds630-kaggle-competition/messages.csv',
 'sessions': 's3a://msds630-kaggle-competition/sessions.csv'}

## Load events

In [4]:
# Read events.csv as an RDD of text lines; encode(..., 'ignore') drops every
# character that is not ASCII.
events_lines = sc.textFile(dataset_addr_book["events"])
events_rdd = events_lines.map(lambda line: line.encode('ascii', 'ignore'))
# The first line of the file is the header: split it into the column names.
first_rows = events_rdd.map(lambda x: x.split(',')).take(1)
cols_events = first_rows[0]
print(cols_events)
print(len(cols_events))

['app_id', 'session_id', 'event', 'event_timestamp', 'event_value', 'user_id_hash']
6


In [5]:
# Drop the header (every line that contains the text 'app_id'), then split each
# remaining line into its comma-separated fields.
events_body = events_rdd.filter(lambda x: 'app_id' not in x)
events_data = events_body.map(lambda x: x.split(','))
events_data.take(1)

[['4724682771660800',
  '5558845121177764917',
  '45',
  '1542215397132',
  '0.0',
  '9943447915df3a45fd6720a026af905b6da6b56a37701b8b2629802e9a541006']]

In [6]:
# Column names that remain once the leading app_id column is removed; the data
# rows get the same treatment in the next cell.
cols_events_adj = cols_events[1:] # Drop app_id
cols_events_adj

['session_id', 'event', 'event_timestamp', 'event_value', 'user_id_hash']

In [7]:
# Rows without their first field (app_id):
# [session_id, event, event_timestamp, event_value, user_id_hash].
events_data_adj = events_data.map(lambda row: row[1:])
events_data_adj.take(1)

[['5558845121177764917',
  '45',
  '1542215397132',
  '0.0',
  '9943447915df3a45fd6720a026af905b6da6b56a37701b8b2629802e9a541006']]

In [8]:
def convertTime(ts_str):
    """Format an epoch timestamp given in milliseconds as a UTC string.

    ts_str: anything int() accepts, e.g. '1541638424150'.
    Returns 'YYYY-MM-DD HH:MM:SS' in UTC.
    """
    seconds = int(ts_str) / 1000.0  # milliseconds -> seconds
    moment = datetime.utcfromtimestamp(seconds)
    return moment.strftime('%Y-%m-%d %H:%M:%S')
convertTime('1541638424150')

'2018-11-08 00:53:44'

## Train

In [12]:
# Training window: keep events whose UTC date (first 10 characters of the
# formatted timestamp) lies in [2018-11-25, 2018-12-02).
events_data_train = events_data_adj.filter(
    lambda x: '2018-11-25' <= convertTime(x[2])[:10] < '2018-12-02')
events_data_train.take(1)

[['3911933845664332608',
  '45',
  '1543451911352',
  '0.0',
  '9943447915df3a45fd6720a026af905b6da6b56a37701b8b2629802e9a541006']]

In [13]:
# One RDD per event column, keyed by session_id:
#   temp_pair_train_dict[i] -> [session_id, [values of column i in that session]]
# Rows of events_data_train are
#   [session_id, event, event_timestamp, event_value, user_id_hash]
# so the value columns are indices 1 .. len(cols_events_adj) - 1.  Index 5 does
# not exist in these rows, so no RDD is built for it (evaluating one would raise
# IndexError).
temp_pair_train_dict = {}
for i in range(1, len(cols_events_adj)):
    # i=i freezes the current column index inside the lambda, so the RDD never
    # depends on the value the loop variable has when the job finally runs.
    temp_pair = events_data_train.map(lambda x, i=i: (x[0], x[i]))\
                                 .groupByKey().map(lambda x: [x[0], list(x[1])])
    temp_pair_train_dict[i] = temp_pair

In [14]:
# Collect all columns of a session in ONE groupByKey so the per-column lists stay
# aligned: position k of the event / timestamp / value / user lists always comes
# from the same original row.  Later cells use positions found in the event list
# (findPur) to index the value list (moneySum).  Spark does not guarantee the
# order of elements inside a group, so lists built by separate groupByKey()
# calls and joined afterwards may not line up.
# Result: (session_id, [[events], [timestamps], [values], [user_id_hashes]]).
events_data_train_adj = events_data_train.map(lambda x: (x[0], x[1:5]))\
                                         .groupByKey()\
                                         .mapValues(lambda rows: [list(col) for col in zip(*rows)])
events_data_train_adj.take(1)

[('6309264250257609843',
  [['1',
    '5',
    '45',
    '45',
    '45',
    '45',
    '45',
    '45',
    '45',
    '45',
    '45',
    '45',
    '45',
    '45',
    '45',
    '45',
    '45',
    '4',
    '6',
    '40',
    '41',
    '3',
    '42'],
   ['1543175110236',
    '1543175113178',
    '1543175188473',
    '1543175236995',
    '1543175242927',
    '1543175281611',
    '1543175322511',
    '1543175370581',
    '1543175381381',
    '1543175420159',
    '1543175444136',
    '1543175469816',
    '1543175478723',
    '1543175484426',
    '1543175488738',
    '1543175548012',
    '1543175624174',
    '1543175687406',
    '1543175696612',
    '1543175706829',
    '1543175710120',
    '1543175773504',
    '1543175775452'],
   ['0.0',
    '1.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0',
    '0.0'],
   ['10596d

In [15]:
def most_common(lst):
    """Return the element that occurs most often in lst.

    Counting is done in a single pass. When several elements share the highest
    count, the one encountered first in lst is returned (Counter keeps
    first-seen order on Python 3.7+).

    Raises ValueError if lst is empty.
    """
    if not lst:
        raise ValueError("most_common() arg is an empty sequence")
    return Counter(lst).most_common(1)[0][0]

In [16]:
# Flatten each session record and reduce its list of user hashes to one value:
# [session_id, events, timestamps, values, most common user_id_hash].
events_data_train_adj2 = events_data_train_adj.map(
    lambda x: [x[0]] + list(x[1][:3]) + [most_common(x[1][3])])
events_data_train_adj2.take(1)

[['6309264250257609843',
  ['1',
   '5',
   '45',
   '45',
   '45',
   '45',
   '45',
   '45',
   '45',
   '45',
   '45',
   '45',
   '45',
   '45',
   '45',
   '45',
   '45',
   '4',
   '6',
   '40',
   '41',
   '3',
   '42'],
  ['1543175110236',
   '1543175113178',
   '1543175188473',
   '1543175236995',
   '1543175242927',
   '1543175281611',
   '1543175322511',
   '1543175370581',
   '1543175381381',
   '1543175420159',
   '1543175444136',
   '1543175469816',
   '1543175478723',
   '1543175484426',
   '1543175488738',
   '1543175548012',
   '1543175624174',
   '1543175687406',
   '1543175696612',
   '1543175706829',
   '1543175710120',
   '1543175773504',
   '1543175775452'],
  ['0.0',
   '1.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0'],
  '10596df08f16abc27259f399c1e71d5b591a931c22eeed1cd3fa8ba3e798d237']]

In [17]:
def findPur(event_list):
    """Return the positions in event_list whose entry equals the string '8'.

    NOTE(review): '8' is presumably the purchase event code (the positions are
    later used to sum event values) - confirm against the data description.
    """
    return [pos for pos, code in enumerate(event_list) if code == '8']

In [18]:
# [session_id, positions of '8' events, number of events, timestamps, values,
#  user_id_hash]
events_data_train_adj3 = events_data_train_adj2.map(
    lambda x: [x[0], findPur(x[1]), len(x[1])] + list(x[2:5]))
# Peek at one session that contains at least one '8' event.
events_data_train_adj3.filter(lambda x: x[1] != []).take(1)

[['6706918736980330942',
  [0],
  33,
  ['1543276071431',
   '1543276092053',
   '1543276092059',
   '1543276223128',
   '1543276393299',
   '1543276445457',
   '1543276651567',
   '1543276726439',
   '1543276826273',
   '1543276922619',
   '1543276922631',
   '1543277005322',
   '1543277020292',
   '1543277077316',
   '1543277144537',
   '1543277278941',
   '1543277340345',
   '1543277348546',
   '1543277399346',
   '1543277406008',
   '1543277416756',
   '1543277435860',
   '1543277609704',
   '1543277611067',
   '1543277635349',
   '1543277759711',
   '1543277805399',
   '1543277904395',
   '1543277945300',
   '1543278037594',
   '1543278095400',
   '1543278139337',
   '1543278377618'],
  ['3.493',
   '25.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '19.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '1.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0',
   '0.0

In [19]:
def moneySum(value_list ,idx_list):
    """Add up float(value_list[i]) for every index i in idx_list.

    Returns 0.0 when idx_list is empty.
    """
    total = 0.0
    for position in idx_list:
        total = total + float(value_list[position])
    return total

a = ['1.0','1.0','5.0']
b = [0,1]
moneySum(a,b)

2.0

In [20]:
# Per-session feature row:
# [user_id_hash, session_id, number of events, number of '8' events,
#  session duration (last - first timestamp, in milliseconds),
#  sum of the values at the '8' positions].
# Timestamps are converted to float BEFORE max/min so the comparison is numeric;
# max()/min() on the raw strings compare lexicographically, which is only right
# while every timestamp has the same number of digits.
events_data_train_adj4 = events_data_train_adj3.map(
    lambda x: [x[5], x[0], x[2], len(x[1]),
               max(map(float, x[3])) - min(map(float, x[3])),
               moneySum(x[4], x[1])])
events_data_train_adj4.filter(lambda x: x[3]>0).take(1)

[['175800a4d243a8210d681cb9b3ecf66288e71080d06dfe7b4caeb221119f0a66',
  '6706918736980330942',
  33,
  1,
  2306187.0,
  3.493]]

In [21]:
# One RDD per per-session feature column (1..5), keyed by user_id_hash:
#   temp_pair_train_dict_2[i] -> [user_id_hash, [column i of every session]]
temp_pair_train_dict_2 = {}
for i in range(1, 6):
    temp_pair = (events_data_train_adj4
                 .map(lambda x: (x[0], x[i]))
                 .groupByKey()
                 .map(lambda x: [x[0], list(x[1])]))
    temp_pair_train_dict_2[i] = temp_pair

In [22]:
# Assemble, per user, the five per-session column lists built in the previous
# cell into a single record:
#   (user_id_hash, [session_ids, event counts, purchase counts,
#                   session durations, money sums])
# Each leftOuterJoin yields (key, (left, right)); the map() steps after it
# flatten that nesting back into one flat list of columns.
# Start with column 1 (session ids) and join column 2 (event counts).
events_data_train_adj5 = temp_pair_train_dict_2[1]
events_data_train_adj5 = events_data_train_adj5.leftOuterJoin(temp_pair_train_dict_2[2])\
                                             .map(lambda x: (x[0], list(x[1])))
# Add column 3 (purchase counts).
events_data_train_adj5 = events_data_train_adj5.leftOuterJoin(temp_pair_train_dict_2[3])\
                                             .map(lambda x: (x[0], list(x[1])))\
                                             .map(lambda x: (x[0], [x[1][0][0], x[1][0][1], x[1][1]]))
# Add column 4 (session durations).
events_data_train_adj5 = events_data_train_adj5.leftOuterJoin(temp_pair_train_dict_2[4])\
                                             .map(lambda x: (x[0], list(x[1])))\
                                             .map(lambda x: (x[0], [x[1][0][0], x[1][0][1], x[1][0][2], x[1][1]]))
# Add column 5 (money sums).
events_data_train_adj5 = events_data_train_adj5.leftOuterJoin(temp_pair_train_dict_2[5])\
                                             .map(lambda x: (x[0], list(x[1])))\
                                             .map(lambda x: (x[0], [x[1][0][0], x[1][0][1], x[1][0][2], 
                                                                    x[1][0][3], x[1][1]]))
events_data_train_adj5.take(1)

[('c7385007b6f158e913411d1563d1f5ef90ea003050b105583104a583768edc85',
  [['5373970154285758020'], [1], [0], [0.0], [0.0]])]

In [23]:
def getMedian(lst):
    """Return the median of lst, or 0.0 when lst is empty.

    For an odd number of elements the middle element is returned unchanged; for
    an even number the mean of the two middle elements is returned as a float.
    """
    if len(lst) == 0:
        return 0.0
    ordered = sorted(lst)
    # Floor division keeps the index an int on Python 3 as well as Python 2.
    mid = (len(ordered) - 1) // 2
    if len(ordered) % 2 == 1:
        return ordered[mid]
    return (ordered[mid] + ordered[mid + 1]) / 2.0

print(getMedian([0,1,2,]))
print(getMedian([0,1,2,3]))

1
1.5


In [24]:
# Collapse each user's per-session lists into one feature row:
# [user_id_hash, number of sessions, total events, median events per session,
#  total purchases, total money, median session duration].
events_data_train_adj6 = events_data_train_adj5.map(
    lambda x: [x[0],
               len(x[1][0]),
               sum(x[1][1]), getMedian(x[1][1]),
               sum(x[1][2]),
               sum(x[1][4]),
               getMedian(x[1][3])])
events_data_train_adj6.take(1)

[['c7385007b6f158e913411d1563d1f5ef90ea003050b105583104a583768edc85',
  1,
  1,
  1,
  0,
  0.0,
  0.0]]

## Data Dictionary
0 - user_id  
1 - total number of sessions  
2 - total number of events  
3 - median number of events per session  
4 - total number of purchases  
5 - total amount spent on purchases  
6 - median session duration (milliseconds)

In [25]:
# Render every feature row as one comma-separated CSV line.
events_data_train_adj7 = events_data_train_adj6.map(
    lambda x: ','.join(str(y) for y in x))
events_data_train_adj7.take(1)

['c7385007b6f158e913411d1563d1f5ef90ea003050b105583104a583768edc85,1,1,1,0,0.0,0.0']

In [26]:
# Bring all CSV lines to the driver and write them to a local file, one per line.
with open('train_last_7_days.csv', 'w') as f:
    for line in events_data_train_adj7.collect():
        f.write(line + '\n')

## Test

In [27]:
# Test window: keep events whose UTC date (first 10 characters of the formatted
# timestamp) lies in [2018-12-09, 2018-12-16).
events_data_test = events_data_adj.filter(
    lambda x: '2018-12-09' <= convertTime(x[2])[:10] < '2018-12-16')
events_data_test.take(1)

[['7658550338340861062',
  '0',
  '1544694242367',
  '0.0',
  '207852210fbcc1e958c85f1fa0729b3acf67c1ae26c775e98c556aeafdb7bda5']]

In [28]:
# One RDD per event column, keyed by session_id:
#   temp_pair_test_dict[i] -> [session_id, [values of column i in that session]]
# Rows of events_data_test are
#   [session_id, event, event_timestamp, event_value, user_id_hash]
# so the value columns are indices 1 .. len(cols_events_adj) - 1.  Index 5 does
# not exist in these rows, so no RDD is built for it (evaluating one would raise
# IndexError).
temp_pair_test_dict = {}
for i in range(1, len(cols_events_adj)):
    # i=i freezes the current column index inside the lambda, so the RDD never
    # depends on the value the loop variable has when the job finally runs.
    temp_pair = events_data_test.map(lambda x, i=i: (x[0], x[i]))\
                                .groupByKey().map(lambda x: [x[0], list(x[1])])
    temp_pair_test_dict[i] = temp_pair

In [29]:
# Collect all columns of a session in ONE groupByKey so the per-column lists stay
# aligned: position k of the event / timestamp / value / user lists always comes
# from the same original row.  Later cells use positions found in the event list
# (findPur) to index the value list (moneySum).  Spark does not guarantee the
# order of elements inside a group, so lists built by separate groupByKey()
# calls and joined afterwards may not line up.
# Result: (session_id, [[events], [timestamps], [values], [user_id_hashes]]).
events_data_test_adj = events_data_test.map(lambda x: (x[0], x[1:5]))\
                                       .groupByKey()\
                                       .mapValues(lambda rows: [list(col) for col in zip(*rows)])
events_data_test_adj.take(1)

[('8551675621918410413',
  [['45'],
   ['1544393498323'],
   ['0.0'],
   ['9bf30b7fd7412fbd1fb15c39df1a20094eb66431b0d0eab74d0731b6585b8dd4']])]

In [30]:
# Flatten each session record and reduce its list of user hashes to one value:
# [session_id, events, timestamps, values, most common user_id_hash].
events_data_test_adj2 = events_data_test_adj.map(
    lambda x: [x[0]] + list(x[1][:3]) + [most_common(x[1][3])])
events_data_test_adj2.take(1)

[['8551675621918410413',
  ['45'],
  ['1544393498323'],
  ['0.0'],
  '9bf30b7fd7412fbd1fb15c39df1a20094eb66431b0d0eab74d0731b6585b8dd4']]

In [31]:
# [session_id, positions of '8' events, number of events, timestamps, values,
#  user_id_hash]
events_data_test_adj3 = events_data_test_adj2.map(
    lambda x: [x[0], findPur(x[1]), len(x[1])] + list(x[2:5]))
# Peek at one session that contains at least one '8' event.
events_data_test_adj3.filter(lambda x: x[1] != []).take(1)

[['6944784711615725873',
  [8, 15, 25, 35],
  62,
  ['1544758674714',
   '1544758684218',
   '1544758703123',
   '1544758732303',
   '1544758739234',
   '1544758755806',
   '1544758758254',
   '1544758799635',
   '1544758848297',
   '1544758855624',
   '1544758855626',
   '1544758864411',
   '1544758992671',
   '1544759133298',
   '1544759261141',
   '1544759470196',
   '1544759479774',
   '1544759479776',
   '1544759504205',
   '1544759512826',
   '1544759559523',
   '1544759607659',
   '1544759751487',
   '1544759823357',
   '1544759989737',
   '1544760149329',
   '1544760157431',
   '1544760157434',
   '1544760251063',
   '1544760345086',
   '1544760429726',
   '1544760566156',
   '1544760587477',
   '1544760626830',
   '1544760844476',
   '1544760878331',
   '1544760892324',
   '1544760894547',
   '1544760955185',
   '1544761004770',
   '1544761056259',
   '1544761096543',
   '1544761151414',
   '1544761151417',
   '1544761215436',
   '1544761253578',
   '1544761329851',
   '154476

In [32]:
# Per-session feature row:
# [user_id_hash, session_id, number of events, number of '8' events,
#  session duration (last - first timestamp, in milliseconds),
#  sum of the values at the '8' positions].
# Timestamps are converted to float BEFORE max/min so the comparison is numeric;
# max()/min() on the raw strings compare lexicographically, which is only right
# while every timestamp has the same number of digits.
events_data_test_adj4 = events_data_test_adj3.map(
    lambda x: [x[5], x[0], x[2], len(x[1]),
               max(map(float, x[3])) - min(map(float, x[3])),
               moneySum(x[4], x[1])])
events_data_test_adj4.filter(lambda x: x[3]>0).take(1)

[['d40afe9bca803dcb81afa7d1e6170304e340718976c5821913b38aa1a143bbc9',
  '6944784711615725873',
  62,
  4,
  3568495.0,
  18.172]]

In [33]:
# One RDD per per-session feature column (1..5), keyed by user_id_hash:
#   temp_pair_test_dict_2[i] -> [user_id_hash, [column i of every session]]
temp_pair_test_dict_2 = {}
for i in range(1, 6):
    temp_pair = (events_data_test_adj4
                 .map(lambda x: (x[0], x[i]))
                 .groupByKey()
                 .map(lambda x: [x[0], list(x[1])]))
    temp_pair_test_dict_2[i] = temp_pair

In [34]:
# Assemble, per user, the five per-session column lists built in the previous
# cell into a single record:
#   (user_id_hash, [session_ids, event counts, purchase counts,
#                   session durations, money sums])
# Each leftOuterJoin yields (key, (left, right)); the map() steps after it
# flatten that nesting back into one flat list of columns.
# Start with column 1 (session ids) and join column 2 (event counts).
events_data_test_adj5 = temp_pair_test_dict_2[1]
events_data_test_adj5 = events_data_test_adj5.leftOuterJoin(temp_pair_test_dict_2[2])\
                                             .map(lambda x: (x[0], list(x[1])))
# Add column 3 (purchase counts).
events_data_test_adj5 = events_data_test_adj5.leftOuterJoin(temp_pair_test_dict_2[3])\
                                             .map(lambda x: (x[0], list(x[1])))\
                                             .map(lambda x: (x[0], [x[1][0][0], x[1][0][1], x[1][1]]))
# Add column 4 (session durations).
events_data_test_adj5 = events_data_test_adj5.leftOuterJoin(temp_pair_test_dict_2[4])\
                                             .map(lambda x: (x[0], list(x[1])))\
                                             .map(lambda x: (x[0], [x[1][0][0], x[1][0][1], x[1][0][2], x[1][1]]))
# Add column 5 (money sums).
events_data_test_adj5 = events_data_test_adj5.leftOuterJoin(temp_pair_test_dict_2[5])\
                                             .map(lambda x: (x[0], list(x[1])))\
                                             .map(lambda x: (x[0], [x[1][0][0], x[1][0][1], x[1][0][2], 
                                                                    x[1][0][3], x[1][1]]))
events_data_test_adj5.take(1)

[('499ee86372fde21cbab510ac83d2a74a81bdd664ee98837fdeb910e5ad35c799',
  [['7102252389912184790'], [27], [0], [77928.0], [0.0]])]

In [35]:
# Collapse each user's per-session lists into one feature row:
# [user_id_hash, number of sessions, total events, median events per session,
#  total purchases, total money, median session duration].
events_data_test_adj6 = events_data_test_adj5.map(
    lambda x: [x[0],
               len(x[1][0]),
               sum(x[1][1]), getMedian(x[1][1]),
               sum(x[1][2]),
               sum(x[1][4]),
               getMedian(x[1][3])])
events_data_test_adj6.take(1)

[['499ee86372fde21cbab510ac83d2a74a81bdd664ee98837fdeb910e5ad35c799',
  1,
  27,
  27,
  0,
  0.0,
  77928.0]]

In [36]:
# Render every feature row as one comma-separated CSV line.
events_data_test_adj7 = events_data_test_adj6.map(
    lambda x: ','.join(str(y) for y in x))
events_data_test_adj7.take(1)

['499ee86372fde21cbab510ac83d2a74a81bdd664ee98837fdeb910e5ad35c799,1,27,27,0,0.0,77928.0']

In [37]:
# Bring all CSV lines to the driver and write them to a local file, one per line.
with open('test_last_7_days.csv', 'w') as f:
    for line in events_data_test_adj7.collect():
        f.write(line + '\n')

In [38]:
# Number of user rows in the train and test feature sets.
# NOTE(review): no RDD is cached in the cells visible here, so each count()
# presumably re-runs the whole pipeline from the S3 read - confirm before
# re-running on a large cluster.
print(events_data_train_adj7.count())
print(events_data_test_adj7.count())

120036
42806


In [39]:
# Shut down the SparkContext; RDDs defined above cannot be evaluated afterwards.
sc.stop()