In [1]:
# Mount Google Drive into the Colab runtime at /content/drive; the dataset
# folders read later in this notebook live under that mount point.
# Mounting is interactive: it asks for an authorization code.
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [2]:
## Methods and Imports

import numpy as np
from math import log
import matplotlib.pyplot as plt
import cv2
import os
import time
import random
from sklearn.utils import shuffle

import tensorflow.compat.v1 as tf
tf.disable_v2_behavior() 
from tensorflow.keras.layers import Flatten

# Read every readable image in a directory as a 3-channel (BGR) array
def get_samples(path):
    """Load all images found directly inside ``path``.

    Args:
        path: Directory containing the image files. A trailing separator is
            optional.

    Returns:
        A list of arrays as returned by ``cv2.imread(..., cv2.IMREAD_COLOR)``,
        ordered by file name. Entries that OpenCV cannot decode are skipped.
    """
    samples = []
    # os.listdir() returns entries in arbitrary order; sorting makes the
    # positional train/validation/test split performed later reproducible.
    for file in sorted(os.listdir(path)):
        img = cv2.imread(os.path.join(path, file), cv2.IMREAD_COLOR)
        if img is None:
            # cv2.imread signals "not an image / unreadable" by returning None
            # rather than raising; keeping it would break resize/cvtColor later.
            continue
        samples.append(img)
    return samples

# Enlarge images (by default x1.4, i.e. 20x20 -> 28x28) to match the standard LeNet 5 architecture
def upscale(data, scale=1.4):
    """Resize every image in ``data`` by the same factor on both axes.

    Args:
        data: Iterable of images accepted by ``cv2.resize``.
        scale: Multiplicative factor applied to width and height. Defaults to
            1.4, the value this notebook has always used.

    Returns:
        A new list with one resized image per input image, in input order.
    """
    # dsize=None lets OpenCV derive the output size from fx / fy.
    return [
        cv2.resize(img, None, fx = scale, fy = scale, interpolation = cv2.INTER_CUBIC)
        for img in data
    ]

# Convert a batch of BGR images to grayscale with an explicit channel axis
def grayscale(data):
    """Convert each image in ``data`` from BGR to single-channel grayscale.

    Args:
        data: Iterable of BGR images accepted by ``cv2.cvtColor``.

    Returns:
        An array of shape ``(N, H, W, 1)``. For the 32x32 inputs used in this
        notebook that is ``(N, 32, 32, 1)``, exactly as before.
    """
    gray_stack = np.array([cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) for img in data])
    if gray_stack.ndim == 3:
        # Append the channel axis from the real image size. The former
        # hard-coded reshape to (-1, 32, 32, 1) silently scrambled pixels
        # whenever a differently sized batch happened to have a total element
        # count divisible by 32*32.
        return gray_stack[..., np.newaxis]
    # Empty (or otherwise non-stackable) input: keep the legacy behaviour.
    return np.reshape(gray_stack, (-1,32,32,1))

# Positionally split positive/negative samples into padded train/validation/test arrays plus 0/1 labels
def data_prep(valid_split,testing_split,positive_samples,negative_samples):
    """Cut both sample lists into train / validation / test parts.

    Each list is cut at the same two relative positions: the first
    ``valid_split * testing_split`` fraction is the training part, everything
    from there up to the ``testing_split`` fraction is the validation part, and
    the remainder is the test part. No shuffling happens here; the split
    follows the order of the input lists.

    Args:
        valid_split: Fraction of the non-test data that goes to training.
        testing_split: Fraction of all data that is NOT held out for testing.
        positive_samples: List of images labelled 1.
        negative_samples: List of images labelled 0.

    Returns:
        ``(training_set, training_labels, validation_set, validation_labels,
        testing_set, testing_labels)``. Image sets are arrays zero-padded by 2
        pixels on each side of axes 1 and 2; labels are plain Python lists.
    """
    train_fraction = valid_split * testing_split

    def _cut_points(samples):
        # (end of training part, end of validation part) for one class
        count = len(samples)
        return int(count*train_fraction), int(count*testing_split)

    def _three_way(samples):
        train_end, valid_end = _cut_points(samples)
        return samples[:train_end], samples[train_end:valid_end], samples[valid_end:]

    positive_parts = _three_way(positive_samples)
    negative_parts = _three_way(negative_samples)

    # No padding on the sample and channel axes, 2 pixels on each image border.
    pad_widths = ((0,0),(2,2),(2,2),(0,0))

    results = []
    for pos_part, neg_part in zip(positive_parts, negative_parts):
        # Positives first, then negatives: images and labels stay aligned.
        images = np.pad(pos_part + neg_part, pad_widths, 'constant')
        labels = [1] * len(pos_part) + [0] * len(neg_part)
        results.append(images)
        results.append(labels)
    return tuple(results)


