In [None]:
import numpy as np
import pandas as pd
from collections import Counter
from random import random
from sklearn import datasets
from sklearn.model_selection import train_test_split
import sklearn.metrics

In [None]:
class Node:
    """A single node of a binary decision tree.

    Internal nodes carry a split rule (``feature``, ``threshold``) and two
    children; leaf nodes carry only ``value``.

    Parameters
    ----------
    feature : int or None
        Index of the feature column tested at this node.
    threshold : float or None
        Split threshold for ``feature``.
    left, right : Node or None
        Child nodes.
    value : object, keyword-only
        Label stored at a leaf; ``None`` marks an internal node.
    impurity_reduction : float or None, keyword-only
        Optional impurity reduction achieved by this node's split.
    """

    def __init__(self, feature=None, threshold=None, left=None, right=None, *,
                 value=None, impurity_reduction=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value
        # Previously this attribute was assigned from an undefined name, which
        # raised NameError on every construction; it is now an optional argument.
        self.impurity_reduction = impurity_reduction

    def is_leaf_node(self):
        """Return True when the node stores a label (``value`` is not None)."""
        return self.value is not None

In [None]:
class DecisionTree:
    """Binary classification tree grown greedily on entropy-based information gain.

    Each internal node compares one feature against a threshold
    (``x[feature] <= threshold`` goes left); each leaf stores the most common
    class label of the training samples that reached it.

    Parameters
    ----------
    min_samples_split : int
        A node holding fewer samples than this becomes a leaf.
    max_depth : int
        Nodes at this depth become leaves.
    n_features : int or None
        Number of features drawn at random (without replacement) as split
        candidates at every node. A falsy value means "use all features"; the
        value is resolved and capped to the column count inside ``fit``.
    """

    def __init__(self, min_samples_split=2, max_depth=100, n_features=None):
        self.min_samples_split=min_samples_split
        self.max_depth=max_depth
        self.n_features=n_features
        self.root=None

    def fit(self, X, y):
        """Grow the tree on ``X`` (2-D array) and ``y`` (1-D array of labels).

        Labels must be non-negative integers because the entropy computation
        relies on ``np.bincount``.
        """
        self.n_features = X.shape[1] if not self.n_features else min(X.shape[1],self.n_features)
        self.root = self._grow_tree(X, y)

    def _grow_tree(self, X, y, depth=0):
        """Recursively build the subtree for ``X``/``y`` and return its root Node."""
        n_samples, n_feats = X.shape
        n_labels = len(np.unique(y))

        # check the stopping criteria
        if (depth>=self.max_depth or n_labels==1 or n_samples<self.min_samples_split):
            return Node(value=self._most_common_label(y))

        feat_idxs = np.random.choice(n_feats, self.n_features, replace=False)

        # find the best split
        best_feature, best_thresh = self._best_split(X, y, feat_idxs)

        # Every sampled feature is constant on these samples: give the
        # remaining features a chance before giving up on this node.
        if best_feature is None and len(feat_idxs) < n_feats:
            best_feature, best_thresh = self._best_split(X, y, np.arange(n_feats))

        # No feature can separate the samples (e.g. identical rows with
        # different labels). Splitting would create an empty child, so stop.
        if best_feature is None:
            return Node(value=self._most_common_label(y))

        # create child nodes
        left_idxs, right_idxs = self._split(X[:, best_feature], best_thresh)
        left = self._grow_tree(X[left_idxs, :], y[left_idxs], depth+1)
        right = self._grow_tree(X[right_idxs, :], y[right_idxs], depth+1)
        return Node(best_feature, best_thresh, left, right)


    def _best_split(self, X, y, feat_idxs):
        """Return the ``(feature index, threshold)`` pair with the highest information gain.

        Only thresholds that leave samples on both sides are considered.
        Returns ``(None, None)`` when every candidate feature is constant.
        """
        best_gain = -1
        split_idx, split_threshold = None, None

        for feat_idx in feat_idxs:
            X_column = X[:, feat_idx]
            # The largest value is dropped: "<= max" sends every sample left
            # and would leave the right child empty.
            thresholds = np.unique(X_column)[:-1]

            for thr in thresholds:
                # calculate the information gain
                gain = self._information_gain(y, X_column, thr)

                if gain > best_gain:
                    best_gain = gain
                    split_idx = feat_idx
                    split_threshold = thr

        return split_idx, split_threshold


    def _information_gain(self, y, X_column, threshold):
        """Entropy of ``y`` minus the size-weighted entropy of the two children.

        Returns 0 when the threshold puts every sample on one side.
        """
        # parent entropy
        parent_entropy = self._entropy(y)

        # create children
        left_idxs, right_idxs = self._split(X_column, threshold)

        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0

        # calculate the weighted avg. entropy of children
        n = len(y)
        n_l, n_r = len(left_idxs), len(right_idxs)
        e_l, e_r = self._entropy(y[left_idxs]), self._entropy(y[right_idxs])
        child_entropy = (n_l/n) * e_l + (n_r/n) * e_r

        # calculate the IG
        return parent_entropy - child_entropy

    def _split(self, X_column, split_thresh):
        """Return the row indices with value ``<= split_thresh`` and ``> split_thresh``."""
        left_idxs = np.argwhere(X_column <= split_thresh).flatten()
        right_idxs = np.argwhere(X_column > split_thresh).flatten()
        return left_idxs, right_idxs

    def _entropy(self, y):
        """Shannon entropy (natural log) of the integer label array ``y``."""
        hist = np.bincount(y)
        ps = hist / len(y)
        # Classes with zero count are skipped so log(0) is never evaluated.
        return -np.sum([p * np.log(p) for p in ps if p>0])


    def _most_common_label(self, y):
        """Return the most frequent label in ``y``."""
        counter = Counter(y)
        return counter.most_common(1)[0][0]

    def predict(self, X):
        """Return an array with one predicted label per row of ``X``."""
        return np.array([self._traverse_tree(x, self.root) for x in X])

    def _traverse_tree(self, x, node):
        """Follow the split rules from ``node`` down to a leaf and return its label."""
        if node.is_leaf_node():
            return node.value

        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)

