# ML Report Rose Laird

#### Setup

##### loading data

In [2]:
import torch
import os
import scipy.io as sio
from sklearn.model_selection import train_test_split
import numpy as np

# --- Dataset configuration --------------------------------------------------
# All paths are built relative to the dataset root folder.
data_dir_root = os.path.join('ThingsEEG-Text')
sbj = 'sub-10'
image_model = 'pytorch/cornet_s'
text_model = 'CLIPText'
roi = '17channels'
brain_dir = os.path.join(data_dir_root, 'brain_feature', roi, sbj)
image_dir_seen = os.path.join(data_dir_root, 'visual_feature/ThingsTrain', image_model, sbj)
image_dir_unseen = os.path.join(data_dir_root, 'visual_feature/ThingsTest', image_model, sbj)
text_dir_seen = os.path.join(data_dir_root, 'textual_feature/ThingsTrain/text', text_model, sbj)
text_dir_unseen = os.path.join(data_dir_root, 'textual_feature/ThingsTest/text', text_model, sbj)

# --- "Seen" split -----------------------------------------------------------
# The EEG file holds both the signal ('data') and the labels ('class_idx'),
# so it is parsed once and both arrays are read from the same dict.
eeg_train_mat = sio.loadmat(os.path.join(brain_dir, 'eeg_train_data_within.mat'))
brain_seen = eeg_train_mat['data'].astype('double') * 2.0
brain_seen = brain_seen[:,:,27:60] # 70ms-400ms
# Flatten everything after the sample axis into one feature vector per sample.
brain_seen = np.reshape(brain_seen, (brain_seen.shape[0], -1))
# NOTE(review): the 50.0 / 2.0 scale factors are kept as found; their
# rationale is not shown in this notebook.
image_seen = sio.loadmat(os.path.join(image_dir_seen, 'feat_pca_train.mat'))['data'].astype('double')*50.0
text_seen = sio.loadmat(os.path.join(text_dir_seen, 'text_feat_train.mat'))['data'].astype('double')*2.0
label_seen = eeg_train_mat['class_idx'].T.astype('int')
# Keep only the first 100 image feature columns.
image_seen = image_seen[:,0:100]

# --- "Unseen" split (same processing as above) ------------------------------
eeg_test_mat = sio.loadmat(os.path.join(brain_dir, 'eeg_test_data.mat'))
brain_unseen = eeg_test_mat['data'].astype('double')*2.0
brain_unseen = brain_unseen[:, :, 27:60]
brain_unseen = np.reshape(brain_unseen, (brain_unseen.shape[0], -1))
image_unseen = sio.loadmat(os.path.join(image_dir_unseen, 'feat_pca_test.mat'))['data'].astype('double')*50.0
text_unseen = sio.loadmat(os.path.join(text_dir_unseen, 'text_feat_test.mat'))['data'].astype('double')*2.0
label_unseen = eeg_test_mat['class_idx'].T.astype('int')
image_unseen = image_unseen[:, 0:100]

# --- Convert every array to a torch tensor (shares memory with NumPy) -------
brain_seen = torch.from_numpy(brain_seen)
brain_unseen = torch.from_numpy(brain_unseen)
image_seen = torch.from_numpy(image_seen)
image_unseen = torch.from_numpy(image_unseen)
text_seen = torch.from_numpy(text_seen)
text_unseen = torch.from_numpy(text_unseen)
label_seen = torch.from_numpy(label_seen)
label_unseen = torch.from_numpy(label_unseen)

Data splitting

In [3]:
import numpy as np

# Boolean row masks keeping only samples whose class index is below 21.
index_seen = np.squeeze(np.where(label_seen < 21, True, False))
index_unseen = np.squeeze(np.where(label_unseen < 21, True, False))

# Apply the masks to every modality so rows stay aligned across tensors.
brain_seen, image_seen, text_seen = (
    modality[index_seen, :] for modality in (brain_seen, image_seen, text_seen)
)
label_seen = label_seen[index_seen]
brain_unseen, image_unseen, text_unseen = (
    modality[index_unseen, :] for modality in (brain_unseen, image_unseen, text_unseen)
)
label_unseen = label_unseen[index_unseen]

num_classes = 20
samples_per_class = 10
count = 0

# One contiguous row window per class.
# NOTE(review): this assumes the filtered "seen" rows are grouped by class
# with exactly `samples_per_class` rows each - confirm against the data.
class_windows = [
    slice(cls * samples_per_class, cls * samples_per_class + samples_per_class)
    for cls in range(num_classes)
]

# Within every class window the first 7 rows go to the training set and the
# remaining rows to the test set.
new_train_brain = [brain_seen[window, :][:7] for window in class_windows]
new_train_image = [image_seen[window, :][:7] for window in class_windows]
new_train_text = [text_seen[window, :][:7] for window in class_windows]
new_train_label = [label_seen[window, :][:7] for window in class_windows]

new_test_brain = [brain_seen[window, :][7:] for window in class_windows]
new_test_image = [image_seen[window, :][7:] for window in class_windows]
new_test_text = [text_seen[window, :][7:] for window in class_windows]
new_test_label = [label_seen[window, :][7:] for window in class_windows]

# Stack the per-class pieces back into one tensor per modality.
train_brain, train_image, train_text, train_label = (
    torch.vstack(new_train_brain),
    torch.vstack(new_train_image),
    torch.vstack(new_train_text),
    torch.vstack(new_train_label),
)
test_brain, test_image, test_text, test_label = (
    torch.vstack(new_test_brain),
    torch.vstack(new_test_image),
    torch.vstack(new_test_text),
    torch.vstack(new_test_label),
)


##### Making Features

In [4]:
# NumPy views of the training tensors; labels are flattened to 1-D.
train_brain_np, train_image_np, train_text_np = (
    tensor.numpy() for tensor in (train_brain, train_image, train_text)
)
train_label_np = train_label.numpy().ravel()

# Same conversion for the held-out split.
test_brain_np, test_image_np, test_text_np = (
    tensor.numpy() for tensor in (test_brain, test_image, test_text)
)
test_label_np = test_label.numpy().ravel()

# Concatenate the three modalities column-wise: brain | image | text.
train_features_multiple = np.concatenate(
    [train_brain_np, train_image_np, train_text_np], axis=1
)
test_features_multiple = np.concatenate(
    [test_brain_np, test_image_np, test_text_np], axis=1
)

## Data Exploration

#### Statistical Exploration 

Shapes and Distributions

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Number of feature columns contributed by each modality.
print("Feature Modality Breakdown:")
print(f"Brain Features: {train_brain.shape[1]}")
print(f"Image Features: {train_image.shape[1]}")
print(f"Text Features: {train_text.shape[1]}")

# Row counts of the combined feature matrices and the distinct class labels.
train_samples = train_features_multiple.shape[0]
test_samples = test_features_multiple.shape[0]
classes = torch.unique(train_label)

print(f"Training Samples: {train_samples}")
print(f"Testing Samples: {test_samples}")
print(f"Classes: {classes}")

# Samples per class in each split.
train_class_dist = pd.Series(train_label_np).value_counts()
test_class_dist = pd.Series(test_label_np).value_counts()

print("\nTraining Class Distribution:")
print(train_class_dist)

print("\nTesting Class Distribution:")
print(test_class_dist)



