In [29]:
import warnings
warnings.filterwarnings('ignore')

# Complex pytorch
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from complexPyTorch.complexLayers import *
from complexPyTorch.complexFunctions import *
from torch_geometric.nn import GCNConv
from torch_geometric.data import Data

# Plot
import matplotlib.pyplot as plt
import seaborn as sns
import time

# Load Data
import numpy as np
import json
import os
import math
import librosa
import pathlib
from scipy.spatial.distance import cdist
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
import random
import pandas as pd

# MFCCS
from scipy.io import wavfile
import scipy.fftpack as fft
from scipy.signal import get_window
import librosa
import librosa.display
import IPython.display as ipd
import scipy as spp

# CV
from sklearn.model_selection import cross_val_score, KFold

# ML
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression, SGDClassifier, RidgeClassifier, PassiveAggressiveClassifier
from sklearn.neighbors import KNeighborsClassifier, RadiusNeighborsClassifier
from sklearn.naive_bayes import GaussianNB, MultinomialNB, ComplementNB, BernoulliNB
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, GradientBoostingClassifier
from sklearn.svm import SVC, NuSVC, LinearSVC
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis, QuadraticDiscriminantAnalysis
from sklearn.ensemble import VotingClassifier
from sklearn.metrics import accuracy_score
from scipy import stats

In [17]:
def custom_cross_val(model, X, y, k=5):
    np.random.seed(42)
    indices = np.arange(len(X))
    np.random.shuffle(indices)
    splits = np.array_split(indices, k)
    accuracies = []
    for i in range(k):
        test_indices = splits[i]
        train_indices = np.concatenate([splits[j] for j in range(k) if j != i])
        X_train, y_train = X.iloc[train_indices], y.iloc[train_indices]
        X_test, y_test = X.iloc[test_indices], y.iloc[test_indices] 
        model.fit(X_train.to_numpy(), y_train.to_numpy())
        y_pred = model.predict(X_test.to_numpy())
        accuracy = np.mean(y_pred == y_test.to_numpy())
        accuracies.append(accuracy)   
    return accuracies

# RF Class

In [19]:
class RandomForest:
    '''
    A class that implements Random Forest algorithm from scratch.

    For more information, refer to https://towardsdatascience.com/master-machine-learning-random-forest-from-scratch-with-python-3efdd51b6d7a

    Parameters:
    ----------    
    num_tree: int, default=5
        The number of voting decision tree classifiers used for classification.

    subsample_size: float, default=None
        The proportion of the total training examples used to train each decision trees.

    max_depth: int, default=None
        The maximum depth of the tree. If None, then nodes are expanded until, all leaves are the purest.

    max_features: int, float, default=None
        For each decision tree, at each split from parent node to child nodes, consider only 'max features' to find threshold split. 
        If float and <1, max_features take the proportion of the features in the dataset.

    bootstrap: bool, default=True
        Bootstrap sampling of training examples, with or without replacement. 

    random_state: int, default=None
        Controls the randomness of the estimator. The features are always randomly permuted at each split in each decision tree, 
        and bootstrap sampling is randomly permuted.
    '''
    def __init__(self, num_trees=10, tree = None, subsample_size=None, max_depth=None, 
                 max_features=None, bootstrap=True, random_state=None):
        self.num_trees = num_trees
        self.tree = tree
        self.subsample_size = subsample_size
        self.max_depth = max_depth
        self.max_features = max_features
        self.bootstrap = bootstrap
        self.random_state = random_state
        # Will store individually trained decision trees
        self.decision_trees = []

    def sample(self, X, y, random_state):
        n_rows, n_cols = X.shape
        if self.subsample_size is None:
            sample_size = n_rows
        else:
            sample_size = int(n_rows*self.subsample_size)
        np.random.seed(random_state)
        samples = np.random.choice(a=n_rows, size=sample_size, replace=self.bootstrap)
        return X[samples], y[samples]


    def fit(self, X, y):
        if len(self.decision_trees) > 0:
            self.decision_trees = []
        if isinstance(X, pd.core.frame.DataFrame):
            X = X.values
        if isinstance(y, pd.core.series.Series):
            y = y.values   
        # Build each tree of the forest
        num_built = 0
        while num_built < self.num_trees:

            clf = self.tree(
                max_depth=self.max_depth
            )
            # Obtain data sample
            _X, _y = self.sample(X, y, self.random_state)
            # Train
            clf.fit(_X, _y)
            # Save the classifier
            self.decision_trees.append(clf)
            num_built += 1
            if self.random_state is not None:
                self.random_state += 1

    def predict(self, X):
        y = []
        for tree in self.decision_trees:
            y.append(tree.predict(X))
        # Reshape so we can find the most common value
        y = np.swapaxes(y, axis1=0, axis2=1)
        # Use majority voting for the final prediction
        predicted_classes = stats.mode(y,axis=1)[0].reshape(-1)
        return predicted_classes

