# Project 2: MapReduce on DecisionTree

In [1]:
import pandas as pd
import numpy as np
#from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

In [2]:
df = pd.read_csv('/../../data/p_dsi/big_data_scaling_sp23/project/DT-MapReduce/adult.csv')

In [3]:
df

Unnamed: 0,age,workclass,fnlwgt,education,education.num,marital.status,occupation,relationship,race,sex,capital.gain,capital.loss,hours.per.week,native.country,income
0,90,?,77053,HS-grad,9,Widowed,?,Not-in-family,White,Female,0,4356,40,United-States,<=50K
1,82,Private,132870,HS-grad,9,Widowed,Exec-managerial,Not-in-family,White,Female,0,4356,18,United-States,<=50K
2,66,?,186061,Some-college,10,Widowed,?,Unmarried,Black,Female,0,4356,40,United-States,<=50K
3,54,Private,140359,7th-8th,4,Divorced,Machine-op-inspct,Unmarried,White,Female,0,3900,40,United-States,<=50K
4,41,Private,264663,Some-college,10,Separated,Prof-specialty,Own-child,White,Female,0,3900,40,United-States,<=50K
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
32556,22,Private,310152,Some-college,10,Never-married,Protective-serv,Not-in-family,White,Male,0,0,40,United-States,<=50K
32557,27,Private,257302,Assoc-acdm,12,Married-civ-spouse,Tech-support,Wife,White,Female,0,0,38,United-States,<=50K
32558,40,Private,154374,HS-grad,9,Married-civ-spouse,Machine-op-inspct,Husband,White,Male,0,0,40,United-States,>50K
32559,58,Private,151910,HS-grad,9,Widowed,Adm-clerical,Unmarried,White,Female,0,0,40,United-States,<=50K


In [4]:
df.dtypes

age                int64
workclass         object
fnlwgt             int64
education         object
education.num      int64
marital.status    object
occupation        object
relationship      object
race              object
sex               object
capital.gain       int64
capital.loss       int64
hours.per.week     int64
native.country    object
income            object
dtype: object

In [5]:
na_counts = df.isna().sum()

# sort the columns by the number of NaN values in descending order
na_counts = na_counts.sort_values(ascending=False)

# print the result
print(na_counts)

income            0
native.country    0
hours.per.week    0
capital.loss      0
capital.gain      0
sex               0
race              0
relationship      0
occupation        0
marital.status    0
education.num     0
education         0
fnlwgt            0
workclass         0
age               0
dtype: int64


In [6]:
df['income.num'] = df['income'].map({'<=50K': 0, '>50K': 1})

In [7]:
df['income'].unique()

array(['<=50K', '>50K'], dtype=object)

In [8]:
df['income.num'].unique()

array([0, 1])

## Step 1

In [9]:
import numpy as np

class DecisionTree:
    def __init__(self, max_depth=5, min_samples_leaf=5):
        self.max_depth = max_depth
        self.min_samples_leaf = min_samples_leaf

    def fit(self, X, y):
        self.num_classes = len(np.unique(y))
        self.tree = self._build_tree(X, y)

    def predict(self, X):
        preds = []
        for x in X:
            pred = self._predict_recursive(x, self.tree)
            preds.append(pred)
        return np.array(preds)

    def _build_tree(self, X, y, depth=0):
        num_samples, num_features = X.shape
        num_samples_per_class = [np.sum(y == i) for i in range(self.num_classes)]
        most_common_class = np.argmax(num_samples_per_class)

        if (depth == self.max_depth
                or num_samples < self.min_samples_leaf
                or len(np.unique(y)) == 1):
            return most_common_class

        feature_indices = np.random.choice(num_features, int(np.sqrt(num_features)), replace=False)

        best_feature, best_threshold = self._choose_best_feature(X, y, feature_indices)

        left_indices = np.where(X[:, best_feature] < best_threshold)[0]
        right_indices = np.where(X[:, best_feature] >= best_threshold)[0]

        left_tree = self._build_tree(X[left_indices, :], y[left_indices], depth+1)
        right_tree = self._build_tree(X[right_indices, :], y[right_indices], depth+1)

        return (best_feature, best_threshold, left_tree, right_tree)

    def _predict_recursive(self, x, tree):
        if isinstance(tree, int):
            return tree

        feature, threshold, left_tree, right_tree = tree
        if x[feature] < threshold:
            return self._predict_recursive(x, left_tree)
        else:
            return self._predict_recursive(x, right_tree)

    def _choose_best_feature(self, X, y, feature_indices):
        best_gain = -np.inf
        split_index, split_threshold = None, None
        for i in feature_indices:
            feature_values = X[:, i]
            thresholds = np.unique(feature_values)
            for threshold in thresholds:
                gain = self._information_gain(X, y, i, threshold)
                if gain > best_gain:
                    best_gain = gain
                    split_index = i
                    split_threshold = threshold

        return split_index, split_threshold

    def _information_gain(self, X, y, split_index, split_threshold):
        parent_entropy = self._entropy(y)
        left_indices = np.where(X[:, split_index] < split_threshold)[0]
        right_indices = np.where(X[:, split_index] >= split_threshold)[0]
        if len(left_indices) == 0 or len(right_indices) == 0:
            return 0
        left_entropy = self._entropy(y[left_indices])
        right_entropy = self._entropy(y[right_indices])
        num_left = len(left_indices)
        num_right = len(right_indices)
        child_entropy = (num_left / len(y)) * left_entropy + (num_right / len(y)) * right_entropy
        return parent_entropy - child_entropy

    def _entropy(self, y):
        _, counts = np.unique(y, return_counts=True)
        probabilities = counts / len(y)
        return -np.sum(probabilities * np.log2(probabilities))