Mean, Standard Deviation and Range Analysis

In [None]:
import torch
import pandas as pd

def calculate_tensor_statistics(tensor, tensor_name):
    """Summarise every column of a 2-D tensor.

    Args:
        tensor: torch tensor reduced along dim 0 (one row per sample).
        tensor_name: accepted for call-site readability; not used in the body.

    Returns:
        pandas DataFrame with one row per column of ``tensor`` and the columns
        "Feature" (column index), "Mean", "Standard Deviation" and "Range"
        (max minus min).
    """
    column_mean = tensor.mean(dim=0)
    column_std = tensor.std(dim=0)
    column_max, _ = tensor.max(dim=0)
    column_min, _ = tensor.min(dim=0)
    column_range = column_max - column_min

    columns = {
        "Feature": range(column_mean.shape[0]),
        "Mean": column_mean.numpy(),
        "Standard Deviation": column_std.numpy(),
        "Range": column_range.numpy(),
    }
    return pd.DataFrame(columns)

# Per-feature statistics for each training modality.
train_brain_stats = calculate_tensor_statistics(train_brain, "train_brain")
train_text_stats = calculate_tensor_statistics(train_text, "train_text")
trainimage_stats = calculate_tensor_statistics(train_image, "train_image")

# One summary row per modality: the average of each per-feature statistic.
aggregate_stats = pd.DataFrame({
    "Dataset": ["train_brain", "train_text", "trainimage"],
    "Mean of Means": [
        table["Mean"].mean()
        for table in (train_brain_stats, train_text_stats, trainimage_stats)
    ],
    "Mean of Standard Deviations": [
        table["Standard Deviation"].mean()
        for table in (train_brain_stats, train_text_stats, trainimage_stats)
    ],
    "Mean of Ranges": [
        table["Range"].mean()
        for table in (train_brain_stats, train_text_stats, trainimage_stats)
    ],
})

# Persist all tables next to the notebook.
train_brain_stats.to_csv("train_brain_stats.csv", index=False)
train_text_stats.to_csv("train_text_stats.csv", index=False)
trainimage_stats.to_csv("trainimage_stats.csv", index=False)
aggregate_stats.to_csv("aggregate_stats.csv", index=False)

# Show the first rows of each table and the full summary.
print("Train Brain Statistics:\n", train_brain_stats.head())
print("Train Text Statistics:\n", train_text_stats.head())
print("Train Image Statistics:\n", trainimage_stats.head())
print("Aggregate Statistics:\n", aggregate_stats)

#### Visual

Class Distribution

In [None]:
# Bar chart of samples per class in the training split.
train_dist_ax = train_class_dist.plot(kind='bar', title='Training Class Distribution', color='skyblue')
train_dist_ax.set_xlabel('Class')
train_dist_ax.set_ylabel('Frequency')
plt.show()

# Same chart for the test split.
test_dist_ax = test_class_dist.plot(kind='bar', title='Testing Class Distribution', color='orange')
test_dist_ax.set_xlabel('Class')
test_dist_ax.set_ylabel('Frequency')
plt.show()


Correlation Heatmaps

In [None]:
import seaborn as sns

import pandas as pd

# Convert explicitly to NumPy before building the frames instead of relying
# on pandas to coerce torch tensors.
train_brain_pd = pd.DataFrame(train_brain.numpy())
train_image_pd = pd.DataFrame(train_image.numpy())
train_text_pd = pd.DataFrame(train_text.numpy())

# Draw one feature-by-feature correlation heatmap per modality. Each heatmap
# gets its own figure of the same size (previously only the first one did).
for heatmap_title, feature_frame in (
    ("Correlation Heatmap: Brain Features", train_brain_pd),
    ("Correlation Heatmap: Image Features", train_image_pd),
    ("Correlation Heatmap: Text Features", train_text_pd),
):
    plt.figure(figsize=(12, 10))
    sns.heatmap(feature_frame.corr(), cmap='coolwarm', annot=False)
    # Put feature 0 at the bottom-left so both axes increase together.
    plt.gca().invert_yaxis()
    plt.title(heatmap_title)
    plt.show()


Outliers (tentative: possibly not needed)

In [None]:
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest

# Flag roughly 5% of the training rows as outliers with an isolation forest.
# fit_predict returns 1 for inliers, so despite its name `outlier_mask` is
# True for the rows that are KEPT.
iso = IsolationForest(contamination=0.05, random_state=42)
outlier_mask = iso.fit_predict(train_features_multiple) == 1

X_train_filtered = train_features_multiple[outlier_mask]
y_train_filtered = train_label_np[outlier_mask]

# Standardise with statistics from the full training set so both views share
# the same scaling.
scaler = StandardScaler()
X_unfiltered_scaled = scaler.fit_transform(train_features_multiple)
X_filtered_scaled = scaler.transform(X_train_filtered)

# Fit the 2-D projection once on the unfiltered data and reuse it for the
# filtered data. Refitting on the filtered subset would put the two panels in
# different coordinate systems and make the comparison meaningless.
pca = PCA(n_components=2)
X_unfiltered_pca = pca.fit_transform(X_unfiltered_scaled)
X_filtered_pca = pca.transform(X_filtered_scaled)

fig, axes = plt.subplots(1, 2, figsize=(14, 6), sharey=True)

# Left panel: every training sample, coloured by class label.
axes[0].scatter(
    X_unfiltered_pca[:, 0],
    X_unfiltered_pca[:, 1],
    c=train_label_np,
    cmap='viridis',
    alpha=0.6
)
axes[0].set_title("Before Outlier Removal")
axes[0].set_xlabel("PCA Component 1")
axes[0].set_ylabel("PCA Component 2")

# Right panel: only the samples kept by the isolation forest.
axes[1].scatter(
    X_filtered_pca[:, 0],
    X_filtered_pca[:, 1],
    c=y_train_filtered,
    cmap='viridis',
    alpha=0.6
)
axes[1].set_title("After Outlier Removal")
axes[1].set_xlabel("PCA Component 1")

plt.tight_layout()
plt.show()



##### PCA

In [None]:
from sklearn.decomposition import PCA

# Full PCA on the concatenated features; running total of the variance ratios.
pca = PCA()
pca.fit(train_features_multiple)
explained_variance = pca.explained_variance_ratio_.cumsum()

# Cumulative explained variance against the number of components kept.
component_counts = range(1, len(explained_variance) + 1)
plt.figure(figsize=(8, 5))
plt.plot(component_counts, explained_variance, marker='o')
plt.title("Explained Variance of Combined Features (Three Modalities)")
plt.xlabel("Number of Principal Components")
plt.ylabel("Cumulative Explained Variance")
# Reference lines for the two usual cut-offs.
plt.axhline(y=0.95, color='r', linestyle='--', label='95% Variance Threshold')
plt.axhline(y=0.99, color='g', linestyle='--', label='99% Variance Threshold')
plt.legend()
plt.show()


## Custom Random Forest Tree Model

#### Model

*** Reference: I learnt how to code a basic Random Forest model by following along with this video: https://www.youtube.com/watch?v=kFwe2ZZU7yw, so the cell below is very similar to the code in the video. All cells and changes before and after it are my own.

In [8]:
import numpy as np
from collections import Counter

