In [1]:
import findspark
findspark.init()
import pyspark
from random import random
from pyspark import SparkContext
import numpy as np
from math import sqrt
import re
import datetime
import csv

import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
config = pyspark.SparkConf().set('spark.executor.memory', '8G').set('spark.cores.max', '32')\
            .set('spark.port.maxRetries','200').set("spark.driver.maxResultSize", "3g")
sc = pyspark.SparkContext(appName="Project #2", master='spark://polyp1:7077',conf = config )

In [3]:
input_data = sc.textFile('activity_log.csv')
train_label = sc.textFile('train_label.csv')
enrollment_id = sc.textFile('enrollment_list.csv')
sample_submission = sc.textFile('sample_submission.csv')

# Data Pre-processing

In [4]:
# split the input data and filter out first line
# convert the datetime type

data_raw = input_data.filter(lambda e: e[0] != 'e').map(lambda e: e.split(',')).map(
    lambda e: [e[0], datetime.datetime.strptime(e[1], "%Y-%m-%dT%H:%M:%S"), e[2] ] )
                                                                                
print data_raw.take(1), data_raw.count()

[[u'1', datetime.datetime(2014, 5, 31, 12, 43, 20), u'navigate']] 8157277


In [5]:
# change the format of other files

train_data = train_label.filter(lambda e: e[0] != 'e').map(lambda e: e.split(',')).map(lambda e: (e[0], float(e[1])) )
enrollment_data = enrollment_id.filter(lambda e: e[0] != 'e').map(lambda e: e.split(','))
submission_data = sample_submission.filter(lambda e: e[0] != 'e').map(lambda e: e.split(','))

print train_data.take(1)
print enrollment_data.take(1)
print submission_data.take(1)

[(u'1', 1.0)]
[[u'1', u'Mv7P1v8fRDifebDlgIi2Vl1Z', u'ev4oSPQOjeL5QAf5oF72ooYu']]
[[u'72326', u'0.89875500179']]


In [6]:
# convert all activities to a dictionary value
# check if there are only seven kinds of value in activity.

dic_activity = {'access':'A', 'discussion':'B', 'navigate':'C', 'page_close':'D', 
                'problem':'E', 'video':'F', 'wiki':'G' }
dic_list = ['A','B','C','D','E','F','G']

data_type = data_raw.map(lambda e: [ e[0], e[1], dic_activity[e[2]] ] )
print data_type.take(1)

[[u'1', datetime.datetime(2014, 5, 31, 12, 43, 20), 'C']]


### There are only 7 activities --> it is clean, and can apply dictionary to map them   
### Some id contains records of different days, 3944 in total, should be carefully designed for these data

# Feature Engineering

In [7]:
""" create the frequency for each activity in each enrollment_id --> 7 features created """
# the head names are in same order of the values

def freq_id(e):
    dic_freq = {'A':0, 'B':0, 'C':0, 'D':0, 'E':0, 'F':0, 'G':0}
    key_list = ['A', 'B', 'C', 'D', 'E', 'F', 'G']
    value_list = []
    for x in e[1]:
        dic_freq[x] += 1
    for i in range(7):
        value_list.append(dic_freq[key_list[i]])
    return [e[0], value_list]

data_freq = data_type.map(lambda e: [e[0], e[2]]).groupByKey().map(lambda e: [ e[0], [x for x in e[1]] ] ).map(freq_id)
data_freq_head = sorted(dic_activity.keys())

print data_freq.take(1), data_freq.count()
print data_freq_head

[[u'110557', [311, 55, 57, 124, 62, 3, 3]]] 120542
['access', 'discussion', 'navigate', 'page_close', 'problem', 'video', 'wiki']


In [8]:
"""how many days in an enrollment_id --> 1 feature created"""

data_day = data_type.map(
    lambda e: [(e[0] + ',' + str(e[1].year) + ',' + str(e[1].timetuple().tm_yday)), 1 ] ).groupByKey().map(
    lambda e: e[0] ).map(lambda e: e.split(',')).map(lambda e: (e[0], 1) ).reduceByKey(lambda x,y: x+y)
