In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import pandas as pd

geyser = pd.read_csv('/content/drive/MyDrive/data/geyser.csv')
chip = pd.read_csv('/content/drive/MyDrive/data/chips.csv')

# Preprocess

In [None]:
def sign(df):
  ds['class'].loc[ds['class'] == 'N'] = -1
  ds['class'].loc[ds['class'] == 'P'] = 1
def to_x_y(ds):
  vals = ds.values
  vals = vals[vals[:, 1].argsort()]
  vals = vals[vals[:, 0].argsort()]
  
  ds = pd.DataFrame(vals)
  return ds.drop(2, axis=1).values, ds[2].values.tolist()


In [None]:
sign(chip)
chip.head()

Unnamed: 0,x,y,class
0,0.051267,0.69956,1
1,-0.092742,0.68494,1
2,-0.21371,0.69225,1
3,-0.375,0.50219,1
4,-0.51325,0.46564,1


In [None]:
sign(geyser)
geyser.head()

Unnamed: 0,x,y,class
0,1,4.4,-1
1,1,3.9,-1
2,1,4.0,1
3,1,4.0,-1
4,1,3.5,-1


# Criterions

In [None]:
def w_sum(x, y, w):
  return sum([w_dot if x_dot == y else 0 for (x_dot, w_dot) in zip(x, w)])


def get_probs(x, w):
  x = np.array(x)
  frequencies = np.asarray(np.unique(x, return_counts=True)).T  # [[element, n_elements], ...]
  cardinal = sum(w)
  return np.array(list(map(lambda info: w_sum(x, info[0], w) / cardinal, frequencies)))


def gini(x, w):
  p = get_probs(x, w)
  return np.sum(p * (1 - p))


def entropy(x, w):
  p = get_probs(x, w)
  return - np.sum(p * np.log2(p))


def gain(left_y, right_y, left_weights, right_weights, criterion):
  n_left = sum(left_weights)
  n_right = sum(right_weights)
  n_total = n_left + n_right

  return 1 - (n_right / n_total) * criterion(right_y, right_weights) - (n_left / n_total) * criterion(left_y,
                                                                                                      left_weights)

In [None]:
def gain(left_y, right_y, left_weights, right_weights, criterion):
  n_left = sum(left_weights)
  n_right = sum(right_weights)
  n_total = n_left + n_right
    
  return 1 - (n_right / n_total) * criterion(right_y, right_weights) - (n_left / n_total) * criterion(left_y, left_weights)

# Decision Tree

In [None]:
class DecisionTreeLeaf:
  def __init__(self):
    self.y = None
    self.frequencies = None

  def get_probs(self):
    return self.y


class DecisionTreeNode(DecisionTreeLeaf):
  def __init__(self, split_dim, split_value, left, right):
    super().__init__()
    self.split_dim = split_dim
    self.split_value = split_value
    self.left = left
    self.right = right
    self.all = None

In [None]:
def get_most_frequent_class(y, w=None):
  if w is None:
    w = np.ones_like(y)
  res_freq = {y_dot: 0 for y_dot in np.unique(np.array(y), return_counts=False)}
  for (y_dot, w_dot) in zip(y, w):
    res_freq[y_dot] += w_dot
  return max(res_freq, key=res_freq.get), res_freq

In [None]:
class DecisionTreeClassifier:
  def __init__(self, criterion=gini, max_depth=None, min_samples_leaf=1):
    self.root = None
    self.criterion = criterion
    self.max_depth = max_depth if max_depth is not None else float("inf")
    self.node_id = 0
    self.min_samples_leaf = min_samples_leaf

  def _train(self, X, y, weights, cur_depth):
    cur_node = None
    if cur_depth > self.max_depth or len(X) <= self.min_samples_leaf:
      cur_node = DecisionTreeLeaf()
      cur_node.y, cur_node.frequencies = get_most_frequent_class(y, weights)
      return cur_node
    max_q, best_dim, best_val, best_left_S, best_right_S = float("-inf"), None, None, None, None
    unique_values_per_dim = [np.sort(np.unique(row)) for row in np.transpose(X)]

    for dim, unique in enumerate(unique_values_per_dim):
      if len(unique) == 1:
        continue
      for split_val in unique:
        left_S = [(x, y_dot, w) for (x, y_dot, w) in zip(X, y, weights) if x[dim] < split_val]
        right_S = [(x, y_dot, w) for (x, y_dot, w) in zip(X, y, weights) if x[dim] >= split_val]
        if len(left_S) == 0 or len(right_S) == 0:
          continue

        left_X, left_y, left_weights = zip(*left_S)
        right_X, right_y, right_weights = zip(*right_S)

        q = gain(left_y, right_y, left_weights, right_weights, self.criterion)
        if q > max_q:
          max_q = q
          best_dim = dim
          best_val = split_val
          best_left_S = left_S
          best_right_S = right_S

    if best_right_S is None or best_left_S is None:
      cur_node = DecisionTreeLeaf()
      cur_node.y, cur_node.frequencies = get_most_frequent_class(y, weights)
      return cur_node

    best_right_X, best_right_y, best_right_weights = zip(*best_right_S)
    best_left_X, best_left_y, best_left_weights = zip(*best_left_S)

    left = self._train(best_left_X, best_left_y, best_left_weights, cur_depth + 1)
    right = self._train(best_right_X, best_right_y, best_right_weights, cur_depth + 1)

    cur_node = DecisionTreeNode(best_dim, best_val, left, right)
    cur_node.y = get_most_frequent_class(y)
    return cur_node

  def fit(self, X, y, weights=None):
    if weights is None:
      weights = [1. for _ in y]
    self.root = self._train(X, y, weights, 0)

  def predict(self, X):
    return [self._find_leaf(x).get_probs() for x in X]

  def _find_leaf(self, x):
    cur_node = self.root
    while isinstance(cur_node, DecisionTreeNode):
      if x[cur_node.split_dim] < cur_node.split_value:
        cur_node = cur_node.left
      else:
        cur_node = cur_node.right
    return cur_node

