A collection of from-scratch implementations of various basic machine learning algorithms and metrics for evaluating algorithms.

In [1]:
import numpy as np
import random as rnd
# for testing
from sklearn import datasets, model_selection

# Evaluation Metrics #

Implementating various useful metrics for evaluating machine learning models. Should include metrics for classification (binary and multilabel) and regression.

In [2]:
# entry i,j is the true label i labeled j
def confusion_matrix(y_true, y_pred):
    # get class labels in this data set
    unique = list(set(y_true))
    unique.sort()
    num_classes = len(unique)
    
    # create and fill out confusion matrix
    cmatrix = np.zeros((num_classes, num_classes))
    
    for i in range(len(y_true)):
        true = unique.index(y_true[i])
        pred = unique.index(y_pred[i])
        cmatrix[true][pred] += 1
    
    return cmatrix

# returns true positives for each class
# true positive: actual and predicted this class
def get_true_positives(y_true, y_pred, cmatrix=None):
    tp = []
    
    if cmatrix is not None:
        tp = np.diagonal(cmatrix)
        
    else:
        y_true = np.asarray(y_true)
        y_pred = np.asarray(y_pred)
        
        # get list of classes 
        unique = list(set(y_true))
        unique.sort()
        
        for u in unique:
            tmp = np.sum(np.logical_and(y_true == u, y_pred == u))
            tp.append(tmp)
    
    # in binary case, treat first label as the positive
    if len(tp) == 2:
        return tp[0]
                
    return tp

# return true negatives for each class
# true negative: actual and predicted are not of this class
def get_true_negatives(y_true, y_pred, cmatrix=None):
    tn = []
    
    if cmatrix is not None:
        # get false negatives and false positives for each
        # would double count true positives though, so arbitrarily subtract from one
        fn = np.sum(cmatrix, axis=1)
        fp = np.sum(cmatrix, axis=0) - np.diagonal(cmatrix)
        
        # get the total in the entire matrix
        total = np.sum(fn)
        
        tn = total - (fn + fp)
            
    else:
        unique = list(set(y_true))
        unique.sort()
        
        y_true = np.asarray(y_true)
        y_pred = np.asarray(y_pred)
        
        for u in unique:
            tmp = np.sum(np.logical_and(y_true != u, y_pred != u))
            tn.append(tmp)
    
    # in binary case, treat second label as negative
    if len(tn) == 2:
        return tn[1]
    
    return tn

# returns false positives for each class
# false positive: predicted this class, not actually this class
def get_false_positives(y_true, y_pred, cmatrix=None):
    fp = []
    
    if cmatrix is not None:
        fp = np.sum(cmatrix, axis=0) - np.diagonal(cmatrix)
        
    else:
        unique = list(set(y_true))
        unique.sort()
        
        y_true = np.asarray(y_true)
        y_pred = np.asarray(y_pred)
                
        for u in unique:
            tmp = np.sum(np.logical_and(y_true != u, y_pred == u))
            fp.append(tmp)
    
    # in binary case, treat first label as positive
    if len(fp) == 2:
        return fp[0]
    
    return fp

# returns false negatives for each class
# false negative: actual this class, not predicted this class
def get_false_negatives(y_true, y_pred, cmatrix=None):
    fn = []
    
    if cmatrix is not None:
        fn = np.sum(cmatrix, axis=1) - np.diagonal(cmatrix)
    
    else:
        unique = list(set(y_true))
        unique.sort()
        
        y_true = np.asarray(y_true)
        y_pred = np.asarray(y_pred)
        
        for u in unique:
            tmp = np.sum(np.logical_and(y_true == u, y_pred != u))
            fn.append(tmp)
    
    # in binary case, treat second label as negative
    if len(fn) == 2:
        return fn[1]
    
    return fn

# TP / all
def accuracy(y_true, y_pred, cmatrix=None):
    tps = np.asarray(get_true_positives(y_true, y_pred, cmatrix))
    total = len(y_true)
    
    return np.sum(tps)/total
    
# returns for each class
# TP / (TP + FN)
def recall(y_true, y_pred, cmatrix=None):
    tps = np.asarray(get_true_positives(y_true, y_pred, cmatrix))
    fns = np.asarray(get_false_negatives(y_true, y_pred, cmatrix))
    
    recall = tps / (tps+fns)
    
    return recall

# returns for each class
# TP / (TP + FP)
def precision(y_true, y_pred, cmatrix=None):
    tps = np.asarray(get_true_positives(y_true, y_pred, cmatrix))
    fps = np.asarray(get_false_positives(y_true, y_pred, cmatrix))
    
    prec = tps / (tps + fps)
    
    return prec

# returns for each class
# 2 * (precision * recall / precision + recall)
def f1_score(y_true, y_pred, cmatrix=None):
    prec = precision(y_true, y_pred, cmatrix)
    rec = recall(y_true, y_pred, cmatrix)
    
    f1 = 2*((prec*rec)/(prec+rec))
    
    return f1