# Create Data

In [5]:
DATASET_PATH = "Data/train"
SAMPLE_RATE = 22050
TRACK_DURATION = 30 # measured in seconds
SAMPLES_PER_TRACK = SAMPLE_RATE * TRACK_DURATION
BATCH_SIZE = 32
NUM_EPOCHS = 50
genre_list = os.listdir(DATASET_PATH)
if '.DS_Store' in genre_list: genre_list.remove('.DS_Store')
genre_mappings = dict(zip(genre_list, range(len(genre_list))))
print(genre_mappings)

{'pop': 0, 'metal': 1, 'disco': 2, 'blues': 3, 'reggae': 4, 'classical': 5, 'rock': 6, 'hiphop': 7, 'country': 8, 'jazz': 9}


# 1. Simple RF with Real Valued Frequency domain Features

In [20]:
tr_df = pd.read_csv("Data/exp4_data/train_tff_mfcc.csv")
te_df = pd.read_csv("Data/exp4_data/test_tff_mfcc.csv")

In [21]:
# Separate features and labels
X_train = tr_df.drop('label', axis=1)
y_train = tr_df['label']
X_test = te_df.drop('label', axis=1)
y_test = te_df['label']

In [27]:
class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, *, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value
    
    def is_leaf(self):
        return self.value is not None

class DecisionTree:
    def __init__(self, max_depth=100, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.root = None

    """
    def _is_finished(self, depth):
        if (depth >= self.max_depth
            or self.n_class_labels == 1
            or self.n_samples < self.min_samples_split):
            return True
        return False
    """
    def _is_finished(self, depth):
        if (depth >= self.max_depth
            or self.n_class_labels == 1
            or self.n_samples < self.min_samples_split):
            return True
        return False
    
    def _entropy(self, y):
        proportions = np.bincount(y) / len(y)
        entropy = -np.sum([p * np.log2(p) for p in proportions if p > 0])
        return entropy

    def _create_split(self, X, thresh):
        left_idx = np.argwhere(X <= thresh).flatten()
        right_idx = np.argwhere(X > thresh).flatten()
        return left_idx, right_idx

    def _information_gain(self, X, y, thresh):
        parent_loss = self._entropy(y)
        left_idx, right_idx = self._create_split(X, thresh)
        n, n_left, n_right = len(y), len(left_idx), len(right_idx)

        if n_left == 0 or n_right == 0: 
            return 0
        
        child_loss = (n_left / n) * self._entropy(y[left_idx]) + (n_right / n) * self._entropy(y[right_idx])
        if parent_loss - child_loss == 0: self.flag_zero = True
        return parent_loss - child_loss

    def _best_split(self, X, y, features):
        split = {'score':- 1, 'feat': None, 'thresh': None}

        for feat in features:
            X_feat = X[:, feat]
            thresholds = np.unique(X_feat)
            for thresh in thresholds:
                score = self._information_gain(X_feat, y, thresh)

                if score > split['score']:
                    split['score'] = score
                    split['feat'] = feat
                    split['thresh'] = thresh

        #print(split['score'])
        return split['feat'], split['thresh']
    
    def _build_tree(self, X, y, depth=0):
        self.n_samples, self.n_features = X.shape
        self.n_class_labels = len(np.unique(y))

        # stopping criteria
        if self._is_finished(depth):
            try: 
                most_common_Label = np.argmax(np.bincount(y))
            except ValueError as e:
                most_common_Label = 0
            return Node(value=most_common_Label)

        # get best split
        rnd_feats = np.random.choice(self.n_features, self.n_features, replace=False)
        best_feat, best_thresh = self._best_split(X, y, rnd_feats)

        # grow children recursively
        left_idx, right_idx = self._create_split(X[:, best_feat], best_thresh)
        left_child = self._build_tree(X[left_idx, :], y[left_idx], depth + 1)
        right_child = self._build_tree(X[right_idx, :], y[right_idx], depth + 1)
        return Node(best_feat, best_thresh, left_child, right_child)
    
    def _traverse_tree(self, x, node):
        if node.is_leaf():
            return node.value
        
        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)

    def fit(self, X, y):
        self.root = self._build_tree(X, y)

    def predict(self, X):
        predictions = [self._traverse_tree(x, self.root) for x in X]
        return np.array(predictions)

In [30]:
np.random.seed(42)
model = RandomForest(tree = DecisionTree, max_depth=10)
model.fit(X_train.to_numpy(), y_train.to_numpy())
y_pred = model.predict(X_test.to_numpy())
accuracy = accuracy_score(y_test.to_numpy(), y_pred)
print(f'Accuracy: {accuracy}\n')