class Node:
    """One node of a decision tree.

    Internal nodes carry ``feature``, ``threshold`` and the ``left`` / ``right``
    children; leaves carry only ``value`` (keyword-only), the predicted label.
    """

    def __init__(self, feature=None, threshold=None, left=None, right=None,*,value=None):
        # Split description (internal nodes).
        self.feature, self.threshold = feature, threshold
        # Child sub-trees (internal nodes).
        self.left, self.right = left, right
        # Predicted label (leaves only).
        self.value = value

    def is_leaf_node(self):
        """Return True when the node stores a prediction value."""
        return not (self.value is None)


class DecisionTree:
    """Classification tree grown greedily by maximising information gain.

    Args:
        min_samples_split: a node with fewer samples than this becomes a leaf.
        max_depth: nodes at this depth become leaves.
        n_features: number of feature columns sampled (without replacement) at
            every node; falsy means "use all columns" (resolved in ``fit``).

    Labels must be non-negative integers because entropy is computed with
    ``np.bincount``.
    """

    def __init__(self, min_samples_split=2, max_depth=100, n_features=None):
        self.min_samples_split=min_samples_split
        self.max_depth=max_depth
        self.n_features=n_features
        self.root=None

    def fit(self, X, y):
        """Grow the tree on a 2-D feature array ``X`` and a 1-D label array ``y``.

        Raises:
            ValueError: if ``y`` is empty (there is no label to predict).
        """
        if len(y) == 0:
            raise ValueError("Cannot fit a DecisionTree on an empty dataset")
        # Never sample more columns per node than X actually has.
        self.n_features = X.shape[1] if not self.n_features else min(X.shape[1],self.n_features)
        self.root = self._grow_tree(X, y)

    def _grow_tree(self, X, y, depth=0):
        """Recursively build and return the sub-tree for the samples (X, y)."""
        n_samples, n_feats = X.shape
        n_labels = len(np.unique(y))

        # Stopping criteria: depth limit, pure node, or too few samples.
        if (depth>=self.max_depth or n_labels==1 or n_samples<self.min_samples_split):
            return Node(value=self._most_common_label(y))

        # Random feature subset for this node.
        feat_idxs = np.random.choice(n_feats, self.n_features, replace=False)

        best_feature, best_thresh = self._best_split(X, y, feat_idxs)
        if best_feature is None:
            return Node(value=self._most_common_label(y))

        left_idxs, right_idxs = self._split(X[:, best_feature], best_thresh)
        # A zero-gain "split" can leave one side empty (e.g. all sampled
        # columns are constant). Recursing would hand an empty label array to
        # _most_common_label and raise IndexError, so stop with a leaf instead.
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return Node(value=self._most_common_label(y))

        left = self._grow_tree(X[left_idxs, :], y[left_idxs], depth+1)
        right = self._grow_tree(X[right_idxs, :], y[right_idxs], depth+1)
        return Node(best_feature, best_thresh, left, right)

    def _best_split(self, X, y, feat_idxs):
        """Return the (feature index, threshold) pair with the highest gain.

        Every unique value of every candidate column is tried as a threshold.
        Returns ``(None, None)`` only when no threshold was evaluated.
        """
        best_gain = -1
        split_idx, split_threshold = None, None

        for feat_idx in feat_idxs:
            X_column = X[:, feat_idx]
            thresholds = np.unique(X_column)

            for thr in thresholds:
                gain = self._information_gain(y, X_column, thr)

                # Strict '>' keeps the first candidate on ties.
                if gain > best_gain:
                    best_gain = gain
                    split_idx = feat_idx
                    split_threshold = thr

        return split_idx, split_threshold

    def _information_gain(self, y, X_column, threshold):
        """Entropy reduction from splitting ``y`` at ``threshold``.

        Returns 0 when the threshold leaves one side empty.
        """
        parent_entropy = self._entropy(y)

        left_idxs, right_idxs = self._split(X_column, threshold)

        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0

        # Size-weighted average of the children's entropies.
        n = len(y)
        n_l, n_r = len(left_idxs), len(right_idxs)
        e_l, e_r = self._entropy(y[left_idxs]), self._entropy(y[right_idxs])
        child_entropy = (n_l/n) * e_l + (n_r/n) * e_r

        information_gain = parent_entropy - child_entropy
        return information_gain

    def _split(self, X_column, split_thresh):
        """Return the row indices going left (<= threshold) and right (>)."""
        left_idxs = np.argwhere(X_column <= split_thresh).flatten()
        right_idxs = np.argwhere(X_column > split_thresh).flatten()
        return left_idxs, right_idxs

    def _entropy(self, y):
        """Shannon entropy (natural log) of the integer label array ``y``."""
        hist = np.bincount(y)
        ps = hist / len(y)
        # Skip empty classes: 0 * log(0) is taken as 0.
        return -np.sum([p * np.log(p) for p in ps if p>0])

    def _most_common_label(self, y):
        """Return the most frequent label in ``y`` (first seen wins ties)."""
        counter = Counter(y)
        value = counter.most_common(1)[0][0]
        return value

    def predict(self, X):
        """Return an array with one predicted label per row of ``X``."""
        return np.array([self._traverse_tree(x, self.root) for x in X])

    def _traverse_tree(self, x, node):
        """Follow the split rules from ``node`` down to a leaf for sample ``x``."""
        if node.is_leaf_node():
            return node.value

        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)
        


import numpy as np
from collections import Counter

class RandomForest:
    """Bagged ensemble of ``DecisionTree`` learners combined by majority vote.

    Args:
        n_trees: number of trees to train.
        max_depth: depth limit passed to every tree.
        min_samples_split: minimum node size passed to every tree.
        n_feature: per-node feature subset size passed to every tree
            (stored as ``self.n_features``).
    """

    def __init__(self, n_trees=10, max_depth=10, min_samples_split=2, n_feature=None):
        self.n_trees, self.max_depth = n_trees, max_depth
        self.min_samples_split = min_samples_split
        self.n_features = n_feature
        self.trees = []

    def fit(self, X, y):
        """Train ``n_trees`` trees, each on its own bootstrap sample of (X, y)."""
        self.trees = []
        for _ in range(self.n_trees):
            member = DecisionTree(
                n_features=self.n_features,
                min_samples_split=self.min_samples_split,
                max_depth=self.max_depth,
            )
            X_boot, y_boot = self._bootstrap_samples(X, y)
            member.fit(X_boot, y_boot)
            self.trees.append(member)

    def _bootstrap_samples(self, X, y):
        """Draw as many rows as X has, with replacement, from (X, y)."""
        row_count = X.shape[0]
        picks = np.random.choice(row_count, size=row_count, replace=True)
        return X[picks], y[picks]

    def _most_common_label(self, y):
        """Return the most frequent label in ``y`` (first seen wins ties)."""
        (label, _), = Counter(y).most_common(1)
        return label

    def predict(self, X):
        """Return the majority-vote label for every row of ``X``."""
        # Shape (n_trees, n_samples) -> (n_samples, n_trees) so that each row
        # holds all votes for one sample.
        votes_by_tree = np.array([member.predict(X) for member in self.trees])
        votes_by_sample = votes_by_tree.swapaxes(0, 1)
        return np.array([self._most_common_label(votes) for votes in votes_by_sample])

#### Run

In [None]:
from sklearn import datasets
from sklearn.model_selection import train_test_split
import numpy as np

from sklearn.decomposition import PCA