data_day_head = ['number of days']

print data_day.take(1), data_day.count()

[(u'110557', 8)] 120542


In [9]:
""" Time lag in a day in seconds, for those who have multiple day records, take average  --> 1 feature created """

data_timeLag = data_type.map(
    lambda e: [(e[0] + ',' + str(e[1].year) + ',' + str(e[1].timetuple().tm_yday)), e[1] ] ).groupByKey().map(
    lambda e: [ e[0], [x for x in e[1]] ] ).map(
    lambda e: [ e[0], (max(e[1]) - min(e[1])).total_seconds() ] ).map(
    lambda e: [ e[0].split(','), e[1]] ).map(
    lambda e: [ e[0][0], e[1]] ).groupByKey().map(
    lambda e: [ e[0], np.mean([x for x in e[1]]) ] )
data_timeLag_head = ['working time interval']

print data_timeLag.take(1), data_timeLag.count()

[[u'110557', 15508.25]] 120542


## a little harder feature generation --> activity pair pattern related

In [10]:
# Make sure that 'groupByKey' method can keep the pattern order

example = sc.parallelize([(0, u'D'), (0, u'A'), (0, u'E'), (0, u'D'),(1, u'E'), (2, u'F')])
a = example.groupByKey().map(lambda x : (x[0], list(x[1]))).collect()
print example.take(2)
print a

[(0, u'D'), (0, u'A')]
[(0, [u'E', u'D', u'D', u'A']), (1, [u'E']), (2, [u'F'])]


In [11]:
"""Pair pattern: Frequency for each two-activity pattern --> 49 features created"""

# create a pair-pattern dictionary for future mapping

list_act = ['A','B','C','D','E','F','G']
list_value = np.zeros(49)
list_key = []

for i in xrange(7):
    list_key.append(list_act[i] + list_act[i])
    for j in xrange(i+1,7):
        list_key.append(list_act[i] + list_act[j])
        list_key.append(list_act[j] + list_act[i])
        
dic_pair_pattern = dict(zip(list_key, list(list_value)))  # create pattern dictionary outside to maintain the key order

def pair_pattern(e):
    for k in dic_pair_pattern.keys():   # initialize dictionary
        dic_pair_pattern[k] = 0
    for i in xrange(len(e[1])):         # calculation
        lab = e[1][i]
        for j in xrange(i+1,len(e[1])):
            label = lab + e[1][j]
            dic_pair_pattern[label] += 1
    value_list = []                     # create a value list to make sure that output is in same order as column names
    for m in xrange(49):
        value_list.append(dic_pair_pattern[list_key[m]])
    return [ e[0], value_list ] 

def pair_pattern_merge(e):              # add value from different day operations but belongs to same enrollment_id
    list_pair_pattern = []
    for i in xrange(49):
        freq = 0
        for value in e[1]:
            freq += value[i]
        list_pair_pattern.append(freq)
    return [ e[0], list_pair_pattern ]
            
data_pairPattern = data_type.map(
    lambda e: [(e[0] + ',' + str(e[1].year) + ',' + str(e[1].timetuple().tm_yday)), e[2] ] ).groupByKey().map(
    lambda e: [ e[0], [x for x in e[1]] ] ).map(pair_pattern).map(
    lambda e: [ e[0].split(','), e[1]] ).map(
    lambda e: [ e[0][0], e[1]] ).groupByKey().map(
    lambda e: [ e[0], [x for x in e[1]] ] ).map(pair_pattern_merge)

data_pairPattern_head = list_key        # create a list as column head to understand the new feature

print data_pairPattern.take(1), data_pairPattern.count()
print data_pairPattern_head

