In [1]:
import numpy as np
import sklearn.metrics

In [2]:
def get_metrics(y_true, y_hat):
    """
    Score a set of predictions against the reference labels.

    arguments:
    y_true: reference labels
    y_hat: predicted labels
    ------------------------------------------
    output:
    (accuracy, balanced accuracy) as computed by
    sklearn.metrics.accuracy_score and
    sklearn.metrics.balanced_accuracy_score, in that order
    """
    scorers = (
        sklearn.metrics.accuracy_score,
        sklearn.metrics.balanced_accuracy_score,
    )
    plain_acc, class_balanced_acc = (scorer(y_true, y_hat) for scorer in scorers)
    return plain_acc, class_balanced_acc

In [3]:
class DecisionStump:
    """
    One-feature threshold classifier for labels in {-1, +1}.

    After train() the stump stores:
    best_feature:   column index of X used for the split
    best_direction: 1  -> samples with feature <  best_position are labelled -1
                    -1 -> samples with feature >  best_position are labelled -1
                    (every other sample, including one exactly on the
                    threshold, is labelled +1)
    best_position:  threshold value on best_feature

    granularity controls how many candidate thresholds are tried per feature:
    the step between candidates is (max - min) / granularity.
    """

    def __init__(self, granularity=10):
        if granularity <= 0:
            raise ValueError("granularity must be a positive number")

        self.best_feature = None
        self.best_direction = None
        self.best_position = None
        self.granularity = granularity

    def train(self, X, y, dist_t):
        """
        Fit the stump by picking the (feature, direction, threshold) with the
        lowest weighted 0/1 loss.

        arguments:
        X: n✕d training inputs
        y: n labels with values 1 or -1 (shape (n,) or (n, 1))
        dist_t: n sample weights (shape (n,) or (n, 1))
        ------------------------------------------
        output:
        e_t: weighted 0/1 loss of the selected stump
        ------------------------------------------
        raises:
        ValueError if y or dist_t do not hold one entry per row of X
        """

        X = np.asarray(X)
        # Flatten labels and weights so that (n,) and (n, 1) inputs give the
        # same result; mixing the two shapes would otherwise broadcast the
        # error mask to n✕n.
        y = np.asarray(y).ravel()
        dist_t = np.asarray(dist_t).ravel()

        n_samples = X.shape[0]
        if y.shape[0] != n_samples or dist_t.shape[0] != n_samples:
            raise ValueError("X, y and dist_t must describe the same number of samples")

        # Start from +inf so the first evaluated feature is always recorded,
        # even when the weights are not normalised to sum to 1.
        e_t = np.inf
        for feature in range(X.shape[1]):
            err, direction, position = self.__find_best_separator(
                X[:, feature], y, dist_t
            )
            if err < e_t:
                e_t = err
                self.best_feature = feature
                self.best_direction = direction
                self.best_position = position

        return e_t

    # for a given feature, finds the best separator
    def __find_best_separator(self, x, y, dist_t):
        """
        x: n values of one input feature
        y: n labels with values 1 or -1
        dist_t: n sample weights
        ------------------------------------------
        output:
        e_t: lowest weighted 0/1 loss found for this feature
        best_dir: defines which side of the separator gets -1 (see class docstring)
        best_pos: defines the value of the feature used to separate the data
        """

        x = np.asarray(x).ravel()
        y = np.asarray(y).ravel()
        dist_t = np.asarray(dist_t).ravel()

        lo = x.min()
        hi = x.max()
        if hi > lo:
            candidates = np.arange(lo, hi, (hi - lo) / self.granularity)
        else:
            # Constant feature: the step would be 0 and np.arange cannot build
            # a grid, so evaluate the single possible threshold instead.
            candidates = np.array([lo])

        best_dir = 1
        best_pos = lo
        e_t = np.inf
        for p in candidates:
            for d in (-1, 1):
                negative_side = (x < p) if d == 1 else (x > p)
                y_hat = np.where(negative_side, -1.0, 1.0)

                err = np.sum((y_hat != y) * dist_t)
                if err < e_t:
                    e_t = err
                    best_dir = d
                    best_pos = p

        return e_t, best_dir, best_pos

    def predict(self, X_test):
        """
        X_test: n✕d test inputs (a single 1-D sample of length d is also accepted)
        ------------------------------------------
        output:
        y_hat: length-n array of predicted labels with values 1 or -1
        ------------------------------------------
        raises:
        RuntimeError if the stump has not been trained yet
        """

        if self.best_feature is None:
            raise RuntimeError("DecisionStump.predict called before train")

        X_test = np.asarray(X_test)
        if X_test.ndim == 1:
            # a single sample: promote it to a 1✕d batch
            X_test = X_test[np.newaxis, :]

        column = X_test[:, self.best_feature]
        y_hat = np.ones((X_test.shape[0]))
        if self.best_direction == 1:
            y_hat[column < self.best_position] = -1
        else:
            y_hat[column > self.best_position] = -1

        return y_hat

In [4]:
# Read both splits; [1:] drops the first row of each file
# (presumably the CSV header line -- verify against the data files).
train_data, test_data = [
    np.genfromtxt(csv_path, delimiter=",")[1:]
    for csv_path in ("datasets/Syndata-train.csv", "datasets/Syndata-test.csv")
]

# The first two columns are the inputs; the remaining column(s) are kept
# 2-D as the labels.
X_train, y_train = train_data[:, :2], train_data[:, 2:]
X_test, y_test = test_data[:, :2], test_data[:, 2:]

In [5]:
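# Disabled sanity check: fit a single DecisionStump on the uniform sample distribution.
# # the sample distribution in the algorithm is initialized to 1/N for every sample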
# sample_dist = np.ones((X_train.shape[0])).reshape(-1,1) / X_train.shape[0]


# dt = DecisionStump()
# train_err = dt.train(X_train, y_train, sample_dist)
# print(f'train error = {train_err}')

# y_test_hat = dt.predict(X_test)
# acc, _ = get_metrics(y_test, y_test_hat)
# print(f'test acc = {acc}')

In [6]:
class AdaBoost:
    """
    AdaBoost ensemble of DecisionStump weak learners for labels in {-1, +1}.

    num_iter: number of boosting rounds (one weak learner per round)
    verbose:  when True (default) a progress line is printed every round

    After train():
    h:     list of the fitted weak learners, one per round
    alpha: list of the corresponding weak-learner weights
    """

    def __init__(self, num_iter, verbose=True):
        self.num_iter = num_iter
        self.verbose = verbose
        self.h = None
        self.alpha = None

    @staticmethod
    def __get_alpha(e, eps=1e-10):
        """
        Weak-learner weight 0.5 * log(1/e - 1) for a weighted error e.

        e is clipped to [eps, 1 - eps] first: a perfect weak learner (e == 0)
        would otherwise divide by zero and produce an infinite weight, which
        turns the next sample distribution into NaN.
        """
        e = np.clip(e, eps, 1.0 - eps)
        return (1 / 2) * np.log((1 / e) - 1)

    def __get_new_dist(self, wl, X, y, alpha_t, old_dist):
        """
        Re-weight the samples after one boosting round.

        wl: fitted weak learner exposing predict(X)
        X: n✕d training inputs
        y: n labels with values 1 or -1
        alpha_t: weight of wl
        old_dist: n current sample weights
        ------------------------------------------
        output:
        n✕1 array proportional to old_dist * exp(-alpha_t * y * wl.predict(X)),
        normalised to sum to 1
        """
        y = np.asarray(y, dtype=float).ravel()
        old_dist = np.asarray(old_dist, dtype=float).ravel()

        # One batched prediction and one shared normaliser replace the former
        # per-sample recomputation of the denominator.
        margins = y * np.asarray(wl.predict(X)).ravel()
        unnormalised = old_dist * np.exp(-alpha_t * margins)

        return (unnormalised / unnormalised.sum()).reshape(-1, 1)

    def train(self, X, y):
        """
        Fit num_iter weak learners, re-weighting the samples after each round.

        X: n✕d training inputs
        y: n labels with values 1 or -1 (shape (n,) or (n, 1))
        """
        X = np.asarray(X)
        # The weak learner is handed column vectors for labels and weights.
        y = np.asarray(y).reshape(-1, 1)

        # sample distribution is initialized by 1/N
        n_samples = X.shape[0]
        sample_dist = np.full((n_samples, 1), 1.0 / n_samples)

        alpha = []
        wl_list = []
        for t in range(self.num_iter):
            if self.verbose:
                print(f'------------------ t = {t} ------------------')
            wl = DecisionStump()
            e = wl.train(X, y, sample_dist)
            alpha_t = self.__get_alpha(e)
            sample_dist = self.__get_new_dist(wl, X, y, alpha_t, sample_dist)

            alpha.append(alpha_t)
            wl_list.append(wl)

        self.h = wl_list
        self.alpha = alpha

    def predict(self, X_test):
        """
        X_test: n✕d test inputs
        ------------------------------------------
        output:
        length-n array of labels with values 1 or -1: the sign of the
        alpha-weighted sum of the weak learners' predictions. A sum of
        exactly 0 is labelled +1 so that 0 is never returned as a label.
        ------------------------------------------
        raises:
        RuntimeError if called before train
        """
        if self.h is None or self.alpha is None:
            raise RuntimeError("AdaBoost.predict called before train")

        X_test = np.asarray(X_test)
        score = 0
        for alpha_t, wl in zip(self.alpha, self.h):
            score = score + alpha_t * wl.predict(X_test)

        return np.where(np.asarray(score) >= 0, 1.0, -1.0)


In [7]:
# Build a 10-round ensemble and fit it on the training split.
adaboost = AdaBoost(num_iter=10)
adaboost.train(X=X_train, y=y_train)


------------------ t = 0 ------------------
------------------ t = 1 ------------------
------------------ t = 2 ------------------
------------------ t = 3 ------------------
------------------ t = 4 ------------------
------------------ t = 5 ------------------
------------------ t = 6 ------------------
------------------ t = 7 ------------------
------------------ t = 8 ------------------
------------------ t = 9 ------------------


In [8]:
# Predict the held-out split and report plain accuracy; the balanced
# accuracy returned alongside it is discarded here.
y_test_hat = adaboost.predict(X_test=X_test)
acc, _ = get_metrics(y_true=y_test, y_hat=y_test_hat)
print(f'test acc = {acc}')

test acc = 0.84