# Keep as many principal components as needed to explain 95% of the variance.
# The projection is fitted on the training features only and then applied
# unchanged to the test features, so no test information leaks into it.
pca = PCA(n_components=0.95)
X_train_reduced = pca.fit_transform(train_features_multiple)
X_test_reduced = pca.transform(test_features_multiple)


def accuracy(y_true, y_pred):
    """Return the fraction of positions where ``y_pred`` equals ``y_true``."""
    n_correct = np.sum(np.equal(y_true, y_pred))
    return n_correct / len(y_true)

# The custom forest draws its bootstrap samples and feature subsets from
# NumPy's global RNG, so seed it to make this accuracy reproducible.
np.random.seed(42)

# Train the custom forest on the PCA-reduced features and score it on the
# held-out split.
clf = RandomForest(n_trees=30)
clf.fit(X_train_reduced, train_label_np)
predictions = clf.predict(X_test_reduced)

acc =  accuracy(test_label_np, predictions)
print(acc)

#### Hyperparameter Script for Custom

In [None]:
import numpy as np
from sklearn.model_selection import ParameterGrid
from sklearn.metrics import accuracy_score, classification_report
import pandas as pd
from sklearn.decomposition import PCA


# Grid of settings to evaluate; max_depth=None means "no depth limit".
param_grid = {
    'n_trees': [10, 20, 50],
    'max_depth': [None, 10, 20],
    'min_samples_split': [2, 5, 10]
}

# The custom forest uses NumPy's global RNG; seed it so the search is
# reproducible from a fresh kernel.
np.random.seed(42)

results = []

# NOTE(review): every configuration is scored on the test split, so the best
# accuracy reported below is an optimistic estimate (no validation split).
for params in ParameterGrid(param_grid):
    print(f"Training with params: {params}")
    rf = RandomForest(
        n_trees=params['n_trees'],
        max_depth=params['max_depth'] if params['max_depth'] is not None else float('inf'),
        min_samples_split=params['min_samples_split']
    )
    rf.fit(X_train_reduced, train_label_np.flatten())

    y_pred = rf.predict(X_test_reduced)
    accuracy = accuracy_score(test_label_np.flatten(), y_pred)

    results.append({
        'n_trees': params['n_trees'],
        'max_depth': params['max_depth'],
        'min_samples_split': params['min_samples_split'],
        'accuracy': accuracy
    })

results_df = pd.DataFrame(results)

# Row with the highest accuracy (first one on ties).
best_result = results_df.loc[results_df['accuracy'].idxmax()]
print("\nBest Hyperparameters:")
print(best_result)
print("\nClassification Report for Best Model:")
# A None max_depth comes back from the frame as None/NaN, never as the string
# 'None', so test for missing values before converting to int.
best_max_depth = best_result['max_depth']
best_rf = RandomForest(
    n_trees=int(best_result['n_trees']),
    max_depth=float('inf') if pd.isna(best_max_depth) else int(best_max_depth),
    min_samples_split=int(best_result['min_samples_split'])
)
best_rf.fit(X_train_reduced, train_label_np)
y_pred_best = best_rf.predict(X_test_reduced)
print(classification_report(test_label_np, y_pred_best))


#### Convergence Speed

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
from sklearn.decomposition import PCA


# Forest sizes to compare; the other hyperparameters stay fixed.
n_trees_range = [10, 20, 50, 100]
max_depth = 10
min_samples_split = 2
n_features = None

# The custom forest uses NumPy's global RNG; seed it so the curve below is
# reproducible from a fresh kernel.
np.random.seed(42)

accuracy_scores = []

# Train one forest per size and record its accuracy on the held-out split.
for n_trees in n_trees_range:
    custom_rf = RandomForest(n_trees=n_trees, max_depth=max_depth,
                             min_samples_split=min_samples_split, n_feature=n_features)
    custom_rf.fit(X_train_reduced, train_label_np)

    predictions = custom_rf.predict(X_test_reduced)

    accuracy = accuracy_score(test_label_np, predictions)
    accuracy_scores.append(accuracy)

# Accuracy as a function of the number of trees.
plt.figure(figsize=(8, 6))
plt.plot(n_trees_range, accuracy_scores, marker='o')
plt.title('Convergence Speed of Custom Random Forest')
plt.xlabel('Number of Trees')
plt.ylabel('Accuracy')
plt.grid()
plt.show()


### Comparison to Baseline

Accuracy

In [None]:
from sklearn.ensemble import RandomForestClassifier

from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

# scikit-learn baseline trained on the raw (unreduced) concatenated features.
model = RandomForestClassifier(n_estimators=100, random_state=42)
model = model.fit(train_features_multiple, train_label_np)
predictions = model.predict(test_features_multiple)

# Overall accuracy on the held-out split.
accuracy = accuracy_score(test_label_np, predictions)
print("Accuracy:", accuracy)

# Per-class precision / recall / F1.
print("Classification Report:")
print(classification_report(test_label_np, predictions))

# Rows are true classes, columns are predicted classes.
print("Confusion Matrix:")
print(confusion_matrix(test_label_np, predictions))

print("Predictions:", predictions)

Convergence Speed

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Forest sizes to compare for the scikit-learn baseline.
n_estimators_range = [10, 20, 50, 100, 200, 300]

accuracies = []

# One seeded forest per size, trained on the raw concatenated features.
for n in n_estimators_range:
    model = RandomForestClassifier(n_estimators=n, random_state=42)
    model = model.fit(train_features_multiple, train_label_np)
    predictions = model.predict(test_features_multiple)

    acc = accuracy_score(test_label_np, predictions)
    accuracies.append(acc)

# Accuracy as a function of the number of trees.
plt.figure(figsize=(8, 5))
plt.plot(n_estimators_range, accuracies, linestyle='-', marker='o')
plt.title('Convergence Speed of Random Forest')
plt.xlabel('Number of Trees (n_estimators)')
plt.ylabel('Accuracy')
plt.grid(True)
plt.show()


### Improved Custom RF

In [36]:
from joblib import Parallel, delayed
import numpy as np
from collections import Counter

class Node:
    """Tree node: an internal split (feature, threshold, children) or a leaf.

    A node is a leaf exactly when ``value`` (keyword-only) is not None.
    """

    def __init__(self, feature=None, threshold=None, left=None, right=None,*,value=None):
        # Split rule: samples with x[feature] <= threshold go left.
        self.feature, self.threshold = feature, threshold
        self.left, self.right = left, right
        # Predicted label for leaves; None for internal nodes.
        self.value = value

    def is_leaf_node(self):
        """Return True when this node carries a prediction value."""
        has_value = self.value is not None
        return has_value