[[u'110557', [9362, 1014, 1456, 1398, 1461, 3830, 3649, 2499, 1098, 134, 58, 54, 105, 338, 462, 332, 765, 436, 266, 154, 7, 8, 40, 29, 463, 765, 633, 269, 199, 6, 7, 44, 43, 1493, 937, 418, 44, 20, 32, 64, 391, 28, 27, 7, 14, 1, 0, 0, 3]]] 120542
['AA', 'AB', 'BA', 'AC', 'CA', 'AD', 'DA', 'AE', 'EA', 'AF', 'FA', 'AG', 'GA', 'BB', 'BC', 'CB', 'BD', 'DB', 'BE', 'EB', 'BF', 'FB', 'BG', 'GB', 'CC', 'CD', 'DC', 'CE', 'EC', 'CF', 'FC', 'CG', 'GC', 'DD', 'DE', 'ED', 'DF', 'FD', 'DG', 'GD', 'EE', 'EF', 'FE', 'EG', 'GE', 'FF', 'FG', 'GF', 'GG']


In [12]:
""" Activity lag: how long an activity last in seconds --> 7 features """

# need to think about the 'page_close' activity --> maybe it does not make sense
# create a activity dictionary for future mapping

activity_key = ['A','B','C','D','E','F','G']
activity_value = np.zeros(7)
activity_lag = dict( zip(activity_key, list(activity_value)) )

def activity_time_lag(e):
    for key in activity_key:            # initialize dictionary
        activity_lag[key] = 0
    for i in xrange(0,len(e[1])-1):     # add time lag for each activity
        label = e[1][i][0]
        value = (e[1][i+1][1] - e[1][i][1]).total_seconds()
        activity_lag[label] += value
    alist = []
    for j in xrange(7):                 # created ordered time lag values for each activity
        alist.append(activity_lag[activity_key[j]])
    return [ e[0], alist ]

def activity_lag_merge(e):              # add value from different day operations but belongs to same enrollment_id
    act_time_lag = []
    for i in xrange(7):
        lag = 0
        for value in e[1]:
            lag += value[i]
        act_time_lag.append(lag)
    return [ e[0], act_time_lag ]

data_activity_lag = data_type.map(
    lambda e: [(e[0] + ',' + str(e[1].year) + ',' + str(e[1].timetuple().tm_yday)), (e[2], e[1])] ).groupByKey().map(
    lambda e: [ e[0], [x for x in e[1]] ] ).map(activity_time_lag).map(
    lambda e: [ e[0].split(','), e[1]] ).map(
    lambda e: [ e[0][0], e[1]] ).groupByKey().map(
    lambda e: [ e[0], [x for x in e[1]] ] ).map(activity_lag_merge)

data_activity_lag_head = activity_key        # create a list as column head to understand the new feature

print data_activity_lag.take(1), data_activity_lag.count()
print data_activity_lag_head

[[u'110557', [85873.0, 3293.0, 5991.0, 18715.0, 10053.0, 2.0, 139.0]]] 120542
['A', 'B', 'C', 'D', 'E', 'F', 'G']


## Join new features and train label

#### features need to be joined

data_day, data_day_head --> 1
data_timeLag, data_timeLag_head --> 1
data_freq, data_freq_head --> 7

data_pairPattern, data_pairPattern_head --> 49
data_activity_lag, data_activity_lag_head --> 7

#### course is used as an id for later calculation

In [13]:
# new rdd file with enrollment_id and all features

def break_list(e):
    alist = []
    for i in range(2):
        for j in xrange(len(e[1][i])):
            alist.append(e[1][i][j])
    return ( e[0], alist )

data_features = data_day.join(data_timeLag).join(data_freq).map(break_list).join(data_pairPattern).map(
    break_list).join(data_activity_lag).map(break_list)

feature_list = data_features.take(1)
number_features = len(feature_list[0][1])

print data_features.take(1), data_features.count(), number_features