# calculate root mean squared error for regression problems
def calc_rms(y_true, y_pred):
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    
    tmp = np.square(y_pred - y_true)
    tmp = np.sum(tmp)/len(y_pred)
    
    return tmp**(1/2)


# Sampling Methods #

Implement from scratch various sampling methods. Generally oversampling, undersampling, and then getting a training and testing subset.

In [3]:
# sample with replacement from minority classes and add to data set so all classes equally represented
def oversample(X, Y):
    labels = list(set(Y))
    label_counts = []
    
    # determine which classes are the majority class
    max_label_val = 0
    max_labels = []
    
    for l in labels:
        tmp = np.sum(Y == l)
        label_counts.append(tmp)
        
        if tmp > max_label_val:
            max_labels = [l]
            max_label_val = tmp
        elif tmp == max_label_val:
            max_labels.append(l)
    
    # oversample from the minority classes
    for l in range(len(labels)):
        if labels[l] not in max_labels:
            sample_num = max_label_val - label_counts[l] 
            
            # get the indices from the actual data set that have this label
            tmp_data = np.where(Y == labels[l])[0]
            to_add = np.random.choice(tmp_data, sample_num)
            
            for ta in to_add:
                tmp = np.reshape(X[ta], (1, len(X[ta])))
                X = np.append(X, tmp, axis=0)
                Y = np.append(Y, Y[ta])
                
    return X,Y

# remove data points from the majority classes so that all classes equally represented
def undersample(X, Y):
    labels = list(set(Y))
    label_counts = []
    
    # determine which classes are the minority class
    min_labels_val = len(X) + 1
    min_labels = []
    
    for l in labels:
        tmp = np.sum(Y == l)
        label_counts.append(tmp)
        
        if tmp < min_labels_val:
            min_labels = [l]
            min_label_val = tmp
        elif tmp == min_labels_val:
            min_labels.append(l)
    
    # remove samples from majority classes
    for l in range(len(labels)):
        if labels[l] not in min_labels:
            remove_num = label_counts[l] - min_label_val
            
            # get indices from actual data set with this label
            tmp_data = np.where(Y == l)[0]
            to_remove = np.random.choice(tmp_data, remove_num, replace=False)
            
            for tr in to_remove:
                X = np.delete(X, tr, axis=0)
                Y = np.delete(Y, tr)
    
    return X,Y

# train_per is what percent of data set should be the training data
def split_train_test(X, Y, train_per=0.5):
    num_train = int(train_per*len(X))
    indices = list(np.arange(len(Y)))
        
    train_ind = rnd.sample(indices, num_train)
    X_train = np.take(X, train_ind, axis=0)
    Y_train = np.take(Y, train_ind)
    
    X_test = np.delete(X, train_ind, axis=0)
    Y_test = np.delete(Y, train_ind)
    
    return X_train,X_test,Y_train,Y_test
    

# k-Nearest Neighbors Implementation #

Implementing from scratch the k-nearest neighbors algorithm. Which classifies a given data point as the majority vote label of the k-training data points nearest to the test data point.

In [4]:
class kNN:
    
    # weights: {uniform, distance};
    # euclidean: p = 2; manhattan: p = 1
    def __init__(self, k, weights='uniform', p=2):
        self.k = k
        self.weights = weights
        #self.distance = distance
        self.p = p
        
        self.data = None
        self.labels = None
    
    # training is simply taking in the data set that will be used
    def train(self, X, Y):
        self.data = np.asarray(X)
        self.labels = Y
    
    # testing returns the labels for a list of testing data points
    def test(self, X):
        predicted_labels = []
        
        for x in X:
            dists = np.power(np.subtract(self.data, x), self.p)
            dists = np.power(np.sum(dists, axis=1), 1/self.p)
        
            # order by distances
            sorted_dists = np.argsort(dists)
            
            # plurality vote by k-nearest training data points
            labels = [self.labels[sorted_dists[i]] for i in range(self.k)]
            if self.weights == 'uniform':
                pred = max(set(labels), key=labels.count)
            else:
                weights = {}
                for i in range(len(labels)):
                    if labels[i] not in weights:
                        weights[labels[i]] = 0
                    weights[labels[i]] += 1/(i+1)
                
                pred = max(weights, key=weights.get)
            
            predicted_labels.append(pred)
        
        return predicted_labels

#### Test implementation ####

Test the from scratch implementation by comparing the accuracy of the method versus the scikit-learn implementation on the built-in Iris data set.

In [6]:
# iris data set
data = datasets.load_iris()
X = data['data']
Y = data['target']

# separate into training and testing set
X_train,X_test,Y_train,Y_test = model_selection.train_test_split(X, Y, train_size=0.7, random_state=123)

model = kNN(5)
model.train(X_train, Y_train)
Y_preds = model.test(X_test)

cmatrix = confusion_matrix(Y_test, Y_preds)
print('My confusion matrix:')
print(cmatrix)
print()

from sklearn import neighbors

sk_model = neighbors.KNeighborsClassifier()
sk_model.fit(X_train, Y_train)
preds = sk_model.predict(X_test)

sk_cmatrix = confusion_matrix(Y_test, preds)
print('Sklearn confusion matrix:')
print(sk_cmatrix)
print()