Accuracy: 0.58375



In [31]:
# CV:
merged_df = pd.concat([tr_df, te_df], axis=0)
X = merged_df.drop('label', axis=1)
y = merged_df['label']
np.random.seed(42)
model = RandomForest(tree = DecisionTree, max_depth=10)
cv_results = custom_cross_val(model, X, y, k=5)
for i, acc in enumerate(cv_results):
    print(f'Fold {i+1} Accuracy: {acc}')
print(f'Mean Accuracy: {np.mean(cv_results)}')

Fold 1 Accuracy: 0.7397397397397397
Fold 2 Accuracy: 0.7227227227227228
Fold 3 Accuracy: 0.7242242242242243
Fold 4 Accuracy: 0.7262262262262262
Fold 5 Accuracy: 0.7237237237237237
Mean Accuracy: 0.7273273273273274


# 2. Simple RF with Complex Valued Frequency Domain Features

In [32]:
tr_df = pd.read_csv("Data/exp4_data/train_tff_mfcc_comp.csv")
te_df = pd.read_csv("Data/exp4_data/test_tff_mfcc_comp.csv")

def df_csv_complex(df):
    result_df = df.copy()  # Make a copy to avoid modifying the original DataFrame
    result_df.iloc[:, :-1] = df.iloc[:, :-1].apply(lambda col: col.apply(
        lambda val: torch.tensor((complex(val.strip('()'))), dtype=torch.complex64) ))
    return result_df

tr_df = df_csv_complex(tr_df)
te_df = df_csv_complex(te_df)
tr_df = tr_df.applymap(lambda x: x.numpy() if hasattr(x, 'numpy') else x)
te_df = te_df.applymap(lambda x: x.numpy() if hasattr(x, 'numpy') else x)

In [234]:
# Separate features and labels
X_train = tr_df.drop('label', axis=1)
y_train = tr_df['label']
X_test = te_df.drop('label', axis=1)
y_test = te_df['label']

## 2.1 Compare only real

In [235]:
class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, *, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value
    
    def is_leaf(self):
        return self.value is not None

class DecisionTree:
    def __init__(self, max_depth=100, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.root = None

    def _is_finished(self, depth):
        if (depth >= self.max_depth
            or self.n_class_labels == 1
            or self.n_samples < self.min_samples_split):
            return True
        return False
    
    def _entropy(self, y):
        proportions = np.bincount(y) / len(y)
        entropy = -np.sum([p * np.log2(p) for p in proportions if p > 0])
        return entropy

    def _create_split(self, X, thresh):
        left_idx = np.argwhere(X <= thresh).flatten()
        right_idx = np.argwhere(X > thresh).flatten()
        return left_idx, right_idx

    def _information_gain(self, X, y, thresh):
        parent_loss = self._entropy(y)
        left_idx, right_idx = self._create_split(X, thresh)
        n, n_left, n_right = len(y), len(left_idx), len(right_idx)

        if n_left == 0 or n_right == 0: 
            return 0
        
        child_loss = (n_left / n) * self._entropy(y[left_idx]) + (n_right / n) * self._entropy(y[right_idx])
        return parent_loss - child_loss

    def _best_split(self, X, y, features):
        split = {'score':- 1, 'feat': None, 'thresh': None}

        for feat in features:
            X_feat = X[:, feat]
            thresholds = np.unique(X_feat)
            for thresh in thresholds:
                score = self._information_gain(X_feat, y, thresh)

                if score > split['score']:
                    split['score'] = score
                    split['feat'] = feat
                    split['thresh'] = thresh

        return split['feat'], split['thresh']
    
    def _build_tree(self, X, y, depth=0):
        self.n_samples, self.n_features = X.shape
        self.n_class_labels = len(np.unique(y))

        # stopping criteria
        if self._is_finished(depth):
            try: 
                most_common_Label = np.argmax(np.bincount(y))
            except ValueError as e:
                most_common_Label = 0
            return Node(value=most_common_Label)

        # get best split
        rnd_feats = np.random.choice(self.n_features, self.n_features, replace=False)
        best_feat, best_thresh = self._best_split(X, y, rnd_feats)

        # grow children recursively
        left_idx, right_idx = self._create_split(X[:, best_feat], best_thresh)
        left_child = self._build_tree(X[left_idx, :], y[left_idx], depth + 1)
        right_child = self._build_tree(X[right_idx, :], y[right_idx], depth + 1)
        return Node(best_feat, best_thresh, left_child, right_child)
    
    def _traverse_tree(self, x, node):
        if node.is_leaf():
            return node.value
        
        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)

    def fit(self, X, y):
        self.root = self._build_tree(X, y)

    def predict(self, X):
        predictions = [self._traverse_tree(x, self.root) for x in X]
        return np.array(predictions)