[(u'74702', [4, 1218.25, 23, 0, 7, 14, 0, 5, 0, 90, 0, 0, 20, 42, 88, 56, 0, 0, 30, 35, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, 27, 17, 0, 0, 10, 10, 0, 0, 46, 0, 0, 20, 30, 0, 0, 0, 0, 0, 0, 0, 10, 0, 0, 0, 53.0, 0, 86.0, 4734.0, 0, 0.0, 0])] 120542 65


In [14]:
# Do not create dummy for course, but map the course name to another dictionary

course_dictionary = {
    'SZagToBU00pLXq691xXO8bju': 'C1', 'AenHr7h6SmttABSZzLDIOOPg': 'C2', 'ATVfuvcgCpCeoJ35eYmFgTKM': 'C3', 
    'V6HKtzXgLxJKuspYS6oIf6TM': 'C4', 'DKmoqeihogHvweNuFRglX4gC': 'C5', '0amLAQWMHCsmYB0LbJgYyaQh': 'C6', 
    'm88du8j2zKLOgu0qvfDih64t': 'C7', 'MSJxWgbSMt16zUqycBbrpfm6': 'C8', 'IgPEMpQbX7H5OGTWUvmRmPIO': 'C9', 
    've31Ekh7zequ7hWZ90igqwDp':'C10', 'ev4oSPQOjeL5QAf5oF72ooYu':'C11', 'QR0XzCHCYDjlY08IzEL9Xoci':'C12', 
    'K6WjDciFRURqcBkA0O6jIAWI':'C13', '1A29SKaQQCQZabKDaq7Tk8Lo':'C14', '1VZLSdIQsjy9MTgBTQYE4QI2':'C15', 
    'meLIi3tsNI0i6fexqFOUCz1k':'C16', 'V0ltVkyedZiGeSpNdmwI3AGe':'C17', 'dMOkjBKrDhejoADxQd7zkGiQ':'C18', 
    'yu4cZXxuoaajHNIVZxVenLrR':'C19', 'ccVlU1h9PciWvbZjm1DvqjcL':'C20', '4y2oIXcbPEgXpmqElRNICJnU':'C21', 
    'OaSRaqvAQY9msGG5HOGGCxe8':'C22', 'il7yokxrygymsYcIeKmHztAS':'C23', 'oUmjmhtu3lTMYfUMe7EjTl5O':'C24', 
    'Z6ta6CAHyTVLze24SBoSufZk':'C25', 'ElWfvzJwWRCO8dwcFErfTWeJ':'C26', 'djBWhu0JoDsrQ2a6Kzg6B4E2':'C27', 
    '72ea9t9dsOcCDKJcFDdyFw7x':'C28', 'TRrZ9gGs6MrmfBbpM1B9hzbn':'C29', '4977pk0QkRxSMP841C5ZDSGk':'C30', 
    'nknoexvjeVdLxiDT0VODT9CV':'C31', '22sM5fCFEc6YqlavAZ5fQ9rs':'C32', '9PacRT9Ksez8qfqnr0B4rUZr':'C33', 
    'ctO4FKH4WwG74Jm4loSZHZT7':'C34', 'Cfz86nNsKVtSr4UmAlqazDyb':'C35', 'kxtZS4d61l2cEp0BZ3e6HzOH':'C36', 
    'BRYxq4ELEbiuSpUBWNPvla8e':'C37', 'T2KaDT5KTn3hGQbrO7nLrErU':'C38', 'WckhTTKCVMWHr8RQsf97UQnr':'C39'
}

course = enrollment_data.map(lambda e: (e[0], course_dictionary[e[2]]) )
course_head = ['course']

print course.take(1)

[(u'1', 'C11')]


""" not used temp """

##### check how many courses in the dataset --> create dummy variables for each course

enrollment_data_check = enrollment_data.map(lambda e: (e[2], 1)).reduceByKey(lambda x,y: x+y)
enrollment_data_list = enrollment_data_check.map(lambda e: e[0] ).collect()
index_list = [i for i in xrange(39)]
course_dict = dict(zip(enrollment_data_list,index_list))
dummylist = list(np.zeros(39))