In [None]:
def accuracy(y_true, y_pred):
    """Return the fraction of positions where ``y_pred`` equals ``y_true``.

    Both arguments may be any array-like (lists included) of the same shape.

    Raises
    ------
    ValueError
        If the two inputs do not have the same shape.
    """
    # Coerce first: comparing two plain lists with == yields a single bool,
    # not an element-wise result, which silently produced a wrong score.
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f"y_true and y_pred must have the same shape, got {y_true.shape} and {y_pred.shape}"
        )
    correct_predictions = np.sum(y_true == y_pred)
    return correct_predictions / len(y_true)



In [None]:
# The CSV uses ';' as its field separator.
data = pd.read_csv('winequality-red_NO_ALCOHOL.csv', sep=';')


In [None]:
# Column dtypes and non-null counts, to check for missing values.
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1599 entries, 0 to 1598
Data columns (total 11 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   fixed acidity         1599 non-null   float64
 1   volatile acidity      1599 non-null   float64
 2   citric acid           1599 non-null   float64
 3   residual sugar        1599 non-null   float64
 4   chlorides             1599 non-null   float64
 5   free sulfur dioxide   1599 non-null   float64
 6   total sulfur dioxide  1599 non-null   float64
 7   density               1599 non-null   float64
 8   pH                    1599 non-null   float64
 9   sulphates             1599 non-null   float64
 10  quality               1599 non-null   int64  
dtypes: float64(10), int64(1)
memory usage: 137.5 KB


The dataset has no missing values, and every feature is a float (the target `quality` is an integer). Since we are using a decision tree and a random forest, no further preprocessing (such as scaling) is needed in this case.

In [None]:
import pandas as pd
import numpy as np
from sklearn.feature_selection import f_classif, mutual_info_classif
from sklearn.preprocessing import LabelEncoder


# X stays a DataFrame in this cell so its column names can label the scores;
# y is a plain array because the target needs no column name.
X = data.drop('quality', axis=1)
y = data['quality'].values

# Pearson correlation of every feature with the target.
correlations = data.corr()['quality'].drop('quality')
correlation_results = pd.DataFrame(correlations)
correlation_results.columns = ['Pearson Correlation']

# ANOVA F-score of every feature against the quality classes.
f_scores, _ = f_classif(X, y)
f_score_results = pd.DataFrame(f_scores, index=X.columns, columns=['ANOVA F-score'])

# Mutual information is estimated with a randomised procedure; fixing
# random_state keeps the ranking identical from run to run.
mi_scores = mutual_info_classif(X, y, random_state=42)
mi_score_results = pd.DataFrame(mi_scores, index=X.columns, columns=['Mutual Information'])

# Combine all results into a single DataFrame
feature_importances = pd.concat([correlation_results, f_score_results, mi_score_results], axis=1)

# Display the results sorted by Mutual Information
print(feature_importances.sort_values(by='Mutual Information', ascending=False))

                      Pearson Correlation  ANOVA F-score  Mutual Information
volatile acidity                -0.390558      60.913993            0.164778
density                         -0.174919      13.396357            0.092710
sulphates                        0.251397      22.273376            0.088502
total sulfur dioxide            -0.185100      25.478510            0.078537
fixed acidity                    0.124052       6.283081            0.059553
citric acid                      0.226373      19.690664            0.056849
chlorides                       -0.128907       6.035639            0.028675
residual sugar                   0.013732       1.053374            0.028605
pH                              -0.057731       4.341764            0.005497
free sulfur dioxide             -0.050656       4.754233            0.002305


We see that free sulfur dioxide is almost uncorrelated with quality,
so we could drop it from our features (the cells below still use all ten features).

In [None]:
# Plain NumPy arrays: the tree code indexes feature columns positionally.
y = data['quality'].to_numpy()
X = data.drop(columns='quality').to_numpy()

In [None]:
# Hold out 20% of the rows for testing; the fixed random_state keeps the split stable.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)



In [None]:
# DecisionTree samples its candidate features through NumPy's global RNG,
# so seed it here to make the fitted tree (and its accuracy) reproducible.
np.random.seed(42)
tree = DecisionTree(max_depth=10, min_samples_split=5, n_features=3)
tree.fit(X_train, y_train)

NameError: name 'impurity_reduction' is not defined

In [None]:
# Classify the held-out samples with the single fitted tree.
predictions = tree.predict(X_test)

In [None]:
# The result gets its own name: `accuracy_score` is also the scikit-learn
# metric imported by later cells, and rebinding it to a float breaks those
# calls whenever cells are re-run out of order.
tree_accuracy = accuracy(y_test, predictions)

print(f"Accuracy: {tree_accuracy:.4f}")


Accuracy: 0.5406


Let's try to find better hyperparameters.

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score


# Candidate values for every hyperparameter explored by the search below.
param_grid = dict(
    max_depth=[3, 5, 10],
    min_samples_split=[2, 5, 10],
    n_features=[None, 'sqrt', 'log2'],
)

def n_features_converter(param, n_total_features):
    """Translate an ``n_features`` grid entry into a concrete feature count.

    Parameters
    ----------
    param : {'sqrt', 'log2'} or any other value
        ``'sqrt'`` / ``'log2'`` are resolved against ``n_total_features``;
        anything else (an int or ``None``) is returned unchanged.
    n_total_features : int
        Number of columns in the training matrix.

    Returns
    -------
    int or the original ``param``
        For ``'sqrt'`` and ``'log2'`` the truncated result, never below 1.
    """
    # Clamp to 1: log2(1) truncates to 0, which is not a usable feature count.
    if param == 'sqrt':
        return max(1, int(np.sqrt(n_total_features)))
    if param == 'log2':
        return max(1, int(np.log2(n_total_features)))
    return param

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# DecisionTree samples features through NumPy's global RNG; seeding makes the
# search return the same winner on every run.
np.random.seed(42)

best_score = 0
best_params = {}

# NOTE(review): candidates are ranked on the test split, so best_score is an
# optimistic estimate; a separate validation split would give a fairer number.
for max_depth in param_grid['max_depth']:
    for min_samples_split in param_grid['min_samples_split']:
        for n_features in param_grid['n_features']:
            adjusted_n_features = n_features_converter(n_features, X_train.shape[1])

            tree = DecisionTree(max_depth=max_depth, min_samples_split=min_samples_split, n_features=adjusted_n_features)
            tree.fit(X_train, y_train)

            predictions = tree.predict(X_test)
            # Named `score`, not `accuracy`: rebinding `accuracy` to a float
            # would shadow the accuracy() helper that later cells still call.
            score = accuracy_score(y_test, predictions)
            if score > best_score:
                best_score = score
                best_params = {'max_depth': max_depth, 'min_samples_split': min_samples_split, 'n_features': n_features}

print("Best parameters:", best_params)
print("Best accuracy:", best_score)


Best parameters: {'max_depth': 10, 'min_samples_split': 2, 'n_features': 'sqrt'}
Best accuracy: 0.603125


As we can see, the accuracy is still low, because we are using only a single decision tree. What if we combine multiple decision trees?

In [None]:
class RandomForest:
    """Bagging ensemble of ``DecisionTree`` classifiers combined by majority vote.

    Parameters
    ----------
    n_trees : int
        Number of trees to train.
    max_depth : int
        Passed to every tree.
    min_samples_split : int
        Passed to every tree.
    n_feature : int or None
        Number of features each tree considers per split (original keyword,
        kept for backward compatibility).
    n_features : int or None, keyword-only
        Alias of ``n_feature`` matching the ``DecisionTree`` keyword.

    Raises
    ------
    ValueError
        If ``n_feature`` and ``n_features`` are both given with different values.
    """

    def __init__(self, n_trees=10, max_depth=10, min_samples_split=2, n_feature=None,
                 *, n_features=None):
        if n_feature is not None and n_features is not None and n_feature != n_features:
            raise ValueError("n_feature and n_features were both given with different values")
        self.n_trees = n_trees
        self.max_depth=max_depth
        self.min_samples_split=min_samples_split
        self.n_features = n_features if n_features is not None else n_feature
        self.trees = []

    def fit(self, X, y):
        """Train ``n_trees`` trees, each on its own bootstrap sample of ``X``/``y``."""
        self.trees = []
        for _ in range(self.n_trees):
            tree = DecisionTree(max_depth=self.max_depth,
                            min_samples_split=self.min_samples_split,
                            n_features=self.n_features)
            X_sample, y_sample = self._bootstrap_samples(X, y)
            tree.fit(X_sample, y_sample)
            self.trees.append(tree)

    def _bootstrap_samples(self, X, y):
        """Draw as many rows as ``X`` has, with replacement, and return the sampled ``X``/``y``."""
        n_samples = X.shape[0]
        idxs = np.random.choice(n_samples, n_samples, replace=True)
        return X[idxs], y[idxs]

    def _most_common_label(self, y):
        """Return the most frequent label in ``y``."""
        counter = Counter(y)
        return counter.most_common(1)[0][0]

    def predict(self, X):
        """Return the majority-vote label of all trees for every row of ``X``.

        Raises
        ------
        ValueError
            If the forest holds no trees (``fit`` has not been called).
        """
        if not self.trees:
            raise ValueError("RandomForest has no trees; call fit() before predict()")
        # Shape (n_trees, n_samples) -> (n_samples, n_trees): one row of votes per sample.
        per_tree = np.array([tree.predict(X) for tree in self.trees])
        votes = np.swapaxes(per_tree, 0, 1)
        return np.array([self._most_common_label(sample_votes) for sample_votes in votes])

In [None]:
# Forest of 100 trees; the remaining constructor arguments keep their defaults.
clf = RandomForest(n_trees=100)

In [None]:
# Bootstrap sampling and per-node feature sampling both use NumPy's global
# RNG; seeding it makes the trained forest reproducible.
np.random.seed(42)
clf.fit(X_train, y_train)

In [None]:
# Majority-vote predictions of the forest for the test split.
y_pred = clf.predict(X_test)

In [None]:
# Predictions of the forest on the training split.
y_train_pred = clf.predict(X_train)

In [None]:
# Use the fully qualified scikit-learn metric and a dedicated variable name:
# the bare names `accuracy` and `accuracy_score` are rebound by other cells,
# which made this cell depend on execution order.
forest_accuracy = sklearn.metrics.accuracy_score(y_test, y_pred)

print(sklearn.metrics.mean_absolute_error(y_test, y_pred))

print(f"Accuracy: {forest_accuracy*100:.2f}%")



0.390625
Accuracy: 63.44%


Now let's compare our implementation with the ready-made `RandomForestClassifier` from scikit-learn.

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

In [None]:
# Fixed random_state so the reference score is the same on every run.
clf_lib = RandomForestClassifier(n_estimators=100, random_state=42)

In [None]:
# Train the scikit-learn forest on the same training split as our implementation.
clf_lib.fit(X_train, y_train)

In [None]:
# Predictions of the scikit-learn forest for the test split.
y_lib_pred = clf_lib.predict(X_test)

In [None]:
# Predictions of the scikit-learn forest on the training split.
y_lib_train_pred = clf_lib.predict(X_train)

In [None]:
# Training accuracy of the scikit-learn forest.
acc_lib_train = accuracy_score(y_train, y_lib_train_pred)
acc_lib_train

1.0

In [None]:
# Test accuracy of the scikit-learn forest.
acc_lib = accuracy_score(y_test, y_lib_pred)
acc_lib

0.728125