Justin Wang December 2020

This script will perform logistic regression (forward and backprop using vectorization).

First, we establish the correct project (and associated info) and import the related datasets for training.

In [72]:
#!/usr/bin/python

import numpy as np
import h5py
import scipy
import pandas as pd
from PIL import Image
from scipy import ndimage
import os
import PIL
from matplotlib import pyplot
import random
import math

#project = "RashData"
#positive = "Lyme_Positive"
#negative = "Lyme_Negative"

#project = "chest_xray"
#positive = "NORMAL"
#negative = "PNEUMONIA"

#project = "chest_covid"
#positive = "NORMAL"
#negative = ["COVID19","PNEUMONIA"]
#need a code for splitting the negative dataset

project = "cat_dog"
positive = "cats"
negative = "dogs"

#import datasets    
f = h5py.File(project+'.hdf5', "r")

train_pos_dset = f['train/'+positive]
train_neg_dset = f['train/'+negative]

val_pos_dset = f['val/'+positive]
val_neg_dset = f['val/'+negative]

#test_pos_dset = f['test/'+positive]
#test_neg_dset = f['test/'+negative]



Next, we establish some basic information about the dataset.

In [73]:
def training_info(train_pos_dset, train_neg_dset):
    #number of training examples for each class
    num_train_neg = train_pos_dset.shape[0]
    num_train_pos = train_neg_dset.shape[0]

    num_features = train_pos_dset.shape[1]

    #list of indices for accessing images from the datasets
    train_neg_index_list = list(range(num_train_neg))
    train_pos_index_list = list(range(num_train_pos))
    
    return num_train_neg, num_train_pos, num_features, train_neg_index_list, train_pos_index_list

Here is the meat of the code, in which we iterate through epochs and iterate through mini-batches within each epoch. Forward and back propagation (a step of gradient descent) is completed once for each mini-batch. Minibatches are shuffled and newly generated for each epoch.

In [None]:
def initialize(layers, num_features):
    #layers is a list describing the number of nodes in each layer, including the output layer but not the features (i.e layers=[5,2,3,1] has 5 nodes in layer 1, 2 nodes in layer 2.... 1 node in the output layer)
    
    W = []
    B = []    
    #W and B are lists containing weights and biases matrices - the i'th item in W is the i'th set of weights (i.e the second item in W is the matrix of weights between the first and second hidden layers)
    
    x = 0
    
    #TODO: check if these initializations are appropriate
    while x < len(layers):
        if x == 0:
            W.append(np.random.rand(layers[x],num_features))
        else:
            W.append(np.random.rand(layers[x],layers[x-1]))
            
        B.append(np.zeroes(layers[x],1))
        x+=1
            
    parameters = {"W":W,
                  "B":B}
    return parameters

In [None]:
def mini_batch(train_neg_index_list,train_pos_index_list,mini_batch_size)
    #list of indices for a given mini-batch (used to access the images for this mini-batch from the dataset)
    pos_mini_batches = []
    neg_mini_batches = []

        
    num_instances_batched=0

    #iterate through and add all the mini batches for this epoch to the collector lists pos_mini_batches & neg_mini_batches
    #(only making sure we get all the positive instances.. we don't need to iterate through ALL negative examples if theres a mismatch between # pos and # neg)
    while num_instances_batched < num_train_pos:


        #slice locations on shuffled index list (indicates which shuffled indices are associated with this current mini-batch)
        #this slice should cover half a batch's worth of indices since we're splitting the classes 50-50
        slice_start_neg = int(num_instances_batched%num_train_neg)
        slice_end_neg = int((num_instances_batched+mini_batch_size/2)%num_train_neg)


        #NOTE: we grab from the first few indices (wraparound) if we have to loop around/duplicate instances to complete batches... this is same as grabbing randoms to fill in because the list is randomized
        #if we don't have a wrap-around
        if slice_end_neg > slice_start_neg:
            neg_mini_batches.append(train_neg_index_list[slice_start_neg:slice_end_neg]) 
        #if we have a wrap-around
        else:
            neg_mini_batches.append(train_neg_index_list[slice_start_neg:] + train_neg_index_list[:slice_end_neg]) 



        #repeat for positive class
        slice_start_pos = int(num_instances_batched%num_train_pos)
        slice_end_pos = int((num_instances_batched+mini_batch_size/2)%num_train_pos)


        if slice_end_pos > slice_start_pos:
            pos_mini_batches.append(train_pos_index_list[slice_start_pos:slice_end_pos])
        else:
            pos_mini_batches.append(train_pos_index_list[slice_start_pos:] + train_pos_index_list[:slice_end_pos]) 



        #iterate to next mini-batch
        num_instances_batched+=mini_batch_size/2
        
        
            
    return neg_mini_batches, pos_mini_batches