In [10]:
target = 'income.num'
features = ['age', 'education.num', 'hours.per.week']

In [11]:
X = df[features]
y = df[target]

In [12]:
#X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

In [13]:
#X_train = X_train.values
#y_train = y_train.values
#X_test = X_test.values
#y_test = y_test.values

In [14]:
#print(X_test.values)

In [15]:
# Initialize the classifier
#clf = DecisionTreeClassifier()
#model = DecisionTree(max_depth=2, min_samples_split=2, random_state=42)

model = DecisionTree(max_depth=5)

# Train the classifier on the training data
model.fit(X.values, y.values)

# Predict the class labels of the test data
#y_pred = model.predict(X_test)

# Evaluate the performance of the classifier
#accuracy = accuracy_score(y_test, y_pred)

In [16]:
print(accuracy)

NameError: name 'accuracy' is not defined

## Step 2

In [None]:
import multiprocessing

class DecisionTree:
    def __init__(self, max_depth=10, min_samples_split=2, random_state=None):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.random_state = random_state
        self.tree = None

    def fit(self, X, y):
        self.tree = self._build_tree(X.values, y.values)

    def predict(self, X):
        return np.array([self._traverse_tree(x, self.tree) for x in X.values])

    def _build_tree(self, X, y, depth=0):
        n_samples, n_features = X.shape
        n_labels = len(np.unique(y))

        # Stopping criteria
        if (depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split):
            return np.argmax(np.bincount(y))

        # Select the best feature and split point
        best_gain = -np.inf
        best_feature, best_split_point = None, None
        for feature in range(n_features):
            split_points = np.unique(X[:, feature])
            results = []
            with multiprocessing.Pool() as pool:
                for split_point in split_points:
                    left_idx = X[:, feature] <= split_point
                    right_idx = X[:, feature] > split_point
                    if (len(left_idx) == 0 or len(right_idx) == 0):
                        continue
                    result = pool.apply_async(self._information_gain, args=(y, y[left_idx], y[right_idx]))
                    results.append((split_point, result))
                for split_point, result in results:
                    gain = result.get()
                    if (gain > best_gain):
                        best_gain = gain
                        best_feature = feature
                        best_split_point = split_point

        # Create the current node
        current_node = {}
        current_node['feature'] = best_feature
        current_node['split_point'] = best_split_point
        current_node['left'] = self._build_tree(X[X[:, best_feature] <= best_split_point], y[X[:, best_feature] <= best_split_point], depth+1)
        current_node['right'] = self._build_tree(X[X[:, best_feature] > best_split_point], y[X[:, best_feature] > best_split_point], depth+1)
        return current_node

    def _information_gain(self, y, y_left, y_right):
        H_y = self._entropy(y)
        H_y_left = self._entropy(y_left)
        H_y_right = self._entropy(y_right)
        n_left, n_right = len(y_left), len(y_right)
        return H_y - (n_left / len(y)) * H_y_left - (n_right / len(y)) * H_y_right

    def _entropy(self, y):
        _, counts = np.unique(y, return_counts=True)
        p = counts / len(y)
        return -np.sum(p * np.log2(p + 1e-10))

    def _traverse_tree(self, x, node):
        if isinstance(node, int):
            return node
        feature = node['feature']
        split_point = node['split_point']
        if x[feature] <= split_point:
            return self._traverse_tree(x, node['left'])
        else:
            return self._traverse_tree(x, node['right'])


In [None]:
model = DecisionTree(max_depth=3, min_samples_split=2)

# Train the classifier on the training data
model.fit(X_train, y_train)

# Predict the class labels of the test data
y_pred = model.predict(X_test)

# Evaluate the performance of the classifier
accuracy = accuracy_score(y_test, y_pred)