# Build the LeNet 5 network graph with the TensorFlow v1 API
def LeNet(x, activ_func, keep_prob=None):
    """Create the LeNet-5 layers on top of ``x`` and return the class logits.

    Args:
        x: Input image tensor. The layer sizes below assume 32x32x1 images.
        activ_func: Callable applied after each convolution and after the first
            two fully connected layers (e.g. ``tf.nn.relu``).
        keep_prob: Keep probability passed to ``tf.nn.dropout``. When omitted,
            the notebook-level ``keep_probability`` placeholder is used, which
            is what this function always relied on implicitly.

    Returns:
        The un-normalised logits tensor with 2 outputs per example.
    """
    if keep_prob is None:
        # Backward-compatible fallback to the global placeholder; callers that
        # pass keep_prob no longer depend on notebook state.
        keep_prob = keep_probability

    mu = 0
    sigma = 0.1

    def _weights(shape):
        # Trainable weights drawn from a truncated normal distribution
        return tf.Variable(tf.truncated_normal(shape=shape, mean=mu, stddev=sigma))

    def _biases(size):
        # Trainable biases initialised to zero
        return tf.Variable(tf.zeros(size))

    # convolutional layer 1. input = 32x32x1. output = 28x28x6
    conv1_W = _weights((5,5,1,6))
    conv1_b = _biases(6)
    conv1 = tf.nn.conv2d(x, conv1_W, strides=[1,1,1,1], padding='VALID') + conv1_b

    # activation (configurable via activ_func)
    conv1 = activ_func(conv1)

    # max pooling. input = 28x28x6. output = 14x14x6
    conv1 = tf.nn.max_pool(conv1, ksize=[1,2,2,1], strides=[1,2,2,1], padding='VALID')

    # convolutional layer 2. input = 14x14x6. output = 10x10x16
    conv2_W = _weights((5,5,6,16))
    conv2_b = _biases(16)
    conv2 = tf.nn.conv2d(conv1, conv2_W, strides=[1,1,1,1], padding='VALID') + conv2_b

    # activation
    conv2 = activ_func(conv2)

    # max pooling. input = 10x10x16. output = 5x5x16
    conv2 = tf.nn.max_pool(conv2, ksize=[1,2,2,1], strides=[1,2,2,1], padding='VALID')

    # flatten. input = 5x5x16. output = 400
    fc0   = Flatten()(conv2)

    # layer 3: fully connected layer. input = 400. output = 120
    fc1_W = _weights((400,120))
    fc1_b = _biases(120)
    fc1 = tf.matmul(fc0, fc1_W) + fc1_b

    # activation
    fc1 = activ_func(fc1)

    # drop out to prevent overfitting
    fc1 = tf.nn.dropout(fc1, keep_prob)

    # layer 4: fully connected layer. input = 120. output = 84
    fc2_W = _weights((120,84))
    fc2_b = _biases(84)
    fc2 = tf.matmul(fc1, fc2_W) + fc2_b

    # activation
    fc2 = activ_func(fc2)

    fc2 = tf.nn.dropout(fc2, keep_prob)

    # layer 5: fully connected layer. input = 84. output = 2
    fc3_W = _weights((84,2))
    fc3_b = _biases(2)
    logits = tf.matmul(fc2, fc3_W) + fc3_b

    return logits

