In [3]:
''' ############## Part 2 ############## '''

########### initialize and get data ############

from pyspark import SparkContext, SparkConf
from datetime import datetime
import numpy as np


def create_rdd_from_file(fn):
    '''
    Load a comma-separated text file into an RDD of split records.

    Input:  fn- full path of the source file
    Output: the according RDD. Each element is the list of string fields of
            one line (naive split on ','). When 'gender' appears in fn, each
            element is instead the pair ('"<id>"', 1 if gender is 'F' else 0);
            the id is wrapped in double quotes, presumably to match the quoted
            ids of the other source files (TODO confirm against the data).
    Notes:  blank lines are skipped, and the header line is dropped by matching
            its first field against known header tokens.
            Relies on a SparkContext named `sc` defined elsewhere (assumed to
            be created in an earlier notebook cell).
    '''
    header_tokens = {u'"participantID.A"', u'"user"', u'"participantID"'}
    rdd = sc.textFile(fn)
    # a blank line would otherwise become [''] and break later field indexing
    lines = rdd.filter(lambda line: bool(line.strip())) \
               .map(lambda line: line.split(","))
    if 'gender' in fn:
        # strip() so that a trailing '\r' (CRLF files) or stray spaces do not
        # silently turn every 'F' into 0
        lines = lines.map(lambda line: (''.join(['"', line[0], '"']),
                                        1 if line[1].strip() == u'F' else 0))

    # drop the header line
    lines = lines.filter(lambda line: line[0] not in header_tokens)
    return lines

############ average number of occurrences per day ############
def avg_num_of_occurences_per_day(rdd,id_idx,date_idx):
    '''
    Compute, for every id, the number of events divided by the number of
    distinct calendar days on which that id had at least one event.

    Input:
        rdd- the data in RDD container (elements are indexable records)
        id_idx- the index of the ID field in rdd
        date_idx- the index of the date field in rdd, formatted as
                  '%Y-%m-%d %H:%M:%S'
    Output: RDD of (id, events_per_active_day) pairs; values are always floats
    '''
    def to_day(line):
        # only the calendar date matters for counting active days
        return datetime.strptime(line[date_idx], '%Y-%m-%d %H:%M:%S').date()

    # carry (set_of_days, event_count) through the reduce and divide once at
    # the end, so each merge builds a single union and every id gets a float
    return rdd.map(lambda line: (line[id_idx], ({to_day(line)}, 1))) \
              .reduceByKey(lambda v1, v2: (v1[0] | v2[0], v1[1] + v2[1])) \
              .mapValues(lambda v: float(v[1]) / len(v[0]))

############ average number of distinct contacts per day ############
def avg_num_of_distinct_contacts(rdd,id_idx,date_idx,contact_idx):
    '''
    Compute, for every id, the number of distinct contacts over the whole
    period divided by the number of distinct days on which the id was active.

    Note: the numerator counts each contact once across all days; it is not
    the mean of per-day distinct-contact counts.

    Input:
        rdd- the data in RDD container (elements are indexable records)
        id_idx- the index of the ID field in rdd
        date_idx- the index of the date field in rdd, formatted as
                  '%Y-%m-%d %H:%M:%S'
        contact_idx- the index of the contact field (participantB/Mac address/hashed phone number) in rdd
    Output: RDD of (id, distinct_contacts / distinct_days) pairs, as floats
    '''
    def to_day(line):
        # only the calendar date matters for counting active days
        return datetime.strptime(line[date_idx], '%Y-%m-%d %H:%M:%S').date()

    # carry (set_of_days, set_of_contacts) and divide once at the end, so each
    # merge builds each union only once
    return rdd.map(lambda line: (line[id_idx], ({to_day(line)}, {line[contact_idx]}))) \
              .reduceByKey(lambda v1, v2: (v1[0] | v2[0], v1[1] | v2[1])) \
              .mapValues(lambda v: float(len(v[1])) / len(v[0]))

############ portion of incoming calls ############
def portion_of_incoming(rdd,id_idx,direction_idx):
    '''
    Compute, for every id, the fraction of its events that are incoming.

    Input:
        rdd- the data in RDD container (elements are indexable records)
        id_idx- the index of the ID field in rdd
        direction_idx- the index of the type field (incoming/outgoing) in rdd;
                       the values u'"incoming"' and u'"incoming+"' (including
                       the double quotes) count as incoming
    Output: RDD of (id, incoming_events / all_events) pairs, floats in [0, 1]
    '''
    incoming_types = {u'"incoming+"', u'"incoming"'}
    # Carry raw (total, incoming) counts through the reduce and divide only at
    # the end. Mixing an already-divided ratio with raw counts gives wrong
    # results for 3+ directions, and returns a raw count (not a proportion)
    # for ids seen with a single direction.
    return rdd.map(lambda line: (line[id_idx],
                                 (1, 1 if line[direction_idx] in incoming_types else 0))) \
              .reduceByKey(lambda v1, v2: (v1[0] + v2[0], v1[1] + v2[1])) \
              .mapValues(lambda v: float(v[1]) / v[0])