class DecisionTree:
    """Entropy-based classification tree that refuses degenerate splits.

    Args:
        min_samples_split: a node with fewer samples than this becomes a leaf.
        max_depth: nodes at this depth become leaves.
        n_features: number of columns sampled (without replacement) at every
            node; falsy means "all columns" (resolved in ``fit``).

    Labels must be non-negative integers (``np.bincount`` is used).
    """

    def __init__(self, min_samples_split=2, max_depth=100, n_features=None):
        self.min_samples_split, self.max_depth = min_samples_split, max_depth
        self.n_features = n_features
        self.root = None

    def fit(self, X, y):
        """Grow the tree on the 2-D array ``X`` with labels ``y``."""
        total_columns = X.shape[1]
        if not self.n_features:
            self.n_features = total_columns
        else:
            # Never sample more columns than X has.
            self.n_features = min(total_columns, self.n_features)
        self.root = self._grow_tree(X, y)

    def _grow_tree(self, X, y, depth=0):
        """Recursively build and return the sub-tree for the samples (X, y)."""
        n_samples = len(y)
        n_labels = len(np.unique(y))

        keep_splitting = not (
            depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split
        )
        if keep_splitting:
            feat_idxs = np.random.choice(X.shape[1], self.n_features, replace=False)
            best_feature, best_thresh = self._best_split(X, y, feat_idxs)

            if best_feature is not None:
                left_idxs, right_idxs = self._split(X[:, best_feature], best_thresh)

                # Only branch when both children actually receive samples.
                if len(left_idxs) != 0 and len(right_idxs) != 0:
                    left = self._grow_tree(X[left_idxs], y[left_idxs], depth + 1)
                    right = self._grow_tree(X[right_idxs], y[right_idxs], depth + 1)
                    return Node(best_feature, best_thresh, left, right)

        # Every other path ends in a leaf predicting the majority label.
        return Node(value=self._most_common_label(y))

    def _best_split(self, X, y, feat_idxs):
        """Return the (feature index, threshold) pair with the highest gain."""
        top_gain = -1
        winner = (None, None)

        for feat_idx in feat_idxs:
            column = X[:, feat_idx]
            # Every distinct value of the column is a candidate threshold.
            for candidate in np.unique(column):
                candidate_gain = self._information_gain(y, column, candidate)
                # Strict '>' keeps the first candidate on ties.
                if candidate_gain > top_gain:
                    top_gain = candidate_gain
                    winner = (feat_idx, candidate)

        return winner

    def _information_gain(self, y, X_column, threshold):
        """Entropy reduction from splitting ``y`` at ``threshold`` (0 if one side is empty)."""
        left_idxs, right_idxs = self._split(X_column, threshold)
        n_l, n_r = len(left_idxs), len(right_idxs)
        if n_l == 0 or n_r == 0:
            return 0

        parent_entropy = self._entropy(y)
        n = len(y)
        e_l, e_r = self._entropy(y[left_idxs]), self._entropy(y[right_idxs])
        # Size-weighted average of the children's entropies.
        child_entropy = (n_l/n) * e_l + (n_r/n) * e_r
        return parent_entropy - child_entropy

    def _split(self, X_column, split_thresh):
        """Return the row indices going left (<= threshold) and right (>)."""
        goes_left = X_column <= split_thresh
        goes_right = X_column > split_thresh
        return np.argwhere(goes_left).flatten(), np.argwhere(goes_right).flatten()

    def _entropy(self, y):
        """Vectorised entropy of integer labels; 1e-10 keeps log() finite at p=0."""
        ps = np.bincount(y) / len(y)
        return -np.sum(ps * np.log(ps + 1e-10))

    def _most_common_label(self, y):
        """Return the most frequent label in ``y`` (first seen wins ties)."""
        (label, _), = Counter(y).most_common(1)
        return label

    def predict(self, X):
        """Return an array with one predicted label per row of ``X``."""
        labels = [self._traverse_tree(row, self.root) for row in X]
        return np.array(labels)

    def _traverse_tree(self, x, node):
        """Walk from ``node`` down to a leaf following the split rules for ``x``."""
        current = node
        while not current.is_leaf_node():
            if x[current.feature] <= current.threshold:
                current = current.left
            else:
                current = current.right
        return current.value
        

class RandomForestImproved:
    """Random forest with parallel training and accuracy-weighted voting.

    Every tree is trained on a bootstrap sample and weighted by its accuracy
    on the samples left out of that bootstrap (out-of-bag). Predictions are
    made by a weighted vote.

    Labels are expected to be the integers ``1 .. n_classes``.

    Args:
        n_trees: number of trees to train.
        max_depth: depth limit passed to every tree.
        min_samples_split: minimum node size passed to every tree.
        n_features: per-node feature subset size passed to every tree.
        n_classes: number of classes, i.e. the width of the vote table
            (defaults to 20, the value previously hard-coded in ``predict``).
    """

    def __init__(self, n_trees=10, max_depth=10, min_samples_split=2, n_features=None,
                 n_classes=20):
        self.n_trees = n_trees
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.n_features = n_features
        self.n_classes = n_classes
        self.trees = []
        self.tree_weights = []

    def fit(self, X, y):
        """Train all trees in parallel and store them with their OOB weights.

        Raises:
            ValueError: if a label in ``y`` lies outside ``1 .. n_classes``.
        """
        if len(y) > 0 and (np.min(y) < 1 or np.max(y) > self.n_classes):
            raise ValueError(
                f"Labels must lie in 1..{self.n_classes}, "
                f"got range {np.min(y)}..{np.max(y)}"
            )
        results = Parallel(n_jobs=-1)(
            delayed(self._train_tree_with_weight)(X, y) for _ in range(self.n_trees)
        )
        if results:
            self.trees, self.tree_weights = zip(*results)
        else:
            # zip(*[]) cannot be unpacked into two names; handle n_trees == 0.
            self.trees, self.tree_weights = (), ()

    def _train_tree_with_weight(self, X, y):
        """Fit one tree on a bootstrap sample; return (tree, OOB accuracy)."""
        tree = DecisionTree(
            max_depth=self.max_depth,
            min_samples_split=self.min_samples_split,
            n_features=self.n_features
        )
        X_sample, y_sample, oob_idxs = self._bootstrap_samples(X, y)
        tree.fit(X_sample, y_sample)

        if len(oob_idxs) > 0:
            oob_pred = tree.predict(X[oob_idxs])
            oob_acc = np.mean(oob_pred == y[oob_idxs])
        else:
            # No left-out samples to score on: give the tree full weight.
            oob_acc = 1.0

        return tree, oob_acc

    def _bootstrap_samples(self, X, y):
        """Return a bootstrap sample of (X, y) plus the indices not drawn."""
        n_samples = X.shape[0]
        idxs = np.random.choice(n_samples, n_samples, replace=True)
        oob_idxs = np.setdiff1d(np.arange(n_samples), idxs)
        return X[idxs], y[idxs], oob_idxs

    def _vote_tally(self, X):
        """Return the (n_samples, n_classes) table of weighted votes.

        Column ``k`` accumulates the weights of the trees that predicted
        label ``k + 1``.
        """
        if len(self.trees) == 0:
            raise ValueError("RandomForestImproved has no trees; call fit() first")

        tree_preds = np.array([tree.predict(X) for tree in self.trees])
        if tree_preds.ndim != 2:
            raise ValueError(f"tree_preds shape is {tree_preds.shape}, expected 2D")
        if len(self.tree_weights) != tree_preds.shape[0]:
            raise ValueError(
                f"Tree weights length ({len(self.tree_weights)}) does not match "
                f"number of trees ({tree_preds.shape[0]})"
            )
        if tree_preds.size and (tree_preds.min() < 1 or tree_preds.max() > self.n_classes):
            raise ValueError(f"Tree predictions must lie in 1..{self.n_classes}")

        weights = np.asarray(self.tree_weights, dtype=float)
        # If every tree scored 0 out-of-bag, weighted voting would ignore all
        # votes and always predict class 1; fall back to a plain majority vote.
        if not np.any(weights > 0):
            weights = np.ones_like(weights)

        n_samples = tree_preds.shape[1]
        vote_tally = np.zeros((n_samples, self.n_classes))
        sample_rows = np.arange(n_samples)
        for preds, tree_weight in zip(tree_preds, weights):
            # Each (row, column) pair is unique within one tree, so plain
            # fancy-index accumulation is safe here.
            vote_tally[sample_rows, preds - 1] += tree_weight
        return vote_tally

    def predict_proba(self, X):
        """Return normalised vote shares, shape (n_samples, n_classes).

        Column ``k`` corresponds to label ``k + 1``; each row sums to 1.
        """
        vote_tally = self._vote_tally(X)
        return vote_tally / vote_tally.sum(axis=1, keepdims=True)

    def predict(self, X):
        """Return the label (1-based) with the largest weighted vote per sample."""
        predictions = np.argmax(self._vote_tally(X), axis=1)
        return predictions + 1
    