def evaluate(X_data, y_data):
    """Return the example-weighted mean accuracy over ``X_data`` / ``y_data``.

    The data is fed in slices of ``BATCH_SIZE`` to ``accuracy_operation`` with
    dropout disabled (keep probability 1.0). The function relies on the
    notebook-level ``BATCH_SIZE``, ``accuracy_operation``, ``x``, ``y`` and
    ``keep_probability`` and on a default TensorFlow session being active.

    Args:
        X_data: Sliceable collection of input images.
        y_data: Sliceable collection of labels aligned with ``X_data``.

    Returns:
        Accuracy as a float; ``0.0`` when ``X_data`` is empty.

    Raises:
        RuntimeError: If there is no default session to run the graph in.
    """
    num_examples = len(X_data)
    if num_examples == 0:
        # Nothing to score; the division below would raise ZeroDivisionError.
        return 0.0

    sess = tf.get_default_session()
    if sess is None:
        raise RuntimeError("evaluate() needs an active default tf.Session; call it inside a 'with tf.Session()' block")

    total_accuracy = 0
    for offset in range(0, num_examples, BATCH_SIZE):
        batch_x, batch_y = X_data[offset:offset+BATCH_SIZE], y_data[offset:offset+BATCH_SIZE]
        accuracy = sess.run(accuracy_operation, feed_dict={x: batch_x, y: batch_y, keep_probability: 1.0})
        # Weight by the batch length so a short final batch does not skew the mean.
        total_accuracy += (accuracy * len(batch_x))
    return total_accuracy / num_examples

Instructions for updating:
non-resource variables are not supported in the long term


In [0]:
# Load the positive and negative samples from the mounted Drive folder
positive_samples = get_samples("/content/drive/My Drive/school/Machine Learning/ECE 763/positive_samples/")
negative_samples = get_samples("/content/drive/My Drive/school/Machine Learning/ECE 763/negative_samples/")
print('Original Image Dimensions : ',positive_samples[0].shape)

# Upscale both classes with the same factor
positive_samples, negative_samples = upscale(positive_samples), upscale(negative_samples)
print('Rezied Image Dimensions : ',positive_samples[0].shape)
print()


def _print_sample_counts(heading):
    # Report the current size of both class lists under the given heading
    print(heading)
    print("Positives: " + str(len(positive_samples)))
    print("Negatives: " + str(len(negative_samples)))


_print_sample_counts("Total samples before Data Augmentation: ")

# NOTE(review): augmentation runs before the positional train/validation/test
# split in data_prep, and the augmented copies are appended after the
# originals, so the tail of each list (the held-out part) consists of
# transformed copies of images that are also in the training part.

# Data Augmentation - negatives: add 90 / 180 / 270 degree rotations
negative_samples_90, negative_samples_180, negative_samples_270 = (
    [np.rot90(el, quarter_turns) for el in negative_samples]
    for quarter_turns in (1, 2, 3)
)
negative_samples = negative_samples + negative_samples_90 + negative_samples_180 + negative_samples_270

# Data Augmentation - positives: add a horizontally flipped copy of each image
positive_samples_flip = [np.flip(el, axis=1) for el in positive_samples]
positive_samples = positive_samples + positive_samples_flip

_print_sample_counts("Total samples after Data Augmentation: ")

In [0]:
# Split both classes into train / validation / test images and labels
X_train, y_train, X_valid, y_valid, X_test, y_test = data_prep(
    0.80,
    0.90,
    positive_samples,
    negative_samples,
)

# Summarise the resulting arrays
n_classes = len(np.unique(y_train))
for template, value in (
    ("Image Shape after padding: {}", X_train[0].shape),
    ("Number of Classes: {}", n_classes),
    ("Training Set:   {} samples", len(X_train)),
    ("Validation Set: {} samples", len(X_valid)),
    ("Test Set:       {} samples", len(X_test)),
):
    print(template.format(value))

In [0]:
# Distribution of labels in each split, drawn on one shared set of axes
for split_labels, split_name in ((y_train, 'Train'), (y_valid, 'Validation'), (y_test, 'Test')):
    plt.hist(split_labels, bins=n_classes, label=split_name)

for axis_name in ('x', 'y'):
    plt.tick_params(axis=axis_name)

plt.xlabel("Negative / Positive Class")
plt.ylabel("Number of Samples")
plt.title("Class Sample Split")
plt.legend()
plt.show()


In [0]:
# Show one randomly chosen training image together with its label

# randrange excludes the upper bound; random.randint(0, len(X_train)) could
# return len(X_train) and raise IndexError on the lookup below.
index = random.randrange(len(X_train))
image = X_train[index]

print('Image label:' + str(y_train[index]))
# The samples were read with cv2.imread, which stores channels as BGR, while
# matplotlib expects RGB; reverse the channel axis for display only.
plt.imshow(image[..., ::-1])

In [0]:
# Show the first image of every original / augmented sample list


for test_var in [positive_samples,positive_samples_flip,negative_samples,negative_samples_90,negative_samples_180,negative_samples_270]:
  image = test_var[0]
  # One figure per image; the extra bare plt.figure() call that used to precede
  # this one only produced an additional empty figure on every iteration.
  plt.figure(figsize=(3,3))
  # cv2.imread yields BGR channel order; reverse it so matplotlib shows true colours.
  plt.imshow(image[..., ::-1])