############ export to file ############
def export_list_to_file(l,fn,features_names=None):
    '''
    Write rows to a CSV file, optionally preceded by a header row.

    Input:
        l- iterable of rows (each row itself an iterable of values) to write
        fn- the full path of the output file (overwritten if it exists)
        features_names- optional list of column names; when non-empty, the
                        header row 'id', followed by these names, is written first
    Output: nothing
    '''
    import csv
    # binary mode is what the Python 2 csv module expects
    with open(fn, 'wb') as out_file:
        writer = csv.writer(out_file, delimiter=',', quotechar='|',
                            quoting=csv.QUOTE_MINIMAL)
        if features_names:
            writer.writerow(['id'] + features_names)
        writer.writerows(l)

############ extract features from the source files and export them to a file ############   
def _join_feature_rdds(feature_rdds):
    '''
    Inner-join a list of (id, value) RDDs on their id.

    Input:  feature_rdds- non-empty list of RDDs whose elements are (id, value) pairs
    Output: RDD of [id, value_1, ..., value_n] lists, with the values in the
            same order as feature_rdds; ids missing from any RDD are dropped
            (inner join)
    '''
    joined = feature_rdds[0].mapValues(lambda value: [value])
    for feature_rdd in feature_rdds[1:]:
        # append to a flat list instead of nesting join tuples ever deeper
        joined = joined.join(feature_rdd).mapValues(lambda pair: pair[0] + [pair[1]])
    return joined.map(lambda line: [line[0]] + line[1])


def _normalize_min_max(features):
    '''
    Scale every feature column of [id, f_1, ..., f_n] records to the 0-1 range.

    Input:  features- RDD of [id, f_1, ..., f_n] lists with numeric f_i
    Output: RDD of [id, g_1, ..., g_n] lists with
            g_i = (f_i - min_i) / (max_i - min_i). A column whose minimum
            equals its maximum is mapped to 0.0 instead of dividing by zero.
    Raises: ValueError (from RDD.reduce) if features is empty
    '''
    values = features.map(lambda line: line[1:])
    minimum = values.reduce(lambda v1, v2: [min(a, b) for a, b in zip(v1, v2)])
    maximum = values.reduce(lambda v1, v2: [max(a, b) for a, b in zip(v1, v2)])
    spans = [hi - lo for lo, hi in zip(minimum, maximum)]

    def scale(line):
        return [line[0]] + [(1.0 * value - lo) / span if span else 0.0
                            for value, lo, span in zip(line[1:], minimum, spans)]

    return features.map(scale)