Run

In [None]:
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.metrics import accuracy_score

# Step 1 - drop roughly 5% of the training rows flagged by an isolation forest.
# fit_predict returns 1 for inliers, so the mask is True for rows that are kept.
iso = IsolationForest(contamination=0.05, random_state=42)
outlier_mask = iso.fit_predict(train_features_multiple) == 1

X_train_filtered, y_train_filtered = (
    train_features_multiple[outlier_mask],
    train_label_np[outlier_mask],
)

# Step 2 - standardise using statistics from the filtered training rows only.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_filtered)
X_test_scaled = scaler.transform(test_features_multiple)

# Step 3 - keep the principal components explaining 95% of the variance.
pca = PCA(n_components=0.95)
X_train_reduced = pca.fit_transform(X_train_scaled)
X_test_reduced = pca.transform(X_test_scaled)

# Step 4 - train the improved forest and score it on the held-out split.
clf = RandomForestImproved(min_samples_split=2, max_depth=35, n_trees=150)
clf.fit(X_train_reduced, y_train_filtered)

predictions = clf.predict(X_test_reduced)
accuracy = accuracy_score(test_label_np, predictions)

print(f"Custom Accuracy: {accuracy:.4f}")


##### New Hyperparameters

In [None]:
import numpy as np
from sklearn.model_selection import ParameterGrid
from sklearn.metrics import accuracy_score, classification_report
import pandas as pd

# 1-D label vectors and PCA-reduced features (95% explained variance).
# NOTE(review): unlike the "Run" cell, this search applies no outlier removal
# or scaling before PCA - confirm that the difference is intended.
y_train = train_label_np.flatten()
y_test = test_label_np.flatten()
pca = PCA(n_components=0.95)
X_train = pca.fit_transform(train_features_multiple)
X_test = pca.transform(test_features_multiple)

param_grid = {
    'n_trees': [50, 80, 100],
    'max_depth': [20, 35],
    'min_samples_split': [2, 5, 10]
}


results = []

# NOTE(review): every configuration is scored on the test split, so the best
# accuracy reported below is an optimistic estimate (no validation split).
for params in ParameterGrid(param_grid):
    print(f"Training with params: {params}")
    rf = RandomForestImproved(
        n_trees=params['n_trees'],
        max_depth=params['max_depth'] if params['max_depth'] is not None else float('inf'),
        min_samples_split=params['min_samples_split']
    )
    rf.fit(X_train, y_train)

    y_pred = rf.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)

    results.append({
        'n_trees': params['n_trees'],
        'max_depth': params['max_depth'],
        'min_samples_split': params['min_samples_split'],
        'accuracy': accuracy
    })

results_df = pd.DataFrame(results)


# Row with the highest accuracy (first one on ties).
best_result = results_df.loc[results_df['accuracy'].idxmax()]
print("\nBest Hyperparameters:")
print(best_result)
print("\nClassification Report for Best Model:")
# A missing max_depth would come back from the frame as None/NaN, never as the
# string 'None', so test for missing values before converting to int.
best_max_depth = best_result['max_depth']
best_rf = RandomForestImproved(
    n_trees=int(best_result['n_trees']),
    max_depth=float('inf') if pd.isna(best_max_depth) else int(best_max_depth),
    min_samples_split=int(best_result['min_samples_split'])
)
best_rf.fit(X_train, y_train)
y_pred_best = best_rf.predict(X_test)
print(classification_report(y_test, y_pred_best))


## Result Analysis

Improved Custom Model

In [None]:
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score, 
    confusion_matrix, roc_curve, auc, precision_recall_curve,
)
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest


# Same preprocessing as the "Run" cell: outlier filter -> scaling -> PCA.
# fit_predict returns 1 for inliers, so the mask is True for rows that are kept.
iso = IsolationForest(contamination=0.05, random_state=42)
outlier_mask = iso.fit_predict(train_features_multiple) == 1

X_train_filtered = train_features_multiple[outlier_mask]
y_train_filtered = train_label_np[outlier_mask]

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_filtered)
X_test_scaled = scaler.transform(test_features_multiple)

pca = PCA(n_components=0.95)
X_train_reduced = pca.fit_transform(X_train_scaled)
X_test_reduced = pca.transform(X_test_scaled)

clf = RandomForestImproved(n_trees=150, max_depth=35, min_samples_split=2)
clf.fit(X_train_reduced, y_train_filtered)

# Use class probabilities when the estimator offers them; otherwise fall back
# to its hard predictions so this cell never fails on a missing method.
if hasattr(clf, "predict_proba"):
    y_prob = clf.predict_proba(X_test_reduced)
    y_pred = np.argmax(y_prob, axis=1)
    y_pred = y_pred + 1  # column k corresponds to label k + 1
else:
    y_pred = clf.predict(X_test_reduced)
y_true = test_label_np

accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='macro')
recall = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')

print(f"Accuracy: {accuracy:.2f}")
print(f"Precision (Macro): {precision:.2f}")
print(f"Recall (Macro): {recall:.2f}")
print(f"F1 Score (Macro): {f1:.2f}")

conf_matrix = confusion_matrix(y_true, y_pred)
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues')
plt.gca().invert_yaxis()
# Heatmap cell i spans [i, i + 1], so labels belong at i + 0.5 (cell centres).
tick_marks = np.arange(conf_matrix.shape[0]) + 1
plt.xticks(tick_marks - 0.5, tick_marks)
plt.yticks(tick_marks - 0.5, tick_marks)
plt.title("Confusion Matrix of Custom Improved Model")
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.show()




##### Stats for SK Learn RF Baseline

In [None]:
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score, 
    confusion_matrix, roc_curve, auc, precision_recall_curve,
)
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestClassifier

# scikit-learn random forest on the same reduced features as the custom model.
# Seeded so the reported metrics are reproducible.
clf = RandomForestClassifier(random_state=42)
clf.fit(X_train_reduced, y_train_filtered)


y_prob = clf.predict_proba(X_test_reduced)
# Map probability columns back to labels through clf.classes_ rather than
# assuming the classes are exactly 1..K (outlier removal could drop a class).
y_pred = clf.classes_[np.argmax(y_prob, axis=1)]
y_true = test_label_np

accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='macro')
recall = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')