In [0]:
# grayscale data - each split is converted only while it still has 3 colour
# channels, so re-running this cell is harmless instead of failing inside cv2
if X_train.shape[-1] == 3:
    X_train = grayscale(X_train).astype('float')
if X_valid.shape[-1] == 3:
    X_valid = grayscale(X_valid).astype('float')
if X_test.shape[-1] == 3:
    X_test = grayscale(X_test).astype('float')

# randrange excludes the upper bound; random.randint(0, len(X_train)) could
# return len(X_train) and raise IndexError on the lookup below.
index = random.randrange(len(X_train))
image = X_train[index].squeeze()

plt.figure(figsize=(1,1))
plt.imshow(image, cmap="gray")
print('Image Label (greyscale): ' + str(y_train[index]))

In [0]:
# Normalise pixel values with (value - 128) / 128.
# NOTE: this rescales the arrays in place of the old ones, so running the cell
# a second time normalises already-normalised data.
X_train, X_valid, X_test = [
    (images - 128)/128
    for images in (X_train, X_valid, X_test)
]

# Shuffle every split, keeping each image aligned with its label
(X_train, y_train), (X_valid, y_valid), (X_test, y_test) = [
    shuffle(images, labels)
    for images, labels in ((X_train, y_train), (X_valid, y_valid), (X_test, y_test))
]

In [0]:
## Training and Evaluation Pipeline
#
# Grid search over learning rate, dropout rate, batch size and activation
# function. For every combination a fresh LeNet graph is built, trained for
# EPOCHS epochs, evaluated on the validation and test sets after each epoch,
# checkpointed to ./lenet, restored once for a final test score, and plotted.
# The configuration with the highest mean test accuracy over epochs 50-99 is
# remembered in best_model / best_valacc / best_testacc.
#
# NOTE(review): the "best" configuration is selected on test accuracy, so the
# reported test score of the winner is optimistically biased; selecting on
# validation accuracy would keep the test set untouched.

best_valacc = np.zeros((100,), dtype=int)
best_testacc = np.zeros((100,), dtype=int)
# Best: [rate_var, dr_var, batch_var, activ_var]
best_model = [0,0,0,0]

