In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
#This is a class for normalising/standardising data.
class Modify():
  def __init__(self):
    pass

  def fit(self, X):
    self.min = X.min(axis=0)
    self.max = X.max(axis=0)
    self.mean = X.mean(axis=0)
    self.err = 1e-8
    self.std = X.std(axis=0) + self.err
    self.range = self.max-self.min + self.err

  def norm(self, X):
    try:
      return (X-self.min)/self.range
    except Exception as error_type:
      print("Error: ", error_type)

  def z_score(self, X):
    try:
      return (X-self.mean)/self.std
    except Exception as error_type:
      print("Error: ", error_type)

In [None]:
#Implementation of optimisers.
class optimiser:
  def __init__(self):
    pass

  def update(self):
    raise NotImplementedError

class gradient_descent(optimiser):
  def __init__(self):
    pass

  def update(self, gradient):
    return -gradient

class momentum(optimiser):
  def __init__(self, beta = 0.9):
    self.beta = beta
    self.v = None

  def update(self, gradient):
    if self.v is None:
      self.v = np.zeros_like(gradient)
    self.v = self.beta * self.v + (1-self.beta) * gradient
    return -self.v

class rmsprop(optimiser):
  def __init__(self, beta = 0.9):
    self.beta = beta
    self.s = None

  def update(self, gradient):
    if self.s is None:
      self.s = np.zeros_like(gradient)
    self.s = self.beta * self.s + (1-self.beta) * (gradient*gradient)
    return -gradient/(np.sqrt(self.s)+1e-8)

class adam(optimiser):
  def __init__(self, beta1 = 0.9, beta2 = 0.999):
    self.beta1 = beta1
    self.beta2 = beta2
    self.m = None
    self.v = None
    self.t = 1

  def update(self, gradient):
    if self.m is None:
      self.m = np.zeros_like(gradient)
    if self.v is None:
      self.v = np.zeros_like(gradient)
    self.m = self.beta1 * self.m + (1-self.beta1) * gradient
    self.v = self.beta2 * self.v + (1-self.beta2) * (gradient*gradient)
    m_hat = self.m/(1-(self.beta1**self.t))
    v_hat = self.v/(1-(self.beta2**self.t))
    self.t+= 1
    return -m_hat/(np.sqrt(v_hat)+1e-8)

In [None]:
#Single Variable Linear Regression that uses closed form solution
class SimpleLin_Reg():
  def __init__(self):
    self.w = None
    self.b = None

  def fit(self, X, Y):
    try:
      self.w = ((X*Y).mean()-(X.mean()*Y.mean()))/((X*X).mean()-(X.mean()*X.mean()))
      self.b = Y.mean() - self.w*X.mean()

      return self.w, self.b
    except Exception as error_type:
      print("Error: ", error_type)

  def predict(self, X):
    return round(self.w*X+self.b, 4)

  def plot(self, X, Y):
    plt.scatter(X, Y, color='blue')
    plt.grid(1)
    self.w, self.b = self.fit(X, Y)
    plt.xlabel("X")
    plt.ylabel("Y")
    plt.title("Y= "+str(round(self.w,4))+" X + "+str(round(self.b,4)))
    plt.plot(X, self.predict(X), color='red')


In [None]:
#Multivariable Linear Regression
class multivLin_Reg():
    def __init__(self, lr=0.1, p=1000):
        self.lr = lr
        self.p = p

    def fit(self, X, Y):
        X = np.array(X)
        Y = np.array(Y).reshape(-1,1)
        self.modify = Modify()
        self.modify.fit(X)
        X = self.modify.z_score(X)
        m, n = X.shape
        self.w = np.zeros(n).reshape(n,1)
        self.b = 0
        P = np.linspace(0, self.p, self.p+1)
        L = []

        for u in range(self.p):
            y_pred = (X @ self.w + self.b).reshape(-1, 1)
            loss = np.mean((y_pred - Y)**2)
            L.append(loss)

            if (u>10) and (u%10==0):
              if (abs(L[u-10]-L[u])<0.1):
                print(f"Epoch: {u}")
                break

            dW = (1/m) * (X.T @ (y_pred - Y))
            dB = (1/m) * np.sum(y_pred - Y)

            self.w -= self.lr * dW
            self.b -= self.lr * dB

        plt.plot(P[:u+1], L)
        R2 = 1 - (np.sum((y_pred - Y)**2)/np.sum((Y - np.mean(Y))**2))
        print(f"R2 evaluation: {R2}")

    def predict(self, X):
        return self.modify.z_score(np.array(X)) @ self.w + self.b