In [236]:
np.random.seed(42)
model = RandomForest(tree = DecisionTree, max_depth=10)
model.fit(X_train.to_numpy(), y_train.to_numpy())
y_pred = model.predict(X_test.to_numpy())
accuracy = accuracy_score(y_test.to_numpy(), y_pred)
print(f'Accuracy: {accuracy}\n')

Accuracy: 0.453125



In [237]:
# CV:
merged_df = pd.concat([tr_df, te_df], axis=0)
X = merged_df.drop('label', axis=1)
y = merged_df['label']
np.random.seed(42)
model = RandomForest(tree = DecisionTree, max_depth=10)
cv_results = custom_cross_val(model, X, y, k=5)
for i, acc in enumerate(cv_results):
    print(f'Fold {i+1} Accuracy: {acc}')
print(f'Mean Accuracy: {np.mean(cv_results)}')

Fold 1 Accuracy: 0.5530530530530531
Fold 2 Accuracy: 0.5495495495495496
Fold 3 Accuracy: 0.5495495495495496
Fold 4 Accuracy: 0.55005005005005
Fold 5 Accuracy: 0.5335335335335335
Mean Accuracy: 0.5471471471471472


## 2.2 Compare only magnitude

In [239]:
class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, *, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value
    
    def is_leaf(self):
        return self.value is not None

class DecisionTree:
    def __init__(self, max_depth=100, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.root = None

    def _is_finished(self, depth):
        if (depth >= self.max_depth
            or self.n_class_labels == 1
            or self.n_samples < self.min_samples_split):
            return True
        return False
    
    def _entropy(self, y):
        proportions = np.bincount(y) / len(y)
        entropy = -np.sum([p * np.log2(p) for p in proportions if p > 0])
        return entropy

    def _create_split(self, X, thresh):
        left_idx = np.argwhere(np.abs(X) <= np.abs(thresh)).flatten()
        right_idx = np.argwhere(np.abs(X) > np.abs(thresh)).flatten()
        return left_idx, right_idx

    def _information_gain(self, X, y, thresh):
        parent_loss = self._entropy(y)
        left_idx, right_idx = self._create_split(X, thresh)
        n, n_left, n_right = len(y), len(left_idx), len(right_idx)

        if n_left == 0 or n_right == 0: 
            return 0
        
        child_loss = (n_left / n) * self._entropy(y[left_idx]) + (n_right / n) * self._entropy(y[right_idx])
        return parent_loss - child_loss

    def _best_split(self, X, y, features):
        split = {'score':- 1, 'feat': None, 'thresh': None}

        for feat in features:
            X_feat = X[:, feat]
            thresholds = np.unique(X_feat)
            for thresh in thresholds:
                score = self._information_gain(X_feat, y, thresh)

                if np.abs(score) > np.abs(split['score']) if split['score'] != -1 else -1:
                    split['score'] = score
                    split['feat'] = feat
                    split['thresh'] = thresh

        return split['feat'], split['thresh']
    
    def _build_tree(self, X, y, depth=0):
        self.n_samples, self.n_features = X.shape
        self.n_class_labels = len(np.unique(y))

        # stopping criteria
        if self._is_finished(depth):
            try: 
                most_common_Label = np.argmax(np.bincount(y))
            except ValueError as e:
                most_common_Label = 0
            return Node(value=most_common_Label)

        # get best split
        rnd_feats = np.random.choice(self.n_features, self.n_features, replace=False)
        best_feat, best_thresh = self._best_split(X, y, rnd_feats)

        # grow children recursively
        left_idx, right_idx = self._create_split(X[:, best_feat], best_thresh)
        left_child = self._build_tree(X[left_idx, :], y[left_idx], depth + 1)
        right_child = self._build_tree(X[right_idx, :], y[right_idx], depth + 1)
        return Node(best_feat, best_thresh, left_child, right_child)
    
    def _traverse_tree(self, x, node):
        if node.is_leaf():
            return node.value
        
        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)

    def fit(self, X, y):
        self.root = self._build_tree(X, y)

    def predict(self, X):
        predictions = [self._traverse_tree(x, self.root) for x in X]
        return np.array(predictions)

In [240]:
np.random.seed(42)
model = RandomForest(tree = DecisionTree, max_depth=10)
model.fit(X_train.to_numpy(), y_train.to_numpy())
y_pred = model.predict(X_test.to_numpy())
accuracy = accuracy_score(y_test.to_numpy(), y_pred)
print(f'Accuracy: {accuracy}\n')

Accuracy: 0.378125



