In [2]:
## imports ##
import numpy as np
import random

Parameters:  
n_estimators: (int) The number of trees in the forest.  
max_features: (int) The number of features to consider when looking for the best split   
max_depth: (int) The maximum depth of the tree  
min_samples_split: (int) The minimum number of samples required to split an internal node  

In [71]:
class RandomForest:
    def __init__(self, n_estimators=None, max_features=None, max_depth=None, min_samples_split=None):
        self.n_estimators = n_estimators
        self.max_features = max_features
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split

    def entropy(self, p):
        if p == 0:
            return 0
        elif p == 1:
            return 0
        else:
            return - (p * np.log2(p) + (1 - p) * np.log2(1-p))
    
    def information_gain(self, left_child, right_child):
        parent = left_child + right_child
        p_parent = parent.count(1) / len(parent) if len(parent) > 0 else 0
        p_left = left_child.count(1) / len(left_child) if len(left_child) > 0 else 0
        p_right = right_child.count(1) / len(right_child) if len(right_child) > 0 else 0
        IG_p = self.entropy(p_parent)
        IG_l = self.entropy(p_left)
        IG_r = self.entropy(p_right)
        return IG_p - len(left_child) / len(parent) * IG_l - len(right_child) / len(parent) * IG_r

    
    def draw_bootstrap(self, X_train, y_train):
        bootstrap_indices = list(np.random.choice(range(len(X_train)), len(X_train), replace = True))
        oob_indices = [i for i in range(len(X_train)) if i not in bootstrap_indices]
        X_bootstrap = X_train.iloc[bootstrap_indices].values
        y_bootstrap = y_train[bootstrap_indices]
        X_oob = X_train.iloc[oob_indices].values
        y_oob = y_train[oob_indices]
        return X_bootstrap, y_bootstrap, X_oob, y_oob

    def oob_score(self, tree, X_test, y_test):
        mis_label = 0
        for i in range(len(X_test)):
            pred = self.predict_tree(tree, X_test[i])
            if pred != y_test[i]:
                mis_label += 1
        return mis_label / len(X_test)
    
    def find_split_point(self, X_bootstrap, y_bootstrap, max_features):
        feature_ls = list()
        num_features = len(X_bootstrap[0])
        
        while len(feature_ls) <= max_features:
            feature_idx = random.sample(range(num_features), 1)
            if feature_idx not in feature_ls:
                feature_ls.extend(feature_idx)
        
        best_info_gain = -999
        node = None
        for feature_idx in feature_ls:
            for split_point in X_bootstrap[:,feature_idx]:
                left_child = {'X_bootstrap': [], 'y_bootstrap': []}
                right_child = {'X_bootstrap': [], 'y_bootstrap': []}
                
                # split children for continuous variables
                if type(split_point) in [int, float]:
                    split_point = float(split_point)
                    for i, value in enumerate(X_bootstrap[:,feature_idx]):
                        #print(value, split_point)
                        try:
                            if value <= split_point:
                                left_child['X_bootstrap'].append(X_bootstrap[i])
                                left_child['y_bootstrap'].append(y_bootstrap[i])
                            else:
                                right_child['X_bootstrap'].append(X_bootstrap[i])
                                right_child['y_bootstrap'].append(y_bootstrap[i])
                        except(TypeError):
                            print("Error occurred with split_point type:", split_point, type(split_point))
                            print("Error occurred with value type:", value, type(value))
                            print("Error occurred in column:", X_bootstrap[:, feature_idx])
                            raise  # Re-raise the exception to halt the program
                # split children for categoric variables
                else:
                    for i, value in enumerate(X_bootstrap[:,feature_idx]):
                        if value == split_point:
                            left_child['X_bootstrap'].append(X_bootstrap[i])
                            left_child['y_bootstrap'].append(y_bootstrap[i])
                        else:
                            right_child['X_bootstrap'].append(X_bootstrap[i])
                            right_child['y_bootstrap'].append(y_bootstrap[i])
                
                split_info_gain = self.information_gain(left_child['y_bootstrap'], right_child['y_bootstrap'])
                if split_info_gain > best_info_gain:
                    best_info_gain = split_info_gain
                    left_child['X_bootstrap'] = np.array(left_child['X_bootstrap'])
                    right_child['X_bootstrap'] = np.array(right_child['X_bootstrap'])
                    node = {'information_gain': split_info_gain, 
                            'left_child': left_child, 
                            'right_child': right_child, 
                            'split_point': split_point,
                            'feature_idx': feature_idx}
                    
        
        return node
        
    def terminal_node(self, node):
        y_bootstrap = node['y_bootstrap']
        pred = max(y_bootstrap, key = y_bootstrap.count)
        return pred

    def split_node(self, node, max_features, min_samples_split, max_depth, depth):
        left_child = node['left_child']
        right_child = node['right_child']    

        del(node['left_child'])
        del(node['right_child'])
        
        if len(left_child['y_bootstrap']) == 0 or len(right_child['y_bootstrap']) == 0:
            empty_child = {'y_bootstrap': left_child['y_bootstrap'] + right_child['y_bootstrap']}
            node['left_split'] = self.terminal_node(empty_child)
            node['right_split'] = self.terminal_node(empty_child)
            return
        
        if depth >= max_depth:
            node['left_split'] = self.terminal_node(left_child)
            node['right_split'] = self.terminal_node(right_child)
            return node
        
        if len(left_child['X_bootstrap']) <= min_samples_split:
            node['left_split'] = node['right_split'] = self.terminal_node(left_child)
        else:
            node['left_split'] = self.find_split_point(left_child['X_bootstrap'], left_child['y_bootstrap'], max_features)
            self.split_node(node['left_split'], max_depth, min_samples_split, max_depth, depth + 1)
        if len(right_child['X_bootstrap']) <= min_samples_split:
            node['right_split'] = node['left_split'] = self.terminal_node(right_child)
        else:
            node['right_split'] = self.find_split_point(right_child['X_bootstrap'], right_child['y_bootstrap'], max_features)
            self.split_node(node['right_split'], max_features, min_samples_split, max_depth, depth + 1)


    def build_tree(self, X_bootstrap, y_bootstrap, max_depth, min_samples_split, max_features):
        root_node = self.find_split_point(X_bootstrap, y_bootstrap, max_features)
        self.split_node(root_node, max_features, min_samples_split, max_depth, 1)
        return root_node

    def fit(self, X_train, y_train, n_estimators, max_features, max_depth, min_samples_split):
        tree_ls = list()
        oob_ls = list()
        for i in range(n_estimators):
            X_bootstrap, y_bootstrap, X_oob, y_oob = self.draw_bootstrap(X_train, y_train)
            tree = self.build_tree(X_bootstrap, y_bootstrap, max_features, max_depth, min_samples_split)
            tree_ls.append(tree)
            oob_error = self.oob_score(tree, X_oob, y_oob)
            oob_ls.append(oob_error)
        print("OOB estimate: {:.2f}".format(np.mean(oob_ls)))
        return tree_ls 

    def predict_tree(self, tree, X_test):
        feature_idx = tree['feature_idx']
        
        if X_test[feature_idx] <= tree['split_point']:
            if type(tree['left_split']) == dict:
                return self.predict_tree(tree['left_split'], X_test)
            else:
                value = tree['left_split']
                return value
        else:
            if type(tree['right_split']) == dict:
                return self.predict_tree(tree['right_split'], X_test)
            else:
                return tree['right_split']       

    def predict_rf(self, tree_ls, X_test):
        pred_ls = list()
        for i in range(len(X_test)):
            ensemble_preds = [self.predict_tree(tree, X_test.values[i]) for tree in tree_ls]
            final_pred = max(ensemble_preds, key = ensemble_preds.count)
            pred_ls.append(final_pred)
        return np.array(pred_ls)
        

