<h1> Assignment 1 - Task 2 Solution </h1>

Student name: Ngo Vu Anh </br>
UOW ID: 7673541   </br>
Tutorial Group: T02   

<h1> Import tools </h1>

In [None]:
import numpy as np
import pandas as pd
from collections import Counter


<h1> Import the data </h1>

In [None]:
test_file = 'customer_churn_dataset-testing-master.csv'
train_file = 'customer_churn_dataset-training-master.csv'

test_df = pd.read_csv(test_file)
train_df = pd.read_csv(train_file)

# Combine train df and test df

train_df = pd.concat([train_df, test_df], axis=0)

# Reset the index of the combined DataFrame
train_df.reset_index(drop=True, inplace=True)

# Select a portion of the data
train_df = train_df.sample(n=50000, random_state=42)

<h1> Data preprocessing </h1>

In [None]:
# Drop missing values 

train_df = train_df.dropna()
test_df = test_df.dropna()

In [None]:
# Z Score normalization for Last Interaction 

normalized_last_interaction = (train_df['Last Interaction'] - train_df['Last Interaction'].mean()) / train_df['Last Interaction'].std()
normalized_last_interaction_test = (test_df['Last Interaction'] - test_df['Last Interaction'].mean()) / test_df['Last Interaction'].std()

# Replace the previous Last Interaction with new one 
train_df['Last Interaction'] = normalized_last_interaction
test_df['Last Interaction'] = normalized_last_interaction_test

In [None]:
# Create 5 bins for attribute Total Spend 

bin_labels = ['Bin 1', 'Bin 2', 'Bin 3', 'Bin 4', 'Bin 5']

train_df['Total Spend'] = pd.cut(train_df['Total Spend'], bins=5, labels=bin_labels)
test_df['Total Spend'] = pd.cut(test_df['Total Spend'], bins=5, labels=bin_labels)

In [None]:
train_df.head()

In [None]:
# Drop unecessary columns 

train_df.drop('CustomerID', axis=1, inplace=True)
test_df.drop('CustomerID', axis=1, inplace = True)

# Drop gender to ensure equality 
train_df.drop('Gender', axis=1, inplace=True)
test_df.drop('Gender', axis=1, inplace = True)


<h1> Prepare decision tree classifier </h1>

<h2> Node Class </h2> 

<h2> Tree class </h2> 

In [None]:
class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, gain=0, y=None, *, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value
        self.gain = gain
        self.y = y
        self.count = Counter(y)

    def is_leaf_node(self):
        return self.value is not None


class DecisionTree:
    def __init__(self, min_samples_split=2, max_depth=100, depth=None, n_features=None, criterion="GINI"):
        self.labels = None
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.n_features = n_features
        self.root = None
        self.depth = depth if depth else 0
        self.criterion = criterion

    def fit(self, X, y):
        self.n_features = X.shape[1] if not self.n_features else min(X.shape[1], self.n_features)
        self.labels = X.columns
        self.root = self._grow_tree(X, y)

    def _grow_tree(self, X, y, depth=0):
        n_samples, n_features = X.shape
        n_labels = len(np.unique(y))

        if depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split:
            leaf_value = self._most_common_label(y)
            return Node(value=leaf_value, y=y)

        feat_idxs = np.random.choice(n_features, self.n_features, replace=False)
        best_feature, best_threshold, best_gain = self._best_split(X, y, feat_idxs)

        left_idxs, right_idxs = self._split(X.iloc[:, best_feature], best_threshold)
        left = self._grow_tree(X.iloc[left_idxs, :], y.iloc[left_idxs], depth + 1)
        right = self._grow_tree(X.iloc[right_idxs, :], y.iloc[right_idxs], depth + 1)
        return Node(best_feature, best_threshold, left, right, best_gain, y)

    def _best_split(self, X, y, feat_idxs):
        best_gain = -1
        best_split_idx, best_split_threshold = None, None

        for feat_idx in feat_idxs:
            X_column = X.iloc[:, feat_idx]
            thresholds = np.unique(X_column)

            for thr in thresholds:
                if self.criterion == "GINI":
                    gain = self._gini_gain(y, X_column, thr)
                elif self.criterion == "GAIN_RATIO":
                    gain = self._gain_ratio(y, X_column, thr)
                else:
                    gain = self._information_gain(y, X_column, thr)

                if gain > best_gain:
                    best_gain = gain
                    best_split_idx = feat_idx
                    best_split_threshold = thr

        return best_split_idx, best_split_threshold, best_gain

    def _gini_gain(self, y, X_column, threshold):
        gini_base = self._get_gini(y)

        left_idxs, right_idxs = self._split(X_column, threshold)
        left_counts = Counter(y.iloc[left_idxs])
        right_counts = Counter(y.iloc[right_idxs])

        left_class0_count = Counter(left_counts).get(0, 0)
        left_class1_count = Counter(left_counts).get(1, 0)
        right_class0_count = Counter(right_counts).get(0, 0)
        right_class1_count = Counter(right_counts).get(1, 0)

        gini_left = self._gini_impurity(left_class0_count, left_class1_count)
        gini_right = self._gini_impurity(right_class0_count, right_class1_count)

        n_left = left_class0_count + left_class1_count
        n_right = right_class0_count + right_class1_count

        w_left = n_left / (n_left + n_right)
        w_right = n_right / (n_left + n_right)

        w_gini = w_left * gini_left + w_right * gini_right

        gini_gain = gini_base - w_gini
        return gini_gain

    def _get_gini(self, y):
        class0_count = Counter(y).get(0, 0)
        class1_count = Counter(y).get(1, 0)
        return self._gini_impurity(class0_count, class1_count)

    def _gini_impurity(self, class0_count, class1_count):
        if class0_count is None:
            class0_count = 0
        if class1_count is None:
            class1_count = 0
        n = class0_count + class1_count

        if n == 0:
            return 0.0

        p0 = class0_count / n
        p1 = class1_count / n
        gini = 1 - (p0 ** 2 + p1 ** 2)
        return gini

    def _gain_ratio(self, y, X_column, threshold):
        parent_entropy = self._entropy(y)
        left_idxs, right_idxs = self._split(X_column, threshold)
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0

        n = len(y)
        n_left, n_right = len(left_idxs), len(right_idxs)
        e_left, e_right = self._entropy(y.iloc[left_idxs]), self._entropy(y.iloc[right_idxs])
        child_entropy = (n_left / n) * e_left + (n_right / n) * e_right

        information_gain = parent_entropy - child_entropy

        split_info = -(n_left / n * np.log(n_left / n) + n_right / n * np.log(n_right / n))

        return information_gain / split_info

    def _information_gain(self, y, X_column, threshold):
        parent_entropy = self._entropy(y)
        left_idxs, right_idxs = self._split(X_column, threshold)
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0

        n = len(y)
        n_left, n_right = len(left_idxs), len(right_idxs)
        e_left, e_right = self._entropy(y.iloc[left_idxs]), self._entropy(y.iloc[right_idxs])
        child_entropy = (n_left / n) * e_left + (n_right / n) * e_right

        information_gain = parent_entropy - child_entropy
        return information_gain

    def _split(self, X_column, split_threshold):
        left_idxs = np.argwhere(X_column.to_numpy() <= split_threshold).flatten()
        right_idxs = np.argwhere(X_column.to_numpy() > split_threshold).flatten()
        return left_idxs, right_idxs

    def _entropy(self, y):
        hist = np.bincount(y)
        ps = hist / len(y)
        return -np.sum([p * np.log(p) for p in ps if p > 0])

    def _most_common_label(self, y):
        counter = Counter(y)
        if len(Counter(y).most_common(1)) == 0:
            value = None
        else:
            value = counter.most_common(1)[0][0]
        return value

    def predict(self, X):
        return np.array([self._traverse_tree(x, self.root) for idx, x in X.iterrows()])

    def _traverse_tree(self, x, node):
        if node.is_leaf_node() or node.feature is None:
            return node.value
        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)

    def counter_to_str(self, counter: Counter):
        inside = [f"{key}: {value}" for key, value in counter.items()]
        return f"({', '.join(inside)})"

    def print_info(self, node, depth):
        if node.feature is None:
            return

        preamble0 = ' ' * depth * 2 + ('\t\t' if depth > 0 else '')

        print(f"{preamble0} {self.labels[node.feature]} <= {node.threshold} ? {node.gain}")

        self.print_info(node.left, depth + 1)
        self.print_info(node.right, depth + 1)

    def print_tree(self):
        self.print_info(self.root, 0)