In [177]:
# CV:
merged_df = pd.concat([tr_df, te_df], axis=0)
X = merged_df.drop('label', axis=1)
y = merged_df['label']
np.random.seed(42)
model = RandomForest(tree = DecisionTree, max_depth=10)
cv_results = custom_cross_val(model, X, y, k=5)
for i, acc in enumerate(cv_results):
    print(f'Fold {i+1} Accuracy: {acc}')
print(f'Mean Accuracy: {np.mean(cv_results)}')

Fold 1 Accuracy: 0.47147147147147145
Fold 2 Accuracy: 0.45245245245245247
Fold 3 Accuracy: 0.47097097097097096
Fold 4 Accuracy: 0.476976976976977
Fold 5 Accuracy: 0.4954954954954955
Mean Accuracy: 0.47347347347347346


# 3. Simple RF with Complex Valued Frequency Domain Features alt MFCC extraction

In [33]:
tr_df = pd.read_csv("Data/exp4_data/train_tff_mfcc_comp_mod.csv")
te_df = pd.read_csv("Data/exp4_data/test_tff_mfcc_comp_mod.csv")

def df_csv_complex(df):
    result_df = df.copy()  # Make a copy to avoid modifying the original DataFrame
    result_df.iloc[:, :-1] = df.iloc[:, :-1].apply(lambda col: col.apply(
        lambda val: torch.tensor((complex(val.strip('()'))), dtype=torch.complex64) ))
    return result_df

tr_df = df_csv_complex(tr_df)
te_df = df_csv_complex(te_df)
tr_df = tr_df.applymap(lambda x: x.numpy() if hasattr(x, 'numpy') else x)
te_df = te_df.applymap(lambda x: x.numpy() if hasattr(x, 'numpy') else x)

In [34]:
# Separate features and labels
X_train = tr_df.drop('label', axis=1)
y_train = tr_df['label']
X_test = te_df.drop('label', axis=1)
y_test = te_df['label']

## 3.1 Compare real only

In [35]:
class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, *, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value
    
    def is_leaf(self):
        return self.value is not None

class DecisionTree:
    def __init__(self, max_depth=100, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.root = None

    def _is_finished(self, depth):
        if (depth >= self.max_depth
            or self.n_class_labels == 1
            or self.n_samples < self.min_samples_split):
            return True
        return False
    
    def _entropy(self, y):
        proportions = np.bincount(y) / len(y)
        entropy = -np.sum([p * np.log2(p) for p in proportions if p > 0])
        return entropy

    def _create_split(self, X, thresh):
        left_idx = np.argwhere(X <= thresh).flatten()
        right_idx = np.argwhere(X > thresh).flatten()
        return left_idx, right_idx

    def _information_gain(self, X, y, thresh):
        parent_loss = self._entropy(y)
        left_idx, right_idx = self._create_split(X, thresh)
        n, n_left, n_right = len(y), len(left_idx), len(right_idx)

        if n_left == 0 or n_right == 0: 
            return 0
        
        child_loss = (n_left / n) * self._entropy(y[left_idx]) + (n_right / n) * self._entropy(y[right_idx])
        return parent_loss - child_loss

    def _best_split(self, X, y, features):
        split = {'score':- 1, 'feat': None, 'thresh': None}

        for feat in features:
            X_feat = X[:, feat]
            thresholds = np.unique(X_feat)
            for thresh in thresholds:
                score = self._information_gain(X_feat, y, thresh)

                if score > split['score']:
                    split['score'] = score
                    split['feat'] = feat
                    split['thresh'] = thresh

        return split['feat'], split['thresh']
    
    def _build_tree(self, X, y, depth=0):
        self.n_samples, self.n_features = X.shape
        self.n_class_labels = len(np.unique(y))

        # stopping criteria
        if self._is_finished(depth):
            try: 
                most_common_Label = np.argmax(np.bincount(y))
            except ValueError as e:
                most_common_Label = 0
            return Node(value=most_common_Label)

        # get best split
        rnd_feats = np.random.choice(self.n_features, self.n_features, replace=False)
        best_feat, best_thresh = self._best_split(X, y, rnd_feats)

        # grow children recursively
        left_idx, right_idx = self._create_split(X[:, best_feat], best_thresh)
        left_child = self._build_tree(X[left_idx, :], y[left_idx], depth + 1)
        right_child = self._build_tree(X[right_idx, :], y[right_idx], depth + 1)
        return Node(best_feat, best_thresh, left_child, right_child)
    
    def _traverse_tree(self, x, node):
        if node.is_leaf():
            return node.value
        
        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)

    def fit(self, X, y):
        self.root = self._build_tree(X, y)

    def predict(self, X):
        predictions = [self._traverse_tree(x, self.root) for x in X]
        return np.array(predictions)