### Testing the model

In [90]:
import random
import pandas as pd
import numpy as np

df = pd.read_csv(r'titatnic_train.csv')
df.head()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S


Random Forests are not robust enough to handle missing values, therefore it is necessary to handle null values.

In [93]:
import random
import pandas as pd
import numpy as np

df = pd.read_csv(r'titatnic_train.csv')
df.head()
df.drop(columns="Cabin",inplace=True)
df.drop(columns="Embarked",inplace=True)
features = ['Pclass','Sex','Age','SibSp','Parch', 'Fare']
nb_train = int(np.floor(0.9 * len(df)))
df = df.sample(frac=1, random_state=217)
X_train = df[features][:nb_train]
y_train = df['Survived'][:nb_train].values
X_test = df[features][nb_train:]
y_test = df['Survived'][nb_train:].values
df.loc[df['Age'].isnull(),'Age'] = np.round(df['Age'].mean())
# Replace null values with mean of their corresponding attribute (only for numerical columns)
for column in df.columns:
    if df[column].dtype in ['int64', 'float64']:
        mean_value = df[column].mean()
        df[column].fillna(mean_value, inplace=True)

n_estimators = 100
max_features = 3
max_depth = 10
min_samples_split = 2

rf = RandomForest(n_estimators, max_features, max_depth, min_samples_split)
model = rf.fit(X_train, y_train, n_estimators, max_features, max_depth, min_samples_split)

OOB estimate: 0.31


In [94]:
preds = rf.predict_rf(model, X_test)

In [96]:
acc = sum(preds == y_test) / len(y_test)
print("Testing accuracy: {}".format(np.round(acc,3)))

Testing accuracy: 0.589


### Compare with scikit-learn

In [89]:
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Load the dataset
df = pd.read_csv('titatnic_train.csv')

# Data preprocessing
df.drop(columns=["Cabin", "Embarked"], inplace=True)
df.loc[df['Age'].isnull(), 'Age'] = np.round(df['Age'].mean())
for column in df.columns:
    if df[column].dtype in ['int64', 'float64']:
        mean_value = df[column].mean()
        df[column].fillna(mean_value, inplace=True)

# Convert categorical variables to one-hot encoding
df = pd.get_dummies(df, columns=['Sex'])

# Define features and split data into train and test sets
features = ['Pclass', 'Sex_female', 'Sex_male', 'Age', 'SibSp', 'Parch', 'Fare']
nb_train = int(np.floor(0.9 * len(df)))
df = df.sample(frac=1, random_state=217)
X_train = df[features][:nb_train]
y_train = df['Survived'][:nb_train]
X_test = df[features][nb_train:]
y_test = df['Survived'][nb_train:]

# Create and train the Random Forest model
n_estimators = 100
max_features = 3
max_depth = 10
min_samples_split = 2

rf_model = RandomForestClassifier(n_estimators=n_estimators,
                                   max_features=max_features,
                                   max_depth=max_depth,
                                   min_samples_split=min_samples_split,
                                   random_state=42)
rf_model.fit(X_train, y_train)

# Make predictions
preds = rf_model.predict(X_test)

# Calculate accuracy
acc = accuracy_score(y_test, preds)
print("Testing accuracy: {}".format(np.round(acc, 3)))


Testing accuracy: 0.856


Since our random forest classifier is just a learning project where in we are trying to mimick the functionalities of scikit learn's random forest classifier you can see a difference in accuracies.