In [None]:
def feature_label(neg_indices, pos_indices, train_neg_dset, train_pos_dset, mini_batch_size, num_features):
    #matrices containing our features and their labels for training set mini-batches
            
    
    #(n[0],m) ... will transpose upon return
    features = np.empty([mini_batch_size,num_features])    

    #(1,m) ... will transpose upon return
    labels = np.empty([mini_batch_size,1])                       
    
    counter = 0

    #fancy indexing? super slow - 10-20x slower because sorting of indices is needed
    #extract the dataset images pointed to by the current minibatch, combining both negative and positive classes
    for index in neg_indices:
        features[counter] = train_neg_dset[index]

        labels[counter] = 0
        counter+=1


    for index in pos_indices:
        features[counter] = train_pos_dset[index]

        labels[counter] = 1
        counter+=1  
            
    if counter not == mini_batch_size:
        print("error: batch size - mini batch indices mismatch")
    
    return features.transpose(), labels.transpose()

In [None]:

def leaky_relu(Z):
    return np.where(Z > 0, Z, 0.01*Z)
    

def leaky_relu_derivative(Z):
    return np.where(Z > 0, 1.0, 0.05)

    
    
def sigmoid(Z):
    
    return 1 / (1 + np.exp(-Z))


def sigmoid_derivative(Z):
    
    return 1 / (1 + np.exp(-Z)) * (1 - (1 / (1 + np.exp(-Z))))


In [None]:
def forward_prop(parameters, features, layers):
    
    Z = []
    A = []
    G = []
    for layer in range(len(layers)):
        
        A_last_layer = np.zeroes([1,1])
        
        if layer==0:
            A_last_layer = features
        else:
            A_last_layer = A[layer-1]
        
        
        W_current_layer = parameters['W'][layer]
        B_current_layer = parameters['B'][layer]
        
        
        Z_current_layer = np.dot(W_current_layer, A_last_layer) + B_current_layer
        
        
        #if output layer
        if layer == len(layers)-1:
            G_current_layer = "sigmoid"
            A_current_layer = sigmoid(Z_current_layer)
        else:
            G_current_layer = "leaky_relu"
            A_current_layer = leaky_relu(Z_current_layer)
        
        
        Z.append(Z_current_layer)
        A.append(A_current_layer)
        G.append(G_current_layer)
            
    return Z, A, G
    
#batchnorm in future

In [None]:
def back_prop(parameters, features, labels, layers, Z, A, G, mini_batch_size):
    dZ = []
    dW= []
    db = []
    for layer in reversed(range(len(layers))):
        
        
        A_last_layer = np.zeroes([1,1])
        
        
        if layer == 0:
            A_last_layer = features
        else:
            A_last_layer = A[layer-1]
        
        
        if layer == len(layers)-1:
            dZ_current_layer = A[layer] - labels
            dW_current_layer = (1/mini_batch_size)*np.dot(dZ_current_layer, A[layer-1].transpose())
            db_current_layer = (1/mini_batch_size)*np.sum(dZ_current_layer, axis=1, keepdims=True)
        
        else:
            #TODO implement variable for G that can change depending on layer
            #for now, we know all the hidden layers use leaky relu so we'll directly use that derivative
            dZ_current_layer = np.dot(parameters[W][layer+1].transpose(), dZ[len(layers)-1-layer-1]) * leaky_relu_derivative(Z[layer])
            dW_current_layer = (1/mini_batch_size)*np.dot(dZ_current_layer, A_last_layer.transpose())
            db_current_layer = (1/mini_batch_size)*np.sum(dZ_current_layer, axis=1, keepdims=True)
        
        #these lists are going to be backwards (layer X ... layer 2, layer 1)
        dZ.append(dZ_current_layer)
        dW.append(dW_current_layer)
        db.append(db_current_layer)
    
    dW.reverse()
    db.reverse()
    updates = {'dW':dW,
                'db':db}
    return updates