for rate_var in [.001, .002, .003]:
  for dr_var in [.1, .2, .3]:
    for batch_var in [64, 128, 256]:
      for activ_var in [tf.nn.relu,tf.nn.relu6,tf.nn.selu]:

        # Start every configuration from an empty default graph. Without this,
        # each iteration adds another complete model (placeholders, variables,
        # optimizer slots) to the same graph, so memory use, initialisation
        # time and the size of the saved checkpoint grow with every run.
        tf.reset_default_graph()

        # Save accuracy metrics for plotting and evaluation
        save_valacc = []
        save_testacc = []

        # Hyperparameter testing

        #[.001, .002, .003]
        rate = rate_var
        #[.1, .2, .3]
        dropout_rate = dr_var
        #[64, 128, 256]
        BATCH_SIZE = batch_var
        #[tf.nn.relu,tf.nn.relu6,tf.nn.selu]
        activ_func = activ_var


        # Regression Analysis
        #['LASSO','RIDGE','NONE']
        regression_type = 'RIDGE'

        # Set high to measure overfitting
        EPOCHS = 100

        # Features and Labels for evaluation
        # (evaluate() and LeNet() read x, y, keep_probability and BATCH_SIZE
        # from the notebook namespace, so these names must not be changed.)
        x = tf.placeholder(tf.float32, (None, 32, 32, 1))
        y = tf.placeholder(tf.int32, (None))
        keep_probability = tf.placeholder(tf.float32)
        one_hot_y = tf.one_hot(y, 2)

        # Create variables for linear regression
        # NOTE(review): A and b are stand-alone variables; they are not passed
        # to LeNet, so the LASSO / RIDGE terms below penalise only A and do not
        # regularise any of the network's weights.
        A = tf.Variable(tf.random_normal(shape=[1,1]))
        b = tf.Variable(tf.random_normal(shape=[1,1]))

        # Initialize model and entropy function
        logits = LeNet(x, activ_func)
        cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(logits=logits, labels=one_hot_y)

        # Select Regression Type
        if regression_type == 'LASSO':
            # Declare Lasso loss function
            # Lasso Loss = L2_Loss + heavyside_step,
            # Where heavyside_step ~ 0 if A < constant, otherwise ~ 99
            lasso_param = tf.constant(0.9)
            heavyside_step = tf.truediv(1., tf.add(1., tf.exp(tf.multiply(-50., tf.subtract(A, lasso_param)))))
            regularization_param = tf.multiply(heavyside_step, 99.)
            loss_operation = tf.add(tf.reduce_mean(cross_entropy), regularization_param)

        elif regression_type == 'RIDGE':
            # Declare the Ridge loss function
            # Ridge loss = L2_loss + L2 norm of slope
            ridge_param = tf.constant(1.)
            ridge_loss = tf.reduce_mean(tf.square(A))
            loss_operation = tf.expand_dims(tf.add(tf.reduce_mean(cross_entropy), tf.multiply(ridge_param, ridge_loss)), 0)
            
        else:
            loss_operation = tf.reduce_mean(cross_entropy)

        optimizer = tf.train.AdamOptimizer(learning_rate = rate)
        training_operation = optimizer.minimize(loss_operation)

        # A prediction is correct when the arg-max logit matches the label
        correct_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(one_hot_y, 1))
        accuracy_operation = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

        saver = tf.train.Saver()

        with tf.Session() as sess:
            sess.run(tf.global_variables_initializer())
            num_examples = X_train.shape[0]
            print("Training for:")
            print("Learning rate:" + str(rate_var))
            print("Dropout rate: " + str(dr_var))
            print("Batch size: " + str(batch_var))
            print("Activation function: " + str(activ_var))
            print()
            for i in range(EPOCHS):
                # Re-shuffle every epoch so mini-batches differ between epochs
                X_train, y_train = shuffle(X_train, y_train)
                for offset in range(0, num_examples, BATCH_SIZE):
                    end = offset + BATCH_SIZE
                    batch_x, batch_y = X_train[offset:end], y_train[offset:end]
                    # tf.nn.dropout takes the KEEP probability, hence 1 - rate
                    sess.run(training_operation, feed_dict={x: batch_x, y: batch_y, keep_probability: 1-dropout_rate})

                validation_accuracy = evaluate(X_valid, y_valid)
                test_accuracy = evaluate(X_test, y_test)

                save_valacc.append(validation_accuracy)
                save_testacc.append(test_accuracy)

            saver.save(sess, './lenet')
            print("Test average for last 50 epochs: " + str(np.mean(save_testacc[50:100])))
            print("Model saved")
            print()

        # Reload the checkpoint in a new session for the final test score
        with tf.Session() as sess:
            saver.restore(sess, tf.train.latest_checkpoint('.'))

            test_accuracy = evaluate(X_test, y_test)
            print("Final Test Accuracy = {:.3f}".format(test_accuracy))

        # Print graph for every run
        t = np.arange(0, len(save_testacc), 1)
        line1 = plt.plot(t, save_valacc, label='Validation Accuracy')
        line2 = plt.plot(t, save_testacc, label='Testing Accuracy')
        plt.tick_params(axis='x')
        plt.tick_params(axis='y')
        plt.legend(loc='lower right')
        plt.ylabel("Accuracy")
        plt.xlabel("Epochs")
        plt.title("Evaluation of Model")
        plt.show()

        # Save best model for plotting 
        if np.mean(save_testacc[50:100]) > np.mean(best_testacc[50:100]):
          best_model = [rate_var, dr_var, batch_var, activ_var]
          best_testacc = save_testacc
          best_valacc = save_valacc



In [0]:
# Plot the per-epoch accuracy curves of the best configuration found by the grid search
t = np.arange(len(best_testacc))
line1, line2 = [
    plt.plot(t, curve, label=curve_label)
    for curve, curve_label in (
        (best_valacc, 'Validation Accuracy'),
        (best_testacc, 'Testing Accuracy'),
    )
]
for axis_name in ('x', 'y'):
    plt.tick_params(axis=axis_name)
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.title("Evaluation of Best Model")
plt.legend(loc='lower right')
plt.show()

# Report the winning hyperparameters and its mean test accuracy over epochs 50-99
for summary_line in (
    'Best model:',
    'learning rate, dropout rate, batch size, activation function',
    best_model,
):
    print(summary_line)
print("Test average for last 50 epochs: " + str(np.mean(best_testacc[50:100])))