def course_dummy(e):
    new_list = [int(x) for x in dummylist]
    new_list[course_dict[e[1]]] = 1
    return ( e[0], new_list )

course_dummy = enrollment_data.map(lambda e: ( e[0], e[2]) ).map(course_dummy)
course_dummy_head = course_dict.keys()

print course_head
print course.take(1)


##### join course with dummy variables to form whole dataset

data_whole = data_features.join(course).map(break_list)
data_head = data_day_head + data_timeLag_head + data_freq_head + data_dateTime_head + data_pairPattern_head + \
            data_timePair_mean_head + data_timePair_var_head + data_timePair_min_head + data_timePair_max_head + \
            course_head

In [25]:
# join course without dummy to form another whole dataset

def bre_course(e):
    alist = []
    for i in xrange(len(e[1][0])):
        alist.append(e[1][0][i])
    alist.append(e[1][1])
    return (e[0], alist)

data_whole = data_features.join(course).map(bre_course)
data_head = data_day_head + data_timeLag_head + data_freq_head + data_pairPattern_head + \
data_activity_lag_head + course_head

print data_whole.take(1), data_whole.count()
print data_head

[(u'103941', [3, 24060.0, 69, 7, 30, 25, 8, 10, 2, 1050, 21, 42, 285, 507, 413, 319, 55, 17, 182, 148, 17, 1, 21, 77, 7, 30, 12, 56, 0, 7, 7, 14, 0, 174, 185, 112, 24, 72, 90, 42, 24, 0, 116, 31, 17, 56, 59, 10, 2, 28, 8, 8, 16, 0, 22, 2, 2, 1, 7145.0, 876.0, 2545.0, 55453.0, 628.0, 0.0, 5533.0, 'C33'])] 120542
['number of days', 'working time interval', 'access', 'discussion', 'navigate', 'page_close', 'problem', 'video', 'wiki', 'AA', 'AB', 'BA', 'AC', 'CA', 'AD', 'DA', 'AE', 'EA', 'AF', 'FA', 'AG', 'GA', 'BB', 'BC', 'CB', 'BD', 'DB', 'BE', 'EB', 'BF', 'FB', 'BG', 'GB', 'CC', 'CD', 'DC', 'CE', 'EC', 'CF', 'FC', 'CG', 'GC', 'DD', 'DE', 'ED', 'DF', 'FD', 'DG', 'GD', 'EE', 'EF', 'FE', 'EG', 'GE', 'FF', 'FG', 'GF', 'GG', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'course']


In [26]:
# apply standard scaler to the whole dataset and return standardized features.

from pyspark.mllib.feature import StandardScaler, StandardScalerModel
from pyspark.mllib.linalg import Vectors

feature = data_features.map(lambda e: e[1])
label = data_features.map(lambda e: e[0])

scaler = StandardScaler(withMean=True, withStd=True).fit(feature)
data_scaler = label.zip(scaler.transform(feature))

output_data_scaler = data_scaler.map(lambda e: [ e[0], [x for x in e[1]] ] ).join(course).map(bre_course)
print output_data_scaler.take(1)