print(f"Accuracy: {accuracy:.2f}")
print(f"Precision (Macro): {precision:.2f}")
print(f"Recall (Macro): {recall:.2f}")
print(f"F1 Score (Macro): {f1:.2f}")


conf_matrix = confusion_matrix(y_true, y_pred)
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues')
plt.gca().invert_yaxis()
# Heatmap cell i spans [i, i + 1], so labels belong at i + 0.5 (cell centres).
tick_marks = np.arange(conf_matrix.shape[0]) + 1
plt.xticks(tick_marks - 0.5, tick_marks)
plt.yticks(tick_marks - 0.5, tick_marks)
plt.title("Confusion Matrix of SK Learn Random Forest")
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.show()




SK Learn Logistic Regression Baseline

In [None]:
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score, 
    confusion_matrix, roc_curve, auc, precision_recall_curve,
)
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LogisticRegression


# Baseline: scikit-learn logistic regression (default hyper-parameters) on the
# reduced feature set.
clf = LogisticRegression()
clf.fit(X_train_reduced, y_train_filtered)

# predict_proba orders its columns like clf.classes_, so the arg-max column is
# translated back to a label through clf.classes_. The previous "index + 1"
# shortcut is only correct when the training labels are exactly 1..K.
# NOTE(review): assumes y_train_filtered and test_label_np share one label
# encoding - confirm against the cell that builds them.
y_prob = clf.predict_proba(X_test_reduced)
y_pred = clf.classes_[np.argmax(y_prob, axis=1)]
y_true = test_label_np

accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='macro')
recall = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')

print(f"Accuracy: {accuracy:.2f}")
print(f"Precision (Macro): {precision:.2f}")
print(f"Recall (Macro): {recall:.2f}")
print(f"F1 Score (Macro): {f1:.2f}")

# Build the matrix over the sorted union of true and predicted labels (this is
# also confusion_matrix's default) and reuse those labels for the tick text,
# so every row/column is annotated with the class it actually represents.
labels = np.unique(np.concatenate([np.ravel(y_true), np.ravel(y_pred)]))
conf_matrix = confusion_matrix(y_true, y_pred, labels=labels)
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
            xticklabels=labels, yticklabels=labels)
plt.gca().invert_yaxis()  # smallest label at the bottom of the y-axis
plt.title("Confusion Matrix of SK Learn Logistic Regression")
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.show()




##### SK Learn Convergence Speed

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
import numpy as np
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Sweep over forest sizes: each size gets a freshly trained, identically
# seeded forest, and its accuracy on the test split is recorded.
n_estimators_range = [10, 20, 50, 100]

accuracies = []
for n in n_estimators_range:
    model = RandomForestClassifier(random_state=42, n_estimators=n)
    model = model.fit(train_features_multiple, train_label_np)  # fit returns the estimator
    predictions = model.predict(test_features_multiple)

    acc = accuracy_score(test_label_np, predictions)
    accuracies += [acc]

# Accuracy as a function of the number of trees.
fig, ax = plt.subplots(figsize=(8, 5))
ax.plot(n_estimators_range, accuracies, marker='o', linestyle='-')
ax.set_title('Convergence Speed of Random Forest')
ax.set_xlabel('Number of Trees (n_estimators)')
ax.set_ylabel('Accuracy')
ax.grid(True)
plt.show()


## New Paradigm: Active Learning

### New Model for active learning

In [138]:
import numpy as np
from collections import Counter
from joblib import Parallel, delayed

class DecisionTree:
    """Classification tree grown greedily by entropy-based information gain.

    At every internal node a random subset of ``n_features`` columns is drawn
    (without replacement) and the (feature, threshold) pair with the highest
    information gain is used to split the samples into ``<= threshold`` (left)
    and ``> threshold`` (right).

    Labels must be non-negative integers because class frequencies are
    computed with ``np.bincount``.

    Parameters
    ----------
    min_samples_split : int
        A node with fewer samples than this becomes a leaf.
    max_depth : int
        Maximum depth of the tree; nodes at this depth become leaves.
    n_features : int or None
        Number of columns examined per split. ``None`` (or 0) means all
        columns; the value is capped at the number of columns in ``fit``.
    """

    def __init__(self, min_samples_split=2, max_depth=100, n_features=None):
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.n_features = n_features
        self.root = None

    def fit(self, X, y):
        """Grow the tree on samples ``X`` (2-D) with integer labels ``y``."""
        # Accept array-likes and column-vector labels; np.bincount needs 1-D input.
        X = np.asarray(X)
        y = np.asarray(y).ravel()
        self.n_features = X.shape[1] if not self.n_features else min(X.shape[1], self.n_features)
        self.root = self._grow_tree(X, y)

    def _grow_tree(self, X, y, depth=0):
        """Recursively build the subtree for the samples ``(X, y)`` and return its root node."""
        n_samples, n_labels = len(y), len(np.unique(y))

        # Stopping criteria: depth limit, pure node, or too few samples to split.
        if depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split:
            return Node(value=self._most_common_label(y))

        feat_idxs = np.random.choice(X.shape[1], self.n_features, replace=False)
        best_feature, best_thresh = self._best_split(X, y, feat_idxs)

        # None of the sampled features can separate the samples: make a leaf.
        if best_feature is None:
            return Node(value=self._most_common_label(y))

        left_idxs, right_idxs = self._split(X[:, best_feature], best_thresh)
        left = self._grow_tree(X[left_idxs], y[left_idxs], depth + 1)
        right = self._grow_tree(X[right_idxs], y[right_idxs], depth + 1)
        return Node(best_feature, best_thresh, left, right)

    def _best_split(self, X, y, feat_idxs):
        """Return the ``(feature index, threshold)`` with the highest information gain.

        Returns ``(None, None)`` when every candidate feature is constant over
        the given samples, i.e. when no split can put samples on both sides.
        """
        best_gain = -1
        split_idx, split_threshold = None, None
        # Identical for every candidate, so compute it once per node.
        parent_entropy = self._entropy(y)

        for feat_idx in feat_idxs:
            X_column = X[:, feat_idx]
            # np.unique returns sorted values. The largest one is dropped
            # because "<= max" sends every sample left and leaves the right
            # child empty; choosing such a split used to recurse into an
            # empty node and crash in _most_common_label.
            thresholds = np.unique(X_column)[:-1]

            for threshold in thresholds:
                gain = self._information_gain(y, X_column, threshold, parent_entropy)
                if gain > best_gain:
                    best_gain = gain
                    split_idx = feat_idx
                    split_threshold = threshold

        return split_idx, split_threshold

    def _information_gain(self, y, X_column, threshold, parent_entropy=None):
        """Entropy reduction obtained by splitting ``y`` at ``X_column <= threshold``.

        ``parent_entropy`` may be supplied by the caller to avoid recomputing
        it; when omitted it is computed from ``y``. A split that leaves one
        side empty has a gain of 0.
        """
        if parent_entropy is None:
            parent_entropy = self._entropy(y)

        left_idxs, right_idxs = self._split(X_column, threshold)
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0

        n = len(y)
        n_l, n_r = len(left_idxs), len(right_idxs)
        e_l, e_r = self._entropy(y[left_idxs]), self._entropy(y[right_idxs])
        # Children entropies weighted by the share of samples they receive.
        child_entropy = (n_l / n) * e_l + (n_r / n) * e_r

        return parent_entropy - child_entropy

    def _split(self, X_column, threshold):
        """Return the index arrays of samples going left (``<=``) and right (``>``)."""
        left_idxs = np.argwhere(X_column <= threshold).flatten()
        right_idxs = np.argwhere(X_column > threshold).flatten()
        return left_idxs, right_idxs

    def _entropy(self, y):
        """Shannon entropy (natural log) of the label distribution in ``y``."""
        hist = np.bincount(y)
        ps = hist / len(y)
        # The small constant keeps log() finite for classes with zero count.
        return -np.sum(ps * np.log(ps + 1e-10))

    def _most_common_label(self, y):
        """Return the most frequent label in ``y``."""
        counter = Counter(y)
        return counter.most_common(1)[0][0]

    def predict(self, X):
        """Return an array with the predicted label for every row of ``X``."""
        return np.array([self._traverse_tree(x, self.root) for x in X])

    def _traverse_tree(self, x, node):
        """Follow the split rules from ``node`` down to a leaf and return its label."""
        if node.is_leaf_node():
            return node.value
        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)