<h2> Create cross validation </h2>

In [None]:
train = train_df.sample(frac=0.6, random_state= 40)

test = train_df.drop(train.index)

# Set up the train and target

X_train = train[train.columns[:-1]]
X_test = test[test.columns[:-1]]
y_train = train['Churn']
y_test = test['Churn']


In [None]:
# Initialize a decision tree - Information gain
clf = DecisionTree(max_depth=20, criterion="")
clf.fit(X_train, y_train)

predictions_info_gain = clf.predict(X_test)
predictions_info_gain_train = clf.predict(X_train)

print("Root")
clf.print_tree()

In [None]:
# Check accuracy of the Decision Tree
def accuracy(y_test, y_pred):
    return np.sum(y_test == y_pred) / len(y_test)
acc = accuracy(y_test, predictions_info_gain)

acc_train = accuracy(y_train, predictions_info_gain_train)

print("Training set accuracy: ", acc_train)
print("Test set accuracy: ", acc)

In [None]:
# Initialize a decision tree - Gain Ratio

clf_gain = DecisionTree(max_depth=20, criterion="GAIN_RATIO")
clf_gain.fit(X_train, y_train)

predictions_gain_ratio = clf_gain.predict(X_test)
predictions_gain_ratio_train = clf_gain.predict(X_train)

# Initialize a decision tree - Gini Index

clf_gini = DecisionTree(max_depth=20, criterion="GINI")
clf_gini.fit(X_train, y_train)

predictions_gini = clf_gini.predict(X_test)
predictions_gini_train = clf_gini.predict(X_train)



## Voting function

In [None]:
def voting(predictions_info_gain, predictions_gini, predictions_gain_ratio):
    ensemble_predictions = []

    for pred_info_gain, pred_gini, pred_gain_ratio in zip(predictions_info_gain, predictions_gini, predictions_gain_ratio):
        # Initialize votes dictionary to count the votes for each class label
        votes = {}
        for pred in [pred_info_gain, pred_gini, pred_gain_ratio]:
            if pred not in votes:
                votes[pred] = 0
            votes[pred] += 1

        # Get the class label with the highest votes (majority vote)
        majority_vote = max(votes, key=votes.get)
        ensemble_predictions.append(majority_vote)

    return ensemble_predictions


In [None]:
ensemble_predictions = voting(predictions_info_gain, predictions_gini, predictions_gain_ratio)

# Check accuracy of the Decision Tree
def accuracy(y_test, y_pred):
    return np.sum(y_test == y_pred) / len(y_test)
acc = accuracy(y_test, ensemble_predictions)

print("Test set accuracy: ", acc)

The voting function has chosen the method to split the decision tree that can gain the highest accuracy. And the highest accuracy can be achieved is 89%