In [None]:
#Logistic Regression
class Log_Reg():
    def __init__(self, lr=0.5, p=1000):
        self.lr = lr
        self.p = p

    def fit(self, X, Y):
        self.modify = Modify()
        X = np.array(X)
        self.modify.fit(X)
        X = self.modify.z_score(X)
        Y = np.array(Y).reshape(-1, 1)
        m, n = X.shape
        self.w = np.zeros(n).reshape(n, 1)
        self.b = 0
        P = np.linspace(0, self.p, self.p+1)
        L = []
        for u in range(self.p):
            z = X @ self.w + self.b
            z = z.reshape(-1, 1)
            y = 1 / (1 + np.exp(-z))

            loss = -np.mean(Y * np.log(y + self.modify.err) + (1 - Y) * np.log(1 - y + self.modify.err))
            L.append(loss)
            if (u>10) and (u%10==0):
              if (abs(L[u-10]-L[u])<0.001):
                print(f"Epoch: {u}")
                break

            dJ_dw = (1/m) * (X.T @ (y - Y))
            dJ_db = (1/m) * np.sum(y - Y)

            self.w -= self.lr * dJ_dw
            self.b -= self.lr * dJ_db
        plt.plot(P[:u+1], L)

    def predict(self, X):
        X = self.modify.z_score(np.array(X))
        z = X @ self.w + self.b
        y = 1 / (1 + np.exp(-z))
        return np.where(y > 0.5, 1, 0)

In [None]:
#Polynomial Regression of a single Variable nth degree
class Poly_Reg():
  def __init__(self, degree= 3):
    self.degree = degree

  def fit(self, X, Y):
    X = X.reshape(-1,1)
    m = X.shape[0]
    self.w = np.zeros(self.degree+1).reshape(self.degree+1, 1)
    Z = np.zeros(m*(self.degree+1)).reshape(-1, self.degree+1)
    for i in range(m):
      for j in range(self.degree+1):
        Z[i,j] = np.power(X[i].flatten()[0], j)

    self.w = np.linalg.inv(Z.T @ Z) @ Z.T @ Y

  def predict(self, X):
    X = X.reshape(-1, 1)
    m = X.shape[0]
    Z = np.zeros(m*(self.degree+1)).reshape(-1, self.degree+1)
    for i in range(m):
      for j in range(self.degree+1):
        Z[i, j] = np.power(X[i].flatten()[0], j)

    return (Z @ self.w).reshape(-1, 1)

In [None]:
#Decision Trees
def entropy(Y):
  classes, count = np.unique(Y, return_counts=True)
  p = count / count.sum()
  return -np.sum(p * np.log2(p))

def info_gain(left, right, parent):
  return entropy(parent) - ((len(left)/len(parent)) * entropy(left) + (len(right)/len(parent)) * entropy(right))

def split_dataset(X, Y, feature_index, threshold):
  left_bool = (X[:, feature_index] <= threshold).reshape(-1,)
  right_bool = (X[:, feature_index] > threshold).reshape(-1,)

  X_left = X[left_bool]
  X_right = X[right_bool]

  Y_left = Y[left_bool]
  Y_right = Y[right_bool]

  return X_left, X_right, Y_left, Y_right

def best_split(X, Y):
  max_info_gain = 0
  best_feature = 0
  best_threshold = 0
  for i in range(0, X.shape[1]):
    Threshold_array = np.sort(np.unique(X[:, i])).astype(float)
    for j in range(0, Threshold_array.shape[0]-1):
      Threshold_array[j] = (Threshold_array[j] + Threshold_array[j+1])/2
      X_left, X_right, Y_left, Y_right = split_dataset(X, Y, i, Threshold_array[j])
      ig = info_gain(Y_left, Y_right, Y)
      if (max_info_gain < ig):
        max_info_gain = ig
        best_feature = i
        best_threshold = Threshold_array[j]

  return best_feature, best_threshold, max_info_gain

class Node():
  def __init__(self, feature_index=None, threshold=None, left=None, right=None, info_gain=None, value=None):
    self.feature_index = feature_index
    self.threshold = threshold
    self.left = left
    self.right = right
    self.info_gain = info_gain
    self.value = value

def build_tree(X, Y):
  root = Node()
  root.feature_index, root.threshold, root.info_gain = best_split(X, Y)
  A, B = np.unique(Y, return_counts=True)
  if (len(A) == 1):
    root.value = A[0]
    return root
  if (root.info_gain == 0):
    root.value = A[np.argmax(B)]
    return root
  X_left, X_right, Y_left, Y_right = split_dataset(X, Y, root.feature_index, root.threshold)
  if (X_left.size == 0 or X_right.size == 0):
    root.value = A[np.argmax(B)]
    return root
  root.left = build_tree(X_left, Y_left)
  root.right = build_tree(X_right, Y_right)
  return root

def predict(root, X):
  if (root.value != None):
    return root.value
  if (X[root.feature_index] <= root.threshold):
    return predict(root.left, X)
  else:
    return predict(root.right, X)

class DecisionTree():
  def __init__(self):
    self.root = None

  def fit(self, X, Y):
    self.root = build_tree(X, Y)
    return self.root

  def predict(self, X):
    return predict(self.root, X)