In [37]:
np.random.seed(42)
model = RandomForest(tree = DecisionTree, max_depth=10)
model.fit(X_train.to_numpy(), y_train.to_numpy())
y_pred = model.predict(X_test.to_numpy())
accuracy = accuracy_score(y_test.to_numpy(), y_pred)
print(f'Accuracy: {accuracy}\n')

Accuracy: 0.56



In [38]:
# CV:
merged_df = pd.concat([tr_df, te_df], axis=0)
X = merged_df.drop('label', axis=1)
y = merged_df['label']
np.random.seed(42)
model = RandomForest(tree = DecisionTree, max_depth=10)
cv_results = custom_cross_val(model, X, y, k=5)
for i, acc in enumerate(cv_results):
    print(f'Fold {i+1} Accuracy: {acc}')
print(f'Mean Accuracy: {np.mean(cv_results)}')

Fold 1 Accuracy: 0.6941941941941941
Fold 2 Accuracy: 0.7167167167167167
Fold 3 Accuracy: 0.6981981981981982
Fold 4 Accuracy: 0.6796796796796797
Fold 5 Accuracy: 0.6936936936936937
Mean Accuracy: 0.6964964964964965


## 3.2 Compare magnitude only

In [6]:
class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, *, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value
    
    def is_leaf(self):
        return self.value is not None

class DecisionTree:
    def __init__(self, max_depth=100, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.root = None

    def _is_finished(self, depth):
        if (depth >= self.max_depth
            or self.n_class_labels == 1
            or self.n_samples < self.min_samples_split):
            return True
        return False
    
    def _entropy(self, y):
        proportions = np.bincount(y) / len(y)
        entropy = -np.sum([p * np.log2(p) for p in proportions if p > 0])
        return entropy

    def _create_split(self, X, thresh):
        left_idx = np.argwhere(np.abs(X) <= np.abs(thresh)).flatten()
        right_idx = np.argwhere(np.abs(X) > np.abs(thresh)).flatten()
        return left_idx, right_idx

    def _information_gain(self, X, y, thresh):
        parent_loss = self._entropy(y)
        left_idx, right_idx = self._create_split(X, thresh)
        n, n_left, n_right = len(y), len(left_idx), len(right_idx)

        if n_left == 0 or n_right == 0: 
            return 0
        
        child_loss = (n_left / n) * self._entropy(y[left_idx]) + (n_right / n) * self._entropy(y[right_idx])
        return parent_loss - child_loss

    def _best_split(self, X, y, features):
        split = {'score':- 1, 'feat': None, 'thresh': None}

        for feat in features:
            X_feat = X[:, feat]
            thresholds = np.unique(X_feat)
            for thresh in thresholds:
                score = self._information_gain(X_feat, y, thresh)

                if np.abs(score) > np.abs(split['score']) if split['score'] != -1 else -1:
                    split['score'] = score
                    split['feat'] = feat
                    split['thresh'] = thresh

        return split['feat'], split['thresh']
    
    def _build_tree(self, X, y, depth=0):
        self.n_samples, self.n_features = X.shape
        self.n_class_labels = len(np.unique(y))

        # stopping criteria
        if self._is_finished(depth):
            try: 
                most_common_Label = np.argmax(np.bincount(y))
            except ValueError as e:
                most_common_Label = 0
            return Node(value=most_common_Label)

        # get best split
        rnd_feats = np.random.choice(self.n_features, self.n_features, replace=False)
        best_feat, best_thresh = self._best_split(X, y, rnd_feats)

        # grow children recursively
        left_idx, right_idx = self._create_split(X[:, best_feat], best_thresh)
        left_child = self._build_tree(X[left_idx, :], y[left_idx], depth + 1)
        right_child = self._build_tree(X[right_idx, :], y[right_idx], depth + 1)
        return Node(best_feat, best_thresh, left_child, right_child)
    
    def _traverse_tree(self, x, node):
        if node.is_leaf():
            return node.value
        
        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)

    def fit(self, X, y):
        self.root = self._build_tree(X, y)

    def predict(self, X):
        predictions = [self._traverse_tree(x, self.root) for x in X]
        return np.array(predictions)

In [None]:
np.random.seed(42)
model = DecisionTree(max_depth=10)
model.fit(X_train.to_numpy(), y_train.to_numpy())
y_pred = model.predict(X_test.to_numpy())
accuracy = accuracy_score(y_test.to_numpy(), y_pred)
print(f'Accuracy: {accuracy}\n')