[(u'103941', [0.10642912981595662, 3.0740990183713741, 0.79350150026024258, 0.042495389606601174, 1.6668819227598972, 0.70655590433194115, -0.071569137197939184, 0.22919857345051339, 0.31907527446323569, 0.022536646502868431, -0.081162844842727996, -0.018994033091972236, 1.0685482233251613, 1.6508829646527452, 0.52458591462079762, 0.447315718184916, -0.14271973772716573, -0.1490579466063012, 0.16798880557577572, 0.14346997906146014, 0.13918819133375129, -0.096683045452527733, -0.018881577301375466, 0.22336202672639066, -0.057770024816175089, 0.036706344018757504, -0.057791744398984962, 0.090563170322711259, -0.11637023981870402, -0.038314095524840892, -0.080088068663310599, 0.040654877336135264, -0.025538854610072503, 1.1845698046133475, 1.5674027035629916, 1.1158914816535703, -0.059796528663507403, 0.39047827713181332, 1.1435862779128594, 0.5507285101214453, 0.51371359630982516, -0.075725088831687479, 0.2681908425557864, -0.09892528772953417, -0.12850489766144638, 0.10708545238645485,

In [28]:
# create training data and testing data --> training data with label, testing without label
# one combination of scaler, one without scaler

def merge_train(e):
    alist = []
    for value in e[0]:
        alist.append(value)
    alist.append(e[1])
    return alist

training_no_scale = data_whole.join(train_data).map(lambda e: e[1]).map(merge_train)
testing_no_scale = data_whole.join(submission_data).map(lambda e: e[1][0])

training_scale = output_data_scaler.join(train_data).map(lambda e: e[1]).map(merge_train)
testing_scale = output_data_scaler.join(submission_data).map(lambda e: e[1][0])

print training_no_scale.take(1), training_no_scale.count()
print testing_no_scale.take(1), testing_no_scale.count()

print training_scale.take(1), training_scale.count()
print testing_scale.take(1), testing_scale.count()

[[1, 143.0, 7, 6, 6, 4, 0, 2, 2, 21, 20, 22, 12, 30, 20, 8, 0, 0, 9, 5, 6, 8, 15, 11, 25, 15, 9, 0, 0, 6, 6, 4, 8, 15, 19, 5, 0, 0, 9, 3, 8, 4, 6, 0, 0, 5, 3, 2, 6, 0, 0, 0, 0, 0, 1, 1, 3, 1, 18.0, 73.0, 23.0, 26.0, 0, 1.0, 2.0, 'C6', 1.0]] 72325
[[1, 0.0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 'C24']] 48217
[[-0.53600959755703792, -0.57200746056132679, -0.34580304350736379, 0.016163473735632355, -0.1829045663608283, -0.30070666135160101, -0.30407853803784241, -0.31191525238144718, 0.31907527446323569, -0.017899963960570756, -0.08254167959590801, -0.049394170342278693, -0.19227203075269006, -0.20193389436096226, -0.24736755587229362, -0.21687931953533707, -0.19280503604449134, -0.16918647665003184, -0.21036326108824013, -0.1920945069170861, -0.024795371555005663, 0.027906442333069555, -0.020256271976629228, -0.021625444581344452, -0.00074

In [29]:
# prepare for outputing the training and testing dataset

train_list = training_no_scale.collect()
test_list = testing_no_scale.collect()

train_scale_list = training_scale.collect()
test_scale_list = testing_scale.collect()

In [30]:
# output head

with open('round2_head.csv','wb') as out:
    csv_out2=csv.writer(out)
    csv_out2.writerow(data_head)
    for row in data_head:
        csv_out2.writerow(row)

In [31]:
# output no scale training data as csv file  

data_heading = data_head + ['target']

with open('round2_train_no_scale.csv','wb') as out:
    csv_out2=csv.writer(out)
    csv_out2.writerow(data_heading)
    for row in train_list:
        csv_out2.writerow(row)


# output no scale testing data as csv file 

with open('round2_test_no_scale.csv','wb') as out:
    csv_out2=csv.writer(out)
    csv_out2.writerow(data_head)
    for row in test_list:
        csv_out2.writerow(row)

In [32]:
# output with scale training data as csv file 

data_heading = data_head + ['target']

with open('round2_train_data.csv','wb') as out:
    csv_out2=csv.writer(out)
    csv_out2.writerow(data_heading)
    for row in train_scale_list:
        csv_out2.writerow(row)

        
# output with scale testing data as csv file

with open('round2_test_data.csv','wb') as out:
    csv_out2=csv.writer(out)
    csv_out2.writerow(data_head)
    for row in test_scale_list:
        csv_out2.writerow(row)

In [33]:
sc.stop()