#TODO: variable g(Z) setting/matrix   
#may just use leaky relu

#batchnorm in future

In [None]:
def compute_cost(A, labels, mini_batch_size):
     
    cost = -1/mini_batch_size*np.sum(np.multiply(np.log(A[-1]),labels)+np.multiply(np.log(1-A[-1]),1-labels))
               
    return cost

In [None]:
def gradient_descent(parameters, features, labels, num_features, learning_rate, layers, mini_batch_size):
        
    Z, A, G = forward_prop(parameters, features, layers)
    cost = compute_cost(A,labels, mini_batch_size)
    updates = back_prop(paramaters, features, labels, layers, Z, A, G, mini_batch_size)
    
    #update the parameters 
    for layer in range(len(parameter[W])):
        parameter[W][layer] = parameter[W][layer] - learning_rate * updates[dW][layer]
        parameter[B][layer] = parameter[B][layer] - learning_rate * updates[db][layer]
    
    #code here if we want to track or print cost after every mini-batch
    
    return parameters, cost

In [None]:
        
def model(train_pos_dset, train_neg_dset, mini_batch_size =32, layers = [1], epochs = 10000, print_cost=False, learning_rate=0.005):
    
    #lets us track cost and later graph it
    cost_tracker = np.zeroes([1000])
    cost = 0
    
    num_train_neg, num_train_pos, num_features, train_neg_index_list, train_pos_index_list = training_info(train_pos_dset, train_neg_dset)
    
    parameters = intialize(layers, num_features)
    
    #iterate through all the epochs
    while epoch>0:

        #shuffle the lists of indices, effectively shuffling the training instances among the mini-batches 
        random.shuffle(train_neg_index_list)
        random.shuffle(train_pos_index_list)
        
        #neg_mini_batches and pos_mini_batches are lists of lists of indices pointing to training instances
        neg_mini_batches, pos_mini_batches = mini_batch(train_neg_index_list,train_pos_index_list,mini_batch_size)
        
        #iterate through all the mini-batches (e.g. neg_indices contains the indices of a single mini-batch of negative class training instances)
        for neg_indices, pos_indices in zip(neg_mini_batches, pos_mini_batches):
            
            #extract feature list and label list from this mini-batch
            features, labels = feature_label(neg_indices, pos_indices,train_neg_dset, train_pos_dset, mini_batch_size, num_features)
            parameters, cost = gradient_descent(parameters, features, labels, num_features, learning_rate, layers, mini_batch_size)
        
            #maybe also write code for printing cost after each mini batch
             #maybe write code for % success at test
                
        
        #TODO write code for printing cost after each epoch
        #maybe write code for % success at test
        
        #TODO track cost
        cost_tracker[10000-epoch] = cost
        
        #iterate to next epoch
        epoch-=1
    
    #TODO print final cost or costs AND plot graph of costs x iterations
    
    print("final cost: "+cost)
    plt.plot(cost_tracker)
    plt.show()
    
    return parameters
    #TODO write code for % success at test, cost using prediction()
    

In [None]:


def prediction(pos_dset, neg_dset, parameters, layers):
    predictions_on_pos = np.empty([pos_dset.shape[0],1])
    predictions_on_neg = np.empty([pos_dset.shape[1],1])
    
    for pos in range(pos_dset.shape[0]):
        predictions_on_pos[pos] = forward_prop(parameters, pos_dset[pos].transpose(), layers)
    
    for neg in range(neg_dset.shape[0]):
        predictions_on_neg[neg] = forward_prop(parameters, neg_dset[neg].transpose(), layers)
    
    
    return predictions_on_pos, predictions_on_neg
    
    
    
def accuracy(predictions, labels, show_wrong=3):
    #TODO show accuracy and images that are incorrectly classified - default 3 from each class

In [None]:


parameters = model(train_pos_dset, train_neg_dset, mini_batch_size =32, layers = [2,5,5,1], epochs = 10000, print_cost=False, learning_rate=0.005)
predictions_on_pos, predictions_on_neg = prediction(train_pos_dset, train_neg_dset, parameters, layers = [2,5,5,1])
predictions_on_pos, predictions_on_neg = prediction(val_pos_dset, val_neg_dset, parameters, layers = [2,5,5,1])
predictions_on_pos, predictions_on_neg = prediction(test_pos_dset, test_neg_dset, parameters, layers = [2,5,5,1])
accuracy()
f.close()