In [1]:
import math
import numpy as np
from collections import defaultdict
import matplotlib.pyplot as plt
import copy

In [2]:
def cov(x, y):
    #definition of cov(x, y) = E[xy] - E[x]E[y]
    return np.mean(x * y) - np.mean(x) * np.mean(y)

def corr(x, y):
    #corr(x, y) = cov(x, y) / sqrt(var(x) * var(y))
    #var(y) == 0 --> this edge case is checked in the decision tree algorithm
    #var(y) == 0 --> implies very high correlation in that set of data
    #var(x) == 0 --> implies very low correlation in that set of data (hence we return 0 on this)
    if np.var(x) == 0:
        return 0
    return cov(x, y) / math.sqrt(np.var(x) * np.var(y))

def generate_data(size, d):
    x = []
    x.append(np.random.normal(3, 1, size))
    x.append(np.random.normal(-2, 1, size))
    x.append(x[0] + 2 * x[1])
    x.append((x[1] + 2)**2)
    x.append(np.random.binomial(n=1, p=0.8, size=size))
    for _ in range(d - 5):
        x.append(np.random.normal(0, 0.1, size))

    def compute_y(x):
        y = 4 - 3 * x[0] * x[0] + x[2] - 0.01 * x[3] + x[1] * x[4] + np.random.normal(0, 0.1, len(x[0]))
        return y
    
    def transpose(temp):
        temp = np.array(temp)
        return temp.T
    
    xt = transpose(x)
    y = compute_y(x)
    
    return x, xt, y

In [3]:
class DecisionTree():   
    superfluous = defaultdict(int)
    def __init__(self, x, y, d, max_depth, min_sample_size):
        self.x = x
        self.y = y
        
        self.max_depth = max_depth
        self.min_sample_size = min_sample_size
        self.depth = d 
        self.child = len(self.x[0]) <= self.min_sample_size or self.depth == self.max_depth or np.var(y) == 0
        
        self.ind = -1
        self.threshold = 0
        
        self.result = np.mean(y)
        self.left, self.right = None, None

        if not self.child:
            self.split()
    
    def find_best_feature(self):
        all_corr = [abs(corr(xi, self.y)) for xi in self.x]
        return np.argmax(all_corr)
    
    def find_threshold_split(self, ind):
        indices = self.x[ind].argsort()
        
        for i in range(len(self.x)):
            self.x[i] = self.x[i][indices]
        self.y = self.y[indices]
        
        threshold = -1
        mn = float('inf')

        for i in range(len(self.y) - 1):
            fltr_left = self.x[ind] <= self.x[ind][i]
            fltr_right = self.x[ind] > self.x[ind][i]
            
            var_left = np.var(self.y[fltr_left])
            var_right = np.var(self.y[fltr_right])
            
            left = np.sum(fltr_left)
            right = np.sum(fltr_right)

            err_left = left / len(self.y) * var_left
            err_right = right / len(self.y) * var_right
            
            err = err_left + err_right
            
            if err < mn:
                threshold, mn = (self.x[ind][i] + self.x[ind][i + 1]) / 2, err
        return threshold
    
     
    def split(self):
        self.ind = self.find_best_feature()
        self.superfluous[self.ind] += 1

        self.threshold = self.find_threshold_split(self.ind)

        fltr_left = self.x[self.ind] <= self.threshold
        fltr_right = self.x[self.ind] > self.threshold
                
        x_left = [arr[fltr_left] for arr in self.x]
        x_right = [arr[fltr_right] for arr in self.x]
            
        y_left = self.y[fltr_left]
        y_right = self.y[fltr_right]
        
        self.left = DecisionTree(x_left, y_left, self.depth + 1, self.max_depth, self.min_sample_size)
        self.right = DecisionTree(x_right, y_right, self.depth + 1, self.max_depth, self.min_sample_size)

    @staticmethod
    def predict(node, arr):
        if node.child:
            return node.result
        if arr[node.ind] <= node.threshold:
            return DecisionTree.predict(node.left, arr)
        else:
            return DecisionTree.predict(node.right, arr)

In [4]:
def compute_mse_dt(x, y, dt):
    err = 0 #error
    for i in range(len(y)):
        yp = DecisionTree.predict(dt, x[i]) #get prediction from decision tree
        err += (abs(yp - y[i]) ** 2) #add squared error
    err = err / len(y) #take mean of squared error
    return err

In [9]:
TRAIN_SIZE = 10000
TEST_SIZE = 1000
D = 10
def question1():
    x, xt, y = generate_data(TRAIN_SIZE, D)
    _, tx, testy = generate_data(TEST_SIZE, D)
    #constant model = the mean of y values (explained in pdf)
    optimal_c = np.mean(y)
    
    #function that computes the MSE of a given constant model
    def compute_mse_constant_model(y, c):
        err = 0
        for i in range(len(y)):
            err += (c - y[i]) ** 2
        err = err / len(y) #take average of squared sum
        return err
    
    #Find the errors of both training and testing
    training_error = compute_mse_constant_model(y, optimal_c)
    testing_error = compute_mse_constant_model(testy, optimal_c)
    
    #Print errors
    print('Training Error: ', training_error)
    print('Testing Error: ', testing_error)
  
    return training_error, testing_error

In [18]:
question1()

Training Error:  321.99924730793754
Testing Error:  287.265332497151


(321.99924730793754, 287.265332497151)