My confusion matrix:
[[18.  0.  0.]
 [ 0.  9.  1.]
 [ 0.  0. 17.]]

Sklearn confusion matrix:
[[18.  0.  0.]
 [ 0.  9.  1.]
 [ 0.  0. 17.]]



# Neural Network #

Implementation of some of the aspects of a neural network including activation functions and the feed forward section. This was created following the tutorial: https://towardsdatascience.com/coding-neural-network-forward-propagation-and-backpropagtion-ccf8cf369f76

In [7]:
# initialize the network with random parameters between 0 and 1

def initialize_parameters(layer_dims):
    params = {}
    
    for l in range(1, len(layer_dims)):
        # weights should be in the shape: layers this dimension x layers last dimension
        # pull from standard normal distribution
        params['W' + str(l)] = np.random.randn(layer_dims[l], layer_dims[l-1])
        params['b' + str(l)] = np.zeros((layer_dims[l], 1))
        
    return params

#### Activation Functions ####

Implementation of sigmoid and ReLU acitvation functions that introduce non-linearity to the layers.

In [8]:
# activation functions

def sigmoid(Z):
    A = 1/(1 + np.exp(-Z))
    
    return A,Z

def relu(Z):
    A = np.maximum(0, Z)
    
    return A,Z

#### Feed forward pass ####

Implementation of the forward pass of the NN, which takes inputs from a previous layer and calculates: $z = W^Tx + b$.

After calculating that, it applies an activation function g(z). Each run needs to store the variables computed during the calculation so that they can be called on during the backpropagation part.

It will do this for every layer in the network. After the last layer, it will pass through the final forward with non-linear, in this case a sigmoid, which will provide the final classification.

In [9]:
def forward_helper(X, W, b, activation):
    Z = np.dot(W, X) + b
    cache1 = (X, W, b)
    
    if activation == 'sigmoid':
        Z,cache2 = sigmoid(Z)
    else:
        Z,cache2 = relu(Z)
    
    return Z,(cache1, cache2)

def forward_pass(X, params, hidden_layer_activation='relu'):
    # save info for backprop
    caches = []
    # parameters stores bias+weights, so need to divide length by 2
    num_layers = len(params)//2
    
    for l in range(1, num_layers):
        X_prev = X
        X,cache = forward_helper(X_prev, params['W'+str(l)], params['b'+str(l)], hidden_layer_activation)
        caches.append(cache)
        
    X,cache = forward_helper(X, params['W'+str(l)], params['b'+str(l), 'sigmoid'])
    caches.append(cache)
    
    return X,caches

#### Cross-entropy cost ####

Implementing binary cross-entropy loss for cost, which uses log-likelihood to estimate its error. 

In [10]:
def cost(X, y):
    cost = -(1/y.shape[1]) * np.sum(np.multiply(y, np.log(X)) + np.multiply(1-y, np.log(1-X)))
    
    return cost

#### Back-propagation ####

Backprop allows the information to flow backwards in the network so weights can be updated based on performance.

In [11]:
def sigmoid_gradient(dX, Z):
    A,Z = sigmoid(Z)
    dZ = dX*A*(1-A)
    
    return dZ

def relu_gradient(dX, Z):
    A,Z = relu(Z)
    dZ = np.multiply(dX, np.int64(A>0))
    
    return dZ

def backward_helper(dX, cache, activation):
    linear_cache, activation_cache = cache
    
    if activation == 'sigmoid':
        dZ = sigmoid_gradient(dX, activation_cache)
    else:
        dZ = relu_gradient(dX, activation_cache)
    
    # backward for forward pass
    A_prev,W,b = linear_cache
    m = A_prev.shape[1]
    
    dW = (1/m)*np.dot(dZ, A_prev.T)
    db = (1/m)*np.sum(dZ, axis=1, keepdims=True)
    dA_prev = np.dot(W.T, dZ)
    
    return dA_prev, dW, db

def backward_pass(X, y, caches, hidden_layer_activation='relu'):
    y = y.reshape(X.shape)
    L = len(caches)
    grads = {}
    
    # have to go backward through the network
    dX = np.divide(X-y, np.multiply(X, 1-X))
    grads['dX' + str(L-1)],grads['dW'+str(L)],grads['db'+str(L)] = backward_helper(dX, caches[L-1], 'sigmoid')
    
    for l in range(L-1, 0, -1):
        curr_cache = caches[l-1]
        grads['dX'+str(l-1)],grads['dW'+str(l)],grads['db'+str(l)] = backward_helper(grads['dX'+str(l)], curr_cache, hidden_layers_activation)
    
    return grads

def update_params(params, grads, learning_rate):
    L = len(params)//2
    
    for l in range(1, L+1):
        params['W'+str(l)] = params['W'+str(l)] - learning_rate*grads['dW'+str(l)]
        params['b'+str(l)] = params['b'+str(l)] - learning_rate*grads['db'+str(l)]
    
    return params
                                                                     

The above will all be put together to create a network. The network is trained by over many iterations, calling: forward pass, cost, backward pass, update parameters.