In [1]:
from pyspark import SparkConf, SparkContext
import numpy as np
import sys
import random
import pyspark

In [2]:
class DecisionTree:
    def __init__(self, training_data, field_types):
        self.training_data = np.array(training_data)
        self.field_types = field_types

    def mean_squared_error(self, records):
        if records.shape[0] == 0:
            return 0
        targets = records[:, -1].astype('float')
        value = np.mean(targets)
        mse = np.mean(np.square(targets - value))
        return mse

    def find_best_attr(self, records):
        result = {'attr': None, 'split_cond': None, 'splits': None}
        min_mse = -1
        for i in np.arange(0, records.shape[1] - 1):
            split_cond = None
            splits = {}
            if self.field_types[i] == 'N':
                # numerical attribute
                left_split = list()
                right_split = list()
                split_cond = np.mean(records[:, i].astype('float')) # mean value of attribute i
                for record in records:
                    if float(record[i]) < split_cond:
                        left_split.append(record)
                    else:
                        right_split.append(record)
                splits['left'] = np.array(left_split)
                splits['right'] = np.array(right_split)
            else:
                # categorical attribute
                split_cond = list(set(records[:, i]))
                splits = {cond:list() for cond in split_cond}
                for record in records:
                    splits[record[i]].append(record)
                splits = {k: np.array(v) for k,v in splits.items()}

            # calculate MSE of splits
            error = 0
            for cond in splits:
                split = splits[cond]
                error += (split.shape[0]/records.shape[0])*self.mean_squared_error(split)
            
            if min_mse == -1 or error < min_mse:
                result['attr'] = i # index of chosen attribute for splitting
                result['split_cond'] = split_cond
                result['splits'] = splits
                min_mse = error
        
        return result

    def split(self, node):
        # split the records in a node, apply recursively to child nodes
        splits = node['splits']
        min_record = 5 # if the number of record in a split < min_record, we make a leaf node
        for i in splits:
            split = splits[i]
            if split.shape[0] <= min_record:
                node[i] = np.mean(split[:, -1].astype('float')) # make a leaf node
            else:
                node[i] = self.find_best_attr(split) # make an internal node
                self.split(node[i]) # split recursively on the child node

    def build_model(self):
        root_node = self.find_best_attr(self.training_data)
        self.split(root_node)
        return root_node
        
    def apply_model(self, node, record):
        if self.field_types[node['attr']] == 'N':
            # numerical node
            if float(record[node['attr']]) < node['split_cond']:
                if isinstance(node['left'], dict):
                    return self.apply_model(node['left'], record)
                else:
                    return node['left'] # leaf node
            else:
                if isinstance(node['right'], dict):
                    return self.apply_model(node['right'], record)
                else:
                    return node['right'] # leaf node
        else:
            # categorical node
            cat = record[node['attr']]
            if cat not in node['split_cond'] and len(node['split_cond']) > 0:
                # not equal to any categorical values, set to the first category as default
                cat = node['split_cond'][0]
            
            if isinstance(node[cat], dict):
                return self.apply_model(node[cat], record)
            else:
                return node[cat] # leaf node

    def predict(self, model, test_data):
        predictions = []
        for record in test_data:
            pred_val = self.apply_model(model, record)
            predictions.append([float(record[-1]), pred_val])
        return predictions

In [3]:
sc = pyspark.SparkContext.getOrCreate()

In [4]:
predict_on = 'departure'
maxSplitNumber = 1000
sampleSize = 4000
sampleNumber = 3
field_types = ['C', 'N', 'C', 'C', 'C', 'C', 'C', 'C', 'N', 'N', 'N', 'N', 'N', 'N']

In [5]:
training_rdd = sc.textFile("/mnt/wiktorskit-jungwonseo-ns0000k/home/notebook/group03/Predict-Delay/Dataset/cleaned_train_whole.txt")

In [6]:
test_rdd = sc.textFile("/mnt/wiktorskit-jungwonseo-ns0000k/home/notebook/group03/Predict-Delay/Dataset/cleaned_test_2000.txt")

In [7]:
def removeNonPredict(line, predict_on):
    if predict_on == 'departure':
        del line[-1]
    else:
        del line[-2]
    return line

In [8]:
def reduceSplit(x, y):
    x.extend(y)
    return x

In [9]:
def samplingData(key, values):
    values_list = np.array(values)
    l = values_list.shape[0]
    if l < sampleSize:
        return [(key, values)]
    else:
        # sampling without replacement
        pairs = []
        for i in range(0, sampleNumber):
            idx = np.random.choice(l, size=sampleSize, replace=False)
            pairs.append(("{}_{}".format(key, i), values_list[idx, :].tolist()))
        return pairs
    

In [10]:
def predict(training_set, test_set):
    DT = DecisionTree(training_set, field_types)
    model = DT.build_model()
    predictions = DT.predict(model, test_set)
    return predictions

In [11]:
test_set = np.array(test_rdd
                    .map(lambda line: line.replace("[","").replace("]","").replace("'","").split(','))
                    .map(lambda line: removeNonPredict(line, predict_on))
                    .collect())

In [12]:
import time

In [13]:
elapsed_time = []
for i in range(0,3):
    time_start=time.time()
    print("time start",time_start)

    result = (training_rdd
            .map(lambda line: line.replace("[","").replace("]","").replace("'","").split(','))
            .map(lambda line: (random.randint(0, maxSplitNumber), [removeNonPredict(line, predict_on)]))
            .reduceByKey(reduceSplit)
            .flatMap(lambda record: samplingData(record[0], record[1]))
            .map(lambda training_set: (1, predict(training_set[1], test_set)))
            .reduce(lambda x, y: (x[0] + y[0], np.add(x[1], y[1]))))

    time_end=time.time()
    print("Elapsed time:", time_end - time_start)
    elapsed_time.append(time_end - time_start)

time start 1555268264.282026
Elapsed time: 111.44248723983765
time start 1555268375.7246552
Elapsed time: 110.4754331111908
time start 1555268486.200298
Elapsed time: 106.6352527141571


In [14]:
print(np.mean(elapsed_time))

109.51772435506184


In [15]:
final_predictions = result[1]/result[0]
final_predictions

array([[-5.        , 10.03111333],
       [ 5.        ,  8.25696526],
       [22.        ,  6.34568765],
       ...,
       [-6.        ,  8.04308469],
       [-2.        ,  8.66230991],
       [62.        , 10.63738484]])

In [16]:
rmse = np.sqrt(np.mean(np.sum(np.square(final_predictions),axis=1)))
print(rmse)

30.051454747262607


In [17]:
sc.stop()