In [1]:
%load_ext autoreload
%autoreload 2

import numpy as np
import pandas as pd

from sklearn.datasets import load_iris

In [2]:
# Load iris data.
data = load_iris()
dataset = {feature: data.data[:, i] for i, feature in enumerate(data.feature_names)}
dataset.update({'target': data.target})

df = pd.DataFrame(dataset)
X, y = df.iloc[:, :-1], df.iloc[:, -1]

In [3]:
def entropy(split_feature):
    _, counts = np.unique(split_feature, return_counts=True)
    prob = counts / split_feature.shape[0]
    return -np.sum(prob * np.log2(prob))

def gini(split_feature):
    _, counts = np.unique(split_feature, return_counts=True)
    prob = counts / split_feature.shape[0]
    return 1 - np.sum(prob ** 2)

def var(split_feature):
    return np.var(split_feature)

split_criterions = {'entropy': entropy, 'gini': gini, 'var': var}

In [4]:
class DecisionTree:
    
    def __init__(self, criterion, depth=0, min_samples_split=2, max_depth=5):
        
        self.criterion = criterion
        self.criterion_fn = split_criterions[criterion]
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.depth = depth
        self.score = np.inf
    
    def fit(self, X, y):
        
        n_samples, n_features = X.shape
        
        if self.depth == self.max_depth or n_samples < self.min_samples_split:
            # Most common label.
            self.prediction = np.mean(y)
            self.is_leaf = True
            return self
        
        self.find_best_split(X, y)
        
        mask = X[:, self.best_feature_split] < self.best_split_value
        n_left = np.sum(mask)
        n_right = n_samples - n_left
        
        if n_left < self.min_samples_split or n_right < self.min_samples_split:
            self.is_leaf = True
            self.prediction = np.mean(y)
            return self
        
        self.is_leaf = False
        self.create_trees(X, y, mask)

    def find_best_split(self, X, y):
        
        n_samples, n_features = X.shape
        best_feature_split, best_split_score = 0, np.inf
        
        for feature in range(n_features):
            values = X[:, feature]
            sorted_indices = np.argsort(values)
            sorted_values = values[sorted_indices]
            sorted_labels = y[sorted_indices]
            _, uniq_indices = np.unique(sorted_values, return_index=True)
            
            for idx in uniq_indices:
                left_y, right_y = sorted_labels[:idx], sorted_labels[idx:]
                left_split_val = self.criterion_fn(left_y)
                right_split_val = self.criterion_fn(right_y)
                current_idx_split_val = idx / n_samples * left_split_val + \
                                            (1 - idx / n_samples) * right_split_val 
                
                if current_idx_split_val < best_split_score:
                    best_feature_split = feature
                    best_split_value = sorted_values[idx]
                    best_split_score = current_idx_split_val
        
        self.best_feature_split = best_feature_split
        self.best_split_value = best_split_value
        self.score = best_split_score
    
    def create_trees(self, X, y, mask):
        left_X, left_y = X[mask, :], y[mask]
        right_X, right_y = X[~mask, :], y[~mask]
        
        self.left = DecisionTree(self.criterion, depth=self.depth+1, 
                                 min_samples_split=self.min_samples_split, 
                                 max_depth=self.max_depth)
        self.right = DecisionTree(self.criterion, depth=self.depth+1, 
                                  min_samples_split=self.min_samples_split,
                                  max_depth=self.max_depth)
        
        self.left.fit(left_X, left_y)
        self.right.fit(right_X, right_y)

    def predict_sample(self, sample):
        
        if self.is_leaf:
            return self.prediction
        elif sample[self.best_feature_split] < self.best_split_value:
            return self.left.predict_sample(sample)
        else:
            return self.right.predict_sample(sample)
        
    def predict(self, X):
        return np.array([self.predict_sample(sample) for sample in X])

class DecisionTreeClassifier:
    
    def __init__(self, criterion='entropy', min_samples_split=2, max_depth=5):
        
        self.d_tree = DecisionTree(criterion=criterion, 
                                   min_samples_split=min_samples_split, 
                                   max_depth=max_depth)
    def fit(self, X, y):
        self.d_tree.fit(X, y)
    
    def predict(self, X):
        preds = self.d_tree.predict(X)
        return np.round(preds)
    
class DecisionTreeRegressor:
    
    def __init__(self, criterion='var', min_samples_split=2, max_depth=5):
        self.d_tree = DecisionTree(criterion=criterion, 
                                   min_samples_split=min_samples_split, 
                                   max_depth=max_depth)
    def fit(self, X, y):
        self.d_tree.fit(X, y)
    
    def predict(self, X):
        return self.d_tree.predict(X)
    
    
class RandomForestClassifier:
    
    def __init__(self, n_estimators=10, criterion='entropy', 
                 min_samples_split=2, max_depth=5):

        self.estimators = [DecisionTreeClassifier(criterion=criterion, 
                                             min_samples_split=min_samples_split, 
                                             max_depth=max_depth) for _ in range(n_estimators)]
    
    def fit(self, X, y):
        
        self.sample_size = int(0.5 * len(y))
        for estimator in self.estimators:
            rand_samples = np.random.permutation(len(y))[:self.sample_size]
            estimator.fit(X[rand_samples], y[rand_samples])
    
    def predict(self, X):
        preds = np.array([estimator.predict(X) for estimator in self.estimators])
        return np.round(np.mean(preds, axis=0))
    
class RandomForestRegressor:
    
    def __init__(self, n_estimators=10, criterion='var', 
                 min_samples_split=2, max_depth=5):
        
        self.estimators = [DecisionTreeRegressor(criterion=criterion, 
                                             min_samples_split=min_samples_split, 
                                             max_depth=max_depth) for _ in range(n_estimators)]
    
    def fit(self, X, y):
        for estimator in self.estimators:
            estimator.fit(X, y)
    
    def predict(self, X):
        preds = np.array([estimator.predict(X) for estimator in self.estimators])
        return np.mean(preds, axis=0)


### RandomForestClassifier

In [5]:
rf = RandomForestClassifier(n_estimators=3, criterion='entropy', 
                            min_samples_split=5, max_depth=5)
rf.fit(X.values, y.values)
preds = rf.predict(X.values)
print((preds == y).mean())

0.9666666666666667


In [6]:
from sklearn.ensemble import RandomForestClassifier

rf = RandomForestClassifier(n_estimators=3, criterion='entropy', min_samples_split=5, max_depth=5)
rf.fit(X, y)
preds = rf.predict(X)
print((preds == y).mean())

0.9666666666666667


### RandomForestRegressor

In [7]:
filename = "data/ex1data1.txt"
data = np.loadtxt(filename, delimiter=',', usecols=(0, 1), unpack=True)

X = np.transpose(np.array(data[:-1]))
y = np.transpose(np.array(data[-1:]))

In [8]:
rfr = RandomForestRegressor(n_estimators=3, criterion='gini')

rfr.fit(X, y)
preds = rfr.predict(X)
print("MSE: ", np.mean((preds - y) ** 2))

MSE:  31.49511896402476


In [9]:
# Sklearn way
from sklearn.ensemble import RandomForestRegressor

rfr = RandomForestRegressor(n_estimators=3, criterion='mse')
rfr.fit(X, y)
preds = rfr.predict(X)

print("MSE: ", np.mean((preds - y) ** 2))

MSE:  56.7337121522493


  """