In [None]:
# CV:
merged_df = pd.concat([tr_df, te_df], axis=0)
X = merged_df.drop('label', axis=1)
y = merged_df['label']
np.random.seed(42)
model = RandomForest(tree = DecisionTree, max_depth=10)
cv_results = custom_cross_val(model, X, y, k=5)
for i, acc in enumerate(cv_results):
    print(f'Fold {i+1} Accuracy: {acc}')
print(f'Mean Accuracy: {np.mean(cv_results)}')

# 4. RF: Complex Valued frequncy Domain + Real MFCC Coeffs

In [None]:
tr_df = pd.read_csv("Data/exp4_data/train_tff_mfcc_comp.csv")
te_df = pd.read_csv("Data/exp4_data/test_tff_mfcc_comp.csv")
tr_df_mfcc = pd.read_csv("Data/exp4_data/train_tff_mfcc.csv")
te_df_mfcc = pd.read_csv("Data/exp4_data/test_tff_mfcc.csv")

def df_csv_complex(df):
    result_df = df.copy()  # Make a copy to avoid modifying the original DataFrame
    result_df.iloc[:, :-1] = df.iloc[:, :-1].apply(lambda col: col.apply(
        lambda val: torch.tensor((complex(val.strip('()'))), dtype=torch.complex64) ))
    return result_df

tr_df = df_csv_complex(tr_df)
te_df = df_csv_complex(te_df)

for i in ["mean", "var"]:
    for j in range(1, 17):
        tr_df[f"mfcc_{j}_{i}"] = tr_df_mfcc[f"mfcc_{j}_{i}"] 
        te_df[f"mfcc_{j}_{i}"] = te_df_mfcc[f"mfcc_{j}_{i}"] 
        
tr_df = tr_df.applymap(lambda x: x.numpy() if hasattr(x, 'numpy') else x)
te_df = te_df.applymap(lambda x: x.numpy() if hasattr(x, 'numpy') else x)

In [None]:
# Separate features and labels
X_train = tr_df.drop('label', axis=1)
y_train = tr_df['label']
X_test = te_df.drop('label', axis=1)
y_test = te_df['label']

## 4.1 Compare real Only

In [None]:
class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, *, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value
    
    def is_leaf(self):
        return self.value is not None

class DecisionTree:
    def __init__(self, max_depth=100, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.root = None

    def _is_finished(self, depth):
        if (depth >= self.max_depth
            or self.n_class_labels == 1
            or self.n_samples < self.min_samples_split):
            return True
        return False
    
    def _entropy(self, y):
        proportions = np.bincount(y) / len(y)
        entropy = -np.sum([p * np.log2(p) for p in proportions if p > 0])
        return entropy

    def _create_split(self, X, thresh):
        left_idx = np.argwhere(X <= thresh).flatten()
        right_idx = np.argwhere(X > thresh).flatten()
        return left_idx, right_idx

    def _information_gain(self, X, y, thresh):
        parent_loss = self._entropy(y)
        left_idx, right_idx = self._create_split(X, thresh)
        n, n_left, n_right = len(y), len(left_idx), len(right_idx)

        if n_left == 0 or n_right == 0: 
            return 0
        
        child_loss = (n_left / n) * self._entropy(y[left_idx]) + (n_right / n) * self._entropy(y[right_idx])
        return parent_loss - child_loss

    def _best_split(self, X, y, features):
        split = {'score':- 1, 'feat': None, 'thresh': None}

        for feat in features:
            X_feat = X[:, feat]
            thresholds = np.unique(X_feat)
            for thresh in thresholds:
                score = self._information_gain(X_feat, y, thresh)

                if score > split['score']:
                    split['score'] = score
                    split['feat'] = feat
                    split['thresh'] = thresh

        return split['feat'], split['thresh']
    
    def _build_tree(self, X, y, depth=0):
        self.n_samples, self.n_features = X.shape
        self.n_class_labels = len(np.unique(y))

        # stopping criteria
        if self._is_finished(depth):
            try: 
                most_common_Label = np.argmax(np.bincount(y))
            except ValueError as e:
                most_common_Label = 0
            return Node(value=most_common_Label)

        # get best split
        rnd_feats = np.random.choice(self.n_features, self.n_features, replace=False)
        best_feat, best_thresh = self._best_split(X, y, rnd_feats)

        # grow children recursively
        left_idx, right_idx = self._create_split(X[:, best_feat], best_thresh)
        left_child = self._build_tree(X[left_idx, :], y[left_idx], depth + 1)
        right_child = self._build_tree(X[right_idx, :], y[right_idx], depth + 1)
        return Node(best_feat, best_thresh, left_child, right_child)
    
    def _traverse_tree(self, x, node):
        if node.is_leaf():
            return node.value
        
        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)

    def fit(self, X, y):
        self.root = self._build_tree(X, y)

    def predict(self, X):
        predictions = [self._traverse_tree(x, self.root) for x in X]
        return np.array(predictions)