In [None]:
get_most_frequent_class([1, -1, 1], [0.01, 0.9, 0.0])

(-1, {-1: 0.9, 1: 0.01})

# Random forest

In [None]:
import random
import numpy as np
def cut_columns(X, columns_to_cut, axis=0):
  if isinstance(X, pd.DataFrame):
    return X.drop(columns_to_cut, axis=axis, inplace=False)
  return np.delete(X, columns_to_cut, axis)


def cut_rows(X, y, weights, ratio=0.1):
  idx = random.sample(list(range(len(X))), int(ratio * len(X)))
  if isinstance(X, pd.DataFrame):
    X = X.drop(idx, axis=0)
    y = y.drop(idx, axis=0)
    w = w.drop(idx, axis=0)
  else:
    X = np.delete(X, idx, 0)
    y = np.delete(y, idx, 0)
    x = np.delete(weights, idx, 0)
  return X, y, idx


class DecisionTreeWrapper:
  def __init__(self, **tree_kwargs):
    self.tree = DecisionTreeClassifier(**tree_kwargs)
    self.removed_columns = []
    self.unseen_samples = []

  def predict(self, X):
    X = cut_columns(X, self.removed_columns, 1)
    if isinstance(X, pd.DataFrame):
      X = X.values
    X = X.tolist()
    return self.tree.predict(X)

  def fit(self, X, y, weights, max_features="auto", drop_columns=True, drop_rows=True):
    if max_features == "auto":
      max_features = float("inf")

    if drop_columns:
      self.removed_columns = random.sample(range(len(X[0])), min(len(X[0]) // 6, max_features))
      X = cut_columns(X, self.removed_columns, 1)
    if drop_rows:
      X, y, self.unseen_samples = cut_rows(X, y, weights)
    if isinstance(X, pd.DataFrame):
      X = X.values
      y = y.values
    X = X.tolist()
    y = y.tolist()
    weights = weights.tolist()
    self.tree.fit(X, y, weights)

In [None]:
import copy
from scipy import stats


class RandomForestClassifier:
  def __init__(self, criterion=gini, max_depth=None, min_samples_leaf=1, max_features="auto", n_estimators=10,
               drop_columns=True, drop_rows=True):
    self.trees = [DecisionTreeWrapper(criterion=criterion, max_depth=max_depth, min_samples_leaf=min_samples_leaf)
                  for _ in range(n_estimators)]
    self.max_features = max_features
    self.drop_columns = drop_columns
    self.drop_rows = drop_rows

  def fit(self, X, y, weights=None):
    if weights is None:
      weights = np.ones_like(y)
    for tree in self.trees:
      tree.fit(copy.copy(X), y, weights, max_features=self.max_features,
               drop_columns=self.drop_columns, drop_rows=self.drop_rows)

  def predict(self, X):
    predictions = [tree.predict(X) for tree in self.trees]
    return stats.mode(predictions, axis=0).mode[0]

In [None]:
from sklearn.metrics import accuracy_score


def strong_classifier(classifier):
  def classify(X):
    return np.sign(np.sum([b * np.array(c.predict(X)) for (b, c) in classifier], axis=0))

  return classify


class StrongClassifierWrapper:
  def __init__(self, clf):
    self.clf = clf

  def predict(self, X):
    return self.clf(X)


def adaboost(data, number_of_steps, tree_kwargs=None):  # D - Data, T - number of steps
  if tree_kwargs is None:
    tree_kwargs = {}
  quality_history, final_classifier = [], []
  X, y = data
  weights = [1 / len(y) for _ in y]
  for t in range(number_of_steps):
    # Train weak classifier
    a = DecisionTreeClassifier(max_depth=3, **tree_kwargs)
    a.fit(X, y, weights)

    # Obtain new classifier's predictions (hypothesis)
    predictions = a.predict(X)

    # Computer weighted error
    n = sum([w * (0 if y_dot == p_dot else 1) for (w, y_dot, p_dot) in zip(weights, y, predictions)])

    # Choose reliability of a weak classifier
    b = 0.5 * np.log((1 - n) / n)

    # Update weights
    error = np.prod([predictions, y], axis=0)
    weights = [w * np.exp(-b * e) for (w, e) in zip(weights, error)]

    # Normalize weights so that the sum to 1
    norm_factor = sum(weights)
    weights = [w / norm_factor for w in weights]

    # Add current weak classifier to the strong classifier
    final_classifier.append((b, a))

    # Compute model quality on the test set
    strong_predictions = strong_classifier(final_classifier)(X)
    quality_history.append(accuracy_score(y, strong_predictions))
  return final_classifier, quality_history

# Chip forest

In [None]:
from sklearn.model_selection import train_test_split

X, y = to_x_y(chip)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
chip_forest = RandomForestClassifier(max_depth=float("inf"), n_estimators=5, min_samples_leaf=1, criterion=entropy)
chip_forest.fit(X_train, y_train)

In [None]:
from mlxtend.plotting import plot_decision_regions

plot_decision_regions(X.astype(np.float), np.array(y), clf=chip_forest)

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  This is separate from the ipykernel package so we can avoid doing imports until


ValueError: ignored