class Node:
    """A single node of a decision tree.

    Internal nodes carry a split rule (``feature``, ``threshold``) and two
    children; leaves carry only ``value``, which is keyword-only.
    """

    def __init__(self, feature=None, threshold=None, left=None, right=None, *, value=None):
        # Split rule: column index and cut-off value.
        self.feature, self.threshold = feature, threshold
        # Child nodes on either side of the split.
        self.left, self.right = left, right
        # Stored prediction; stays None on internal nodes.
        self.value = value

    def is_leaf_node(self):
        """Return True when the node stores a value, i.e. it is a leaf."""
        has_value = self.value is not None
        return has_value

class RandomForestActive:
    """Bagged ensemble of ``DecisionTree`` models, trained in parallel with joblib.

    Besides majority-vote prediction it exposes vote fractions through
    ``predict_proba``, which the active-learning loop uses to measure
    uncertainty.

    Parameters
    ----------
    n_trees : int
        Number of trees in the ensemble.
    max_depth : int
        Depth limit passed to every tree.
    min_samples_split : int
        Minimum node size passed to every tree.
    n_features : int or None
        Number of features examined per split, passed to every tree.

    Attributes
    ----------
    trees : list
        The fitted trees (empty before ``fit``).
    classes_ : ndarray or None
        Sorted unique training labels, set by ``fit``. The columns of
        ``predict_proba`` follow this order.
    """

    def __init__(self, n_trees=10, max_depth=10, min_samples_split=2, n_features=None):
        self.n_trees = n_trees
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.n_features = n_features
        self.trees = []
        self.classes_ = None

    def fit(self, X, y):
        """Train ``n_trees`` trees, each on its own bootstrap sample of ``(X, y)``."""
        X = np.asarray(X)
        y = np.asarray(y).ravel()
        # Remember the label set so predict_proba has a stable column layout.
        self.classes_ = np.unique(y)
        self.trees = Parallel(n_jobs=-1)(
            delayed(self._train_tree)(X, y) for _ in range(self.n_trees)
        )

    def _train_tree(self, X, y):
        """Fit and return one tree on a bootstrap sample of ``(X, y)``."""
        tree = DecisionTree(
            max_depth=self.max_depth,
            min_samples_split=self.min_samples_split,
            n_features=self.n_features
        )
        X_sample, y_sample = self._bootstrap_samples(X, y)
        tree.fit(X_sample, y_sample)
        return tree

    def _bootstrap_samples(self, X, y):
        """Draw ``len(X)`` rows with replacement and return the sampled ``(X, y)``."""
        n_samples = X.shape[0]
        idxs = np.random.choice(n_samples, n_samples, replace=True)
        return X[idxs], y[idxs]

    def _tree_predictions(self, X):
        """Return the per-tree predictions as an array of shape (n_trees, n_samples)."""
        if not self.trees:
            raise ValueError("RandomForestActive must be fitted before predicting")
        return np.array([tree.predict(X) for tree in self.trees])

    def predict(self, X):
        """Return the majority-vote label for every row of ``X``.

        Ties go to the label that appears first in tree order.
        """
        tree_preds = self._tree_predictions(X)
        predictions = np.array([Counter(tree_preds[:, i]).most_common(1)[0][0] for i in range(X.shape[0])])
        return predictions

    def predict_proba(self, X):
        """Return the fraction of trees voting for each class.

        The result has shape ``(n_samples, n_classes)`` with columns ordered
        like ``classes_``. Previously the columns were derived from whichever
        labels happened to be predicted for ``X``, so the layout changed from
        one batch to the next and could not be mapped back to labels.
        """
        tree_preds = self._tree_predictions(X)

        classes = getattr(self, "classes_", None)
        if classes is None:
            # Trees were attached without calling fit(): fall back to the
            # labels observed in this batch of predictions.
            classes = np.unique(tree_preds)

        prob_matrix = np.zeros((tree_preds.shape[1], len(classes)))
        for col, cls in enumerate(classes):
            # Mean over the tree axis = share of trees that predicted `cls`.
            prob_matrix[:, col] = np.mean(tree_preds == cls, axis=0)
        return prob_matrix



#### Evaluation

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Pool-based active learning with uncertainty sampling: start from a small
# stratified labelled set, then repeatedly move the pool samples whose
# predicted class distribution has the highest entropy into the training set.
X, y = train_features_multiple, train_label_np
X_test, y_test = test_features_multiple, test_label_np

initial_size = 20
X_train, X_pool, y_train, y_pool = train_test_split(X, y, train_size=initial_size, stratify=y, random_state=42)

# The forest draws from NumPy's global random state without a seed, so the
# accuracies below vary from run to run.
model = RandomForestActive(n_trees=10, max_depth=10, min_samples_split=2)

n_iterations = 10
batch_size = 10

for iteration in range(n_iterations):
    model.fit(X_train, y_train)

    # Score the model trained on the labelled set as it stands this round.
    test_accuracy = accuracy_score(y_test, model.predict(X_test))
    print(f"Iteration {iteration + 1}, Test Accuracy: {test_accuracy:.2f}")

    # Nothing left to query: stop instead of failing on an empty pool.
    if len(X_pool) == 0:
        break

    pool_probs = model.predict_proba(X_pool)

    # Predictive entropy per pool sample; the constant keeps log() finite.
    entropy = -np.sum(pool_probs * np.log(pool_probs + 1e-9), axis=1)

    # argsort is ascending, so the last entries are the most uncertain samples.
    uncertain_indices = np.argsort(entropy)[-batch_size:]

    X_new = X_pool[uncertain_indices]
    y_new = y_pool[uncertain_indices]
    X_train = np.vstack((X_train, X_new))
    y_train = np.hstack((y_train, y_new))

    # Remove the queried samples from the pool.
    mask = np.ones(len(X_pool), dtype=bool)
    mask[uncertain_indices] = False
    X_pool = X_pool[mask]
    y_pool = y_pool[mask]

# Refit on the complete labelled set: the batch queried in the last round was
# appended to X_train but never trained on, so without this step the "final"
# score would merely repeat the last iteration's score.
model.fit(X_train, y_train)
final_accuracy = accuracy_score(y_test, model.predict(X_test))
print(f"Final Test Accuracy: {final_accuracy:.2f}")


#### Active Learning for Old