def feature_engineering_phase(sms_fn,calls_fn,meetings_fn,gender_fn,output_fn):
    '''
    Extract per-participant features from the source files, min-max normalize
    them, and export them (with the gender label as the last column) to a CSV file.

    Input:
        sms_fn- the full path of the SMSs source file
        calls_fn- the full path of the calls source file
        meetings_fn- the full path of the bluetooth source file
        gender_fn- the full path of the gender source file
        output_fn- the full path of the output file
    Output: a list of the features names (the last entry, 'gender', is the label)
    Raises: ValueError (from RDD.reduce) if no call has a recorded duration,
            or if no participant appears in every source
    '''
    # loading the source files:
    sms = create_rdd_from_file(sms_fn)
    calls = create_rdd_from_file(calls_fn)
    meetings = create_rdd_from_file(meetings_fn)
    gender = create_rdd_from_file(gender_fn)

    # extracting the average number of events per day:
    average_number_of_sms_per_day = avg_num_of_occurences_per_day(sms,0,2)
    average_number_of_calls_per_day = avg_num_of_occurences_per_day(calls,0,2)
    average_number_of_meetings_per_day = avg_num_of_occurences_per_day(meetings,0,1)
    # extracting the number of distinct contacts per day:
    average_distinct_calls_contacts_per_day = avg_num_of_distinct_contacts(calls,0,2,5)
    average_distinct_sms_contacts_per_day = avg_num_of_distinct_contacts(sms,0,2,4)
    average_distinct_meetings_contacts_per_day = avg_num_of_distinct_contacts(meetings,0,1,3)
    # extracting the proportion of incoming events:
    portion_of_incoming_calls = portion_of_incoming(calls,0,3)
    portion_of_incoming_sms = portion_of_incoming(sms,0,3)

    # average duration of calls; a call without a recorded duration (empty
    # field 4) counts as the global average duration
    total_duration, num_timed_calls = calls.filter(lambda line: line[4] != u'') \
                                           .map(lambda line: (int(line[4]), 1)) \
                                           .reduce(lambda v1, v2: (v1[0] + v2[0], v1[1] + v2[1]))
    total_average_duration = float(total_duration) / num_timed_calls

    average_duration = calls.map(lambda line: (line[0],
                                               (int(line[4]) if line[4] != u'' else total_average_duration, 1))) \
                            .reduceByKey(lambda v1, v2: (v1[0] + v2[0], v1[1] + v2[1])) \
                            .mapValues(lambda v: float(v[0]) / v[1])

    print("Finished creating the RDDs")

    # join all the features; feature_rdds must be in the same order as features_names
    features_names = ['average_number_of_sms_per_day',
                      'average_number_of_calls_per_day',
                      'average_number_of_meetings_per_day',
                      'average_distinct_sms_contacts_per_day',
                      'average_distinct_calls_contacts_per_day',
                      'average_distinct_meetings_contacts_per_day',
                      'portion_of_incoming_calls',
                      'portion_of_incoming_sms',
                      'average_duration',
                      'gender']
    feature_rdds = [average_number_of_sms_per_day,
                    average_number_of_calls_per_day,
                    average_number_of_meetings_per_day,
                    average_distinct_sms_contacts_per_day,
                    average_distinct_calls_contacts_per_day,
                    average_distinct_meetings_contacts_per_day,
                    portion_of_incoming_calls,
                    portion_of_incoming_sms,
                    average_duration,
                    gender]

    # cached because the joined RDD is scanned three times (min, max, export)
    features = _join_feature_rdds(feature_rdds).cache()
    print("Finished joinning RDDs")

    # normalize each feature to 0-1 scale
    norm_features = _normalize_min_max(features)
    print("Finished normalizing")

    export_list_to_file(norm_features.collect(), output_fn, features_names)
    return features_names

# Build features_initial.csv: one row per participant with the extracted
# features, followed by the participant's gender label.
sms_fn, calls_fn, meetings_fn, gender_fn = (
    '/assignment/input/SMSLog.csv',
    '/assignment/input/CallLog.csv',
    '/assignment/input/BluetoothProximity.csv',
    '/assignment/input/gender_labels.csv',
)
output_fn = '/assignment/features/features_initial.csv'
features_names = feature_engineering_phase(sms_fn, calls_fn, meetings_fn,
                                           gender_fn, output_fn)



############ import file and write to train and test ############   
def split_file_to_train_test(source_fn,train_fn,test_fn,features_names=None,seed=None):
    '''
    Shuffle the rows of a CSV file and write one third to a test file and the
    remaining two thirds to a train file.

    Input:
        source_fn- the full path of the source file (its first line is a header)
        train_fn- the full path of the train output file
        test_fn- the full path of the test output file
        features_names- a list of strings represent the features' name, which returned from feature_engineering_phase()
        seed- optional integer seed for a reproducible shuffle; when None the
              global numpy random state is used
    Output: nothing
    '''
    records = np.genfromtxt(source_fn, delimiter=",", dtype=None)
    records = records[1:]   # remove the header
    if seed is None:
        np.random.shuffle(records)
    else:
        np.random.RandomState(seed).shuffle(records)

    # floor division keeps the split index an int on Python 3 as well
    split_at = records.shape[0] // 3
    test, train = records[:split_at], records[split_at:]

    export_list_to_file(test,test_fn,features_names)
    export_list_to_file(train,train_fn,features_names)

train_fn = '/assignment/features/training_initial.csv'
test_fn = '/assignment/features/testing_initial.csv'

# split the data into train and test sets:
split_file_to_train_test(output_fn, train_fn, test_fn, features_names)

print("END")

Finished creating the RDDs
Finished joinning RDDs
Finished normalizing
END