In [None]:
#KNN
class KNN():
  def __init__(self, K = 4):
    self.K = K

  def fit(self, X, Y):
    self.X = X
    self.Y = Y
    self.modify = Modify()
    self.modify.fit(X)
    self.A = self.modify.norm(X)


  def predict(self, P):
    B = self.modify.norm(P).reshape(1, -1)
    if (B.shape[1]==self.A.shape[1]):
      C = abs(self.A-B).sum(axis=1)
      print(C.shape)
      idx = np.argsort(C)[:self.K]
      U, V = np.unique(self.Y[idx], return_counts=True)
      return U[np.argmax(V)]
    else:
      B = B.reshape(-1, self.A.shape[1])
      G = []
      for r in range(B.shape[0]):
        C = abs(self.A-B[:, r]).sum(axis=1)
        idx = np.argsort(C)[:self.K]
        U, V = np.unique(self.Y[idx], return_counts=True)
        G.append(U[np.argmax(V)])
      return G

In [None]:
#K Means Classification
class K_Means():
  def __init__(self, clusters=4, iterations=100):
    self.K = clusters
    self.p = iterations

  def distance(self, x, centroids):
    return np.linalg.norm(x-centroids, axis=1)

  def fit(self, X):
    self.X = np.array(X)
    m, n = self.X.shape
    self.modify = Modify()
    self.modify.fit(self.X)
    self.X = self.modify.z_score(self.X)

    centroids = []
    centroids.append(self.X[np.random.choice(m)])
    for _ in range(1, self.K):
      distances = np.array([min(np.linalg.norm(np.array(x) - np.array(c)) ** 2 for c in centroids) for x in self.X])
      probs = distances / np.sum(distances)
      centroids.append(self.X[np.random.choice(m, p=probs)])
    centroids = np.array(centroids)

    for itr in range(self.p):
      cluster_set = [[] for j in range(self.K)]
      for x in self.X:
        dist = self.distance(x, centroids)
        cluster = np.argmin(dist)
        cluster_set[cluster].append(x)

      new_centroids = []
      for i, cluster in enumerate(cluster_set):
        if len(cluster) == 0:
          distances = np.linalg.norm(self.X - centroids[i], axis=1)
          new_centroids.append(self.X[np.random.choice(m)])
        else:
          new_centroids.append(np.mean(cluster, axis=0))

      new_centroids = np.array(new_centroids)

      if np.allclose(centroids, new_centroids, atol=1e-6):
        break
      centroids = new_centroids

    self.centroids = centroids
    if (n==2):
      plt.scatter(self.X[:,0], self.X[:,1], color='blue')
      plt.scatter(centroids[:,0], centroids[:,1], color='red')
      plt.show()

  def predict(self, X):
    X = self.modify.z_score(np.array(X))
    labels = []
    for x in X:
        labels.append(np.argmin(self.distance(x, self.centroids)))
    return np.array(labels)


In [None]:
#Neural Networkss
class Layer:
  def forward(self, X):
    raise NotImplementedError

  def backward(self, d_out):
    raise NotImplementedError

def he(shape):
  return np.random.randn(*shape) * np.sqrt(2/shape[0])

def xavier(shape):
  return np.random.randn(*shape) * np.sqrt(1/shape[0])

class Linear(Layer):
  def __init__(self, in_features, out_features, optimiser, weight_init):
    self.inf = in_features
    self.outf = out_features
    self.W = weight_init((in_features, out_features)).reshape(in_features, out_features)
    self.b = np.zeros(out_features).reshape(-1, out_features)
    self.dW = None
    self.db = None
    self.optimiser_W = optimiser()
    self.optimiser_b = optimiser()

  def forward(self, X):
    self.X = np.array(X).reshape(-1, self.inf)
    return (self.X @ self.W + self.b).reshape(-1, self.outf)

  def backward(self, dZ):
    self.dW = (self.X.T @ dZ).reshape(self.inf, self.outf)
    self.db = np.sum(dZ, axis=0).reshape(self.b.shape)
    return dZ @ self.W.T

  def update(self, lr=0.01):
    self.lr = lr
    self.W += self.lr * self.optimiser_W.update(self.dW)
    self.b += self.lr * self.optimiser_b.update(self.db)


class ReLU(Layer):
  def forward(self, Z):
    self.Z = Z
    return (Z > 0) * Z

  def backward(self, dA):
    return dA * (self.Z > 0)

class Sigmoid(Layer):
  def forward(self, Z):
    self.Z = Z
    self.A = 1 / (1 + np.exp(-Z))
    return self.A

  def backward(self, dA):
    return dA * self.A * (1 - self.A)


class MSELoss:
  def forward(self, y_pred, y_true):
    self.y_pred = y_pred
    self.y_true = y_true
    return np.mean((y_pred - y_true) ** 2)

  def backward(self):
    return 2 * (self.y_pred - self.y_true) / self.y_pred.shape[0]


class Network:
  def __init__(self, layers, lr=0.01):
    self.layers = layers
    self.lr = lr

  def forward(self, X):
    for layer in self.layers:
      X = layer.forward(X)
    return X

  def backward(self, dY):
    for layer in reversed(self.layers):
      dY = layer.backward(dY)

  def update(self):
    for layer in self.layers:
      if hasattr(layer, "update"):
        layer.update(self.lr)