In [None]:
np.random.seed(42)
model = RandomForest(tree = DecisionTree, max_depth=10)
model.fit(X_train.to_numpy(), y_train.to_numpy())
y_pred = model.predict(X_test.to_numpy())
accuracy = accuracy_score(y_test.to_numpy(), y_pred)
print(f'Accuracy: {accuracy}\n')

In [None]:
# CV:
merged_df = pd.concat([tr_df, te_df], axis=0)
X = merged_df.drop('label', axis=1)
y = merged_df['label']
np.random.seed(42)
model = RandomForest(tree = DecisionTree, max_depth=10)
cv_results = custom_cross_val(model, X, y, k=5)
for i, acc in enumerate(cv_results):
    print(f'Fold {i+1} Accuracy: {acc}')
print(f'Mean Accuracy: {np.mean(cv_results)}')

## 4.2 Compare magnitude Only

In [None]:
class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, *, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value
    
    def is_leaf(self):
        return self.value is not None

class DecisionTree:
    def __init__(self, max_depth=100, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.root = None

    def _is_finished(self, depth):
        if (depth >= self.max_depth
            or self.n_class_labels == 1
            or self.n_samples < self.min_samples_split):
            return True
        return False
    
    def _entropy(self, y):
        proportions = np.bincount(y) / len(y)
        entropy = -np.sum([p * np.log2(p) for p in proportions if p > 0])
        return entropy

    def _create_split(self, X, thresh):
        left_idx = np.argwhere(np.abs(X) <= np.abs(thresh)).flatten()
        right_idx = np.argwhere(np.abs(X) > np.abs(thresh)).flatten()
        return left_idx, right_idx

    def _information_gain(self, X, y, thresh):
        parent_loss = self._entropy(y)
        left_idx, right_idx = self._create_split(X, thresh)
        n, n_left, n_right = len(y), len(left_idx), len(right_idx)

        if n_left == 0 or n_right == 0: 
            return 0
        
        child_loss = (n_left / n) * self._entropy(y[left_idx]) + (n_right / n) * self._entropy(y[right_idx])
        return parent_loss - child_loss

    def _best_split(self, X, y, features):
        split = {'score':- 1, 'feat': None, 'thresh': None}

        for feat in features:
            X_feat = X[:, feat]
            thresholds = np.unique(X_feat)
            for thresh in thresholds:
                score = self._information_gain(X_feat, y, thresh)

                if np.abs(score) > np.abs(split['score']) if split['score'] != -1 else -1:
                    split['score'] = score
                    split['feat'] = feat
                    split['thresh'] = thresh

        return split['feat'], split['thresh']
    
    def _build_tree(self, X, y, depth=0):
        self.n_samples, self.n_features = X.shape
        self.n_class_labels = len(np.unique(y))

        # stopping criteria
        if self._is_finished(depth):
            try: 
                most_common_Label = np.argmax(np.bincount(y))
            except ValueError as e:
                most_common_Label = 0
            return Node(value=most_common_Label)

        # get best split
        rnd_feats = np.random.choice(self.n_features, self.n_features, replace=False)
        best_feat, best_thresh = self._best_split(X, y, rnd_feats)

        # grow children recursively
        left_idx, right_idx = self._create_split(X[:, best_feat], best_thresh)
        left_child = self._build_tree(X[left_idx, :], y[left_idx], depth + 1)
        right_child = self._build_tree(X[right_idx, :], y[right_idx], depth + 1)
        return Node(best_feat, best_thresh, left_child, right_child)
    
    def _traverse_tree(self, x, node):
        if node.is_leaf():
            return node.value
        
        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)

    def fit(self, X, y):
        self.root = self._build_tree(X, y)

    def predict(self, X):
        predictions = [self._traverse_tree(x, self.root) for x in X]
        return np.array(predictions)

In [None]:
np.random.seed(42)
model = RandomForest(tree = DecisionTree, max_depth=10)
model.fit(X_train.to_numpy(), y_train.to_numpy())
y_pred = model.predict(X_test.to_numpy())
accuracy = accuracy_score(y_test.to_numpy(), y_pred)
print(f'Accuracy: {accuracy}\n')

In [None]:
# CV:
merged_df = pd.concat([tr_df, te_df], axis=0)
X = merged_df.drop('label', axis=1)
y = merged_df['label']
np.random.seed(42)
model = RandomForest(tree = DecisionTree, max_depth=10)
cv_results = custom_cross_val(model, X, y, k=5)
for i, acc in enumerate(cv_results):
    print(f'Fold {i+1} Accuracy: {acc}')
print(f'Mean Accuracy: {np.mean(cv_results)}')