2015-07-02 04:48:36,477 INFO  [Thread-2] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(246950) called with curMem=2068485, maxMem=278302556
2015-07-02 04:48:36,478 INFO  [Thread-2] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_37 stored as values in memory (estimated size 241.2 KB, free 263.2 MB)
2015-07-02 04:48:36,497 INFO  [Thread-2] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(19465) called with curMem=2315435, maxMem=278302556
2015-07-02 04:48:36,497 INFO  [Thread-2] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_37_piece0 stored as bytes in memory (estimated size 19.0 KB, free 263.2 MB)
2015-07-02 04:48:36,498 INFO  [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added broadcast_37_piece0 in memory on localhost:40918 (size: 19.0 KB, free: 265.1 MB)
2015-07-02 04:48:36,498 INFO  [Thread-2] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Upda

In [5]:
''' ############## Parts 3,4 and 5 ############## '''

############ Weighted KNN ############  

import numpy as np

def knn_reducer(v1,v2,k):
    '''
    Merge two partial nearest-neighbour lists, keeping only the k closest.

    Input:
        v1- list of (Euclidean distance, label) tuples
        v2- list of (Euclidean distance, label) tuples
        k- the required number of nearest neighbors
    Output: a new list holding the tuples of v1 and v2 sorted ascending
            (by distance, ties broken by label), cut to at most k entries;
            the inputs are not modified
    '''
    merged = sorted(v1 + v2)
    return merged if len(merged) <= k else merged[:k]
    

def knn(train_rdd,pred_line,k):
    '''
    Predict the binary label of one record with exponentially weighted k-NN.

    Input:
        train_rdd- the RDD of the training set; each record is
                   [id, feature_1, ..., feature_n, label]
        pred_line- the record to classify, [id, feature_1, ..., feature_n]
                   (without the label)
        k- the required number of nearest neighbors
    Output: 1 if the weighted vote of the k nearest neighbours exceeds 0.5,
            otherwise 0. Each neighbour's label is weighted by exp(-distance),
            normalised over the selected neighbours.
    Raises: ValueError (from RDD.reduce) if train_rdd is empty
    '''
    # convert the query point once, not once per training record
    query = np.asarray(pred_line[1:])

    nn = train_rdd.map(lambda line: [(np.linalg.norm(np.asarray(line[1:-1]) - query), line[-1])]) \
                  .reduce(lambda v1, v2: knn_reducer(v1, v2, k))

    distances = np.array([dist for dist, _ in nn], dtype=float)
    labels = np.array([label for _, label in nn], dtype=float)
    # Shifting by the smallest distance leaves the normalised weights
    # mathematically unchanged, but stops exp() underflowing to 0/0 for distant
    # neighbours.
    weights = np.exp(-(distances - distances.min()))
    decision = weights.dot(labels) / weights.sum()

    return 1 if decision > 0.5 else 0

############ Train and test the model ############   

# Load the test records onto the driver. The training RDD is cached because
# knn() launches one Spark job over the whole training set per test record;
# without caching, the text file would be re-read and re-parsed every time.
test = sc.textFile(test_fn).map(lambda line: line.split(",")) \
                           .filter(lambda line: line[0] != 'id') \
                           .map(lambda line: [line[0]] + [float(i) for i in line[1:]]).collect()
train = sc.textFile(train_fn).map(lambda line: line.split(",")) \
                             .filter(lambda line: line[0] != 'id') \
                             .map(lambda line: [line[0]] + [float(i) for i in line[1:]]) \
                             .cache()

if not test:
    raise ValueError('no test records found in ' + test_fn)

# Each result row is [id, true label, predicted label]. t[:-1] drops the label
# before the record is classified.
test = [[t[0], t[-1], knn(train, t[:-1], 7)] for t in test]
hit = float(sum(1 for row in test if row[-2] == row[-1]))
accuracy = hit / len(test)

tested_init_fn = '/assignment/output/tested_initial.csv'
export_list_to_file(test,tested_init_fn,['label','prediction'])

accuracy_init_fn = '/assignment/output/accuracy_initial.txt'
export_list_to_file([['Model Accuracy'],['The weighted KNN model\'s accuracy is:'],[str(accuracy)]],accuracy_init_fn)
print('END')



END


2015-07-02 05:21:36,080 INFO  [Thread-2] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(246950) called with curMem=3810764, maxMem=278302556
2015-07-02 05:21:36,081 INFO  [Thread-2] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_151 stored as values in memory (estimated size 241.2 KB, free 261.5 MB)
2015-07-02 05:21:36,098 INFO  [Thread-2] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(19465) called with curMem=4057714, maxMem=278302556
2015-07-02 05:21:36,098 INFO  [Thread-2] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_151_piece0 stored as bytes in memory (estimated size 19.0 KB, free 261.5 MB)
2015-07-02 05:21:36,099 INFO  [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added broadcast_151_piece0 in memory on localhost:40918 (size: 19.0 KB, free: 264.9 MB)
2015-07-02 05:21:36,099 INFO  [Thread-2] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - U