In [1]:
import zipfile as zf

zip_files = zf.ZipFile("ENGR_518_group_2_datasets.zip", 'r')
zip_files.extractall()
zip_files.close()

!pip install pillow
!pip install numpy
!pip install matplotlib
!pip install numba



In [2]:
"""
Group Number: 2
Team members: Matthew Tucsok, Aliakbar Davoodi, Don Milinda Thilan Sangapalaarachchi, Sydney Agbonkhese
Project Title: UAV Landing: Flat Ground Detection
"""

import glob
import os
import random
import time

from PIL import Image, ImageFilter
import numpy as np
import matplotlib.pyplot as plt
from numba import jit


class DataLoader:
    """
    This class manages how images are loaded in, preprocessed, and assembled into an input format compatible with our
    2D classifier model. Note that due to how we create our training and test sets, both sets get preprocessed.
    """

    def __init__(self, path):
        """
        Initializing the data loader to find the dataset images.
        :param path: Path to the root of the program. The root should contain the executing script, and a folder
        named "Greyscale Dataset" in it which contains the 4000 images in the dataset. Note that the above jupyter
        cell should auto-unzip and extract the dataset to the correct location.
        """
        self.source = path
        self.samples = []  # List to hold the dataset samples
        self.calculated_features = []  # List to hold the calculated features that will be used as input to the model
        self.load_data()  # Begin data loading method

    def load_data(self):
        files = glob.glob(self.source + '/*.png')  # This will put all the .png filenames into a list

        # index = 0
        for file in files:  # Iterate over all files in the dataset
            img = Image.open(file).convert('L')  # Open the greyscale images, specifying 'L' to ensure correct loading

            """
            The following 3 line were for visualization purposes only in the report. They select a specific image to
            show how the original greyscale image looks before pre-processing.
            """
            # grey = Image.open(file).convert('L')
            # if index % 3334 == 0 and index > 0:
            #     img.show()

            """
            # img_h is a histogram with 256 bins with each bin n counting how many times a pixel with a value of n
            appears in the image.
            """
            img_h = np.array(img.histogram())
            var = np.var(img_h)  # Calculating the variance of the histogram. This is one of our final features

            """
            img.filter(ImageFilter.FIND_EDGES) generates an edge-detected image with the same dimensions as the original
            greyscale image.
            """
            img = img.filter(ImageFilter.FIND_EDGES)

            # The following 2 lines were for showing the edge-detected image for the report
            # if index % 3334 == 0 and index > 0:
            #     img.show()

            img_array = np.array(img)  # Ensuring PIL image is now a numpy array

            # Calculating the mean of the edge-detected image
            mean = np.sum(img_array) / (img_array.shape[0] * img_array.shape[1])

            binary_img = np.where(img_array > mean, 1, 0)  # Thresholding edge-detected image based on mean pixel value
            edge_pixel_sum = np.sum(
                binary_img)  # Summing all of the pixel values in the binary image. 2nd final feature

            # Following 4 lines used to display the binary image for visualization in the report.
            # if index % 3334 == 0 and index > 0:
            #     print(index)
            #     pil_binary_img = Image.fromarray(binary_img*255)
            #     pil_binary_img.show()
            file = file.replace('\\','/')  # Replacing backslashes with forward slashes for consistency on different OS
            splits = file.split('/')  # Parsing the file name for label extraction. Works on browser jupyter notebook

            class_and_index = splits[-1]  # Getting the last split which contain the name of the image with filetype
            label_name, _ = class_and_index.split('_', 2)  # Getting only the 'f' or 'nf' part of the image name
            label = None
            if label_name == 'f':
                label = 1  # Flat images are class 1
            elif label_name == 'nf':
                label = 0  # Non-flat images are class 0
            else:
                raise SyntaxError('Invalid class label detected')  # Just a check to see if data was loaded correctly
            self.calculated_features.append(
                np.array([var, edge_pixel_sum]))  # Appending the features to the feature list
            # self.samples.append([img, label])  # (Deprecated)
            self.samples.append([binary_img, label])
            # self.samples.append([grey, label])
            # index += 1

        """
        The following block of code is where the final feature vector is generated as input to the model. This section
        contains commented code and visibly redundant lines to keep above code consistent for all iterations of input
        while still maintaining functionality. For example, in the final feature vector we do not include image pixels,
        so vectorization of the image is no longer required and therefore replaced with an empty array to remove these
        features
        """
        index = 0
        for sample in self.samples:  # Iterating over all samples in the dataset
            img_array = np.array(sample[0])  # (Deprecated) Extracting the previous image associated with a sample
            img_vector = img_array.ravel()  # (Deprecated) vectorizing the 64 x 64 image to a 4096 x 1 vector
            img_vector = np.array(
                [])  # Removing the image vector as it was not used in the final iteration of the model

            # (Deprecated) Used to remove calculated features to test only the performance of vectorized image inputs
            # self.calculated_features[index] = np.array([])

            # The line below will keep whatever features still exist after the decisions made above
            sample_vector = np.hstack((img_vector, self.calculated_features[index]))
            self.samples[index][0] = sample_vector
            index += 1


@jit(nopython=True, parallel=True)
def sigmoid(t):
    """
    The sigmoid activation function. This predicts the label of a sample point x. The jit decorator is for performance
    purposes and allows for simple parallel CPU computation

    :param t: x_bar dot multiplied with the weights vector
    :return: prediction of the class for sample x
    """
    return 1 / (1 + np.exp(-t))


@jit(nopython=True, parallel=True)
def regularized_cross_entropy_cost(y, weights, preds, lam, log_eps=1e-9):
    """
    This function calculated the regularized cross entropy cost.

    :param y:
    :param weights: (deprecated field to maintain code clarity)
    :param preds: label prediction of x generated by sigmoid
    :param lam: regularization parameter
    :param log_eps: small value to avoid negative log bugs
    :return: regularized cross entropy cost
    """
    return -1 / (len(y)) * np.sum(y * np.log(preds + log_eps) + (1 - y) * np.log(
        1 - preds + log_eps))


def find_gradient_numerically(weights, x_bar, y, eps, lam):
    """
    (Deprecated) This function uses the definition of the derivative to find the gradient with respect to each weight. It first
    calculated the cost with the original weights vector, then sequential increments one of the weights by epsilon,
    recalculates the cost, finds the derivative, then decrements that same weight by epsilon. This step is repeated for
    each weight in the weights vector to find the entire gradient vector.

    :param weights: weights vector for the model
    :param x_bar: x feature vector front-padded with a bias feature of 1
    :param y: label of sample x
    :param eps: small step for numerically calculating the derivative using the definition of a derivative
    :param lam: cross entropy regularization parameter
    :return:
    """
    gradients = []  # List to hold each element of the gradient
    preds = sigmoid(np.dot(x_bar, weights))  # Calculating class prediction for x
    fx = regularized_cross_entropy_cost(y, weights, preds,
                                        lam)  # The cost function value before taking a step. This is constant for calculating all gradients in a given weights vector
    temp_weights = weights.copy()  # Creating a copy of weights vector to reduce calculations
    for weight in range(len(weights)):  # Loop to iterate over each weight
        temp_weights[weight] += eps  # Adding the small epsilon value to one of the weights
        preds = sigmoid(np.dot(x_bar, temp_weights))  # Predicting class for the updated cost
        fx_plus_eps = regularized_cross_entropy_cost(y, temp_weights, preds,
                                                     lam)  # Finding the new cost after small step
        gradients.append((fx_plus_eps - fx) / eps)  # Definition of a derivative (rise/run)
        temp_weights[weight] -= eps  # Decrementing the weight by epsilon
    return np.array(gradients)


@jit(nopython=True, parallel=True)
def find_fast_gradient_numerically(weights, x_bar, y, eps, lam):
    """
    (Deprecated) See "find_gradient_numerically" above for detailed description of numerical method for finding the
    gradient. This function is identical in operation, with the exception that any function calls within the for
    loop have been refactored so the code from those functions is directly in the loop. This is a requirement for jit,
    a just-in-time compilation tool for significantly increasing speed by compiling to machine code after the first
    iteration of the loop. This function is roughly 10x faster than the non-jit numerical gradient function above.
    """
    log_eps = 1e-9
    gradients = []
    preds = 1 / (1 + np.exp(-(np.dot(x_bar, weights))))
    fx = regularized_cross_entropy_cost(y, weights, preds, lam)
    temp_weights = weights.copy()
    for weight in range(weights.shape[0]):
        temp_weights[weight] += eps
        preds = 1 / (1 + np.exp(-(np.dot(x_bar, temp_weights))))
        fx_plus_eps = -1 / (y.shape[0]) * np.sum(
            y * np.log(preds + log_eps) + (1 - y) * np.log(1 - preds + log_eps) + lam * np.linalg.norm(weights[1:]))
        gradients.append((fx_plus_eps - fx) / eps)
        temp_weights[weight] -= eps
    return np.array(gradients)


def find_closed_form_gradient(weights, x_bar, y, eps, lam):
    """
    Closed form for finding the gradient of the cross entropy cost function. This is the fastest gradient method by far
    for calculating the gradient as it is one step. For comparison, the "find_fast_gradient_numerically" takes 6 hours
    to run 4300 iterations of training, while this method only takes roughly a minute for the same 4300 iterations.

    :param weights: weights vectors
    :param x_bar: x feature vector front-padded with a 1 bias
    :param y: label for the given sample x
    :param eps: (deprecated)
    :param lam: (Deprecated) At this point in development we found that the regularization parameter was not helping our
    model learn, so it is not required. We keep it as a field to reduce complexity of which parameters we are passing
    :return: Gradient of cross-entropy cost function
    """
    gradients = (-1 / y.shape[0]) * np.matmul(x_bar.T, y - sigmoid(np.matmul(x_bar, weights)))
    return np.array(gradients)


class Model:
    """
    Our 2D perceptron classifier (i.e. single neuron classifier). Within this class is the initialization of the model,
    the gradient descent algorithm, and accuracy calculation via a from-scratch confusion matrix.
    """

    def __init__(self, samples, learning_rate=0.001, epsilon=1e-7, batch_size=32):
        """
        Initialization of the model with training samples, then perform gradient descent.

        :param samples: A tuple of training samples with contains the feature vector (min 2 features due to a bug) and
        the labels of the samples
        :param learning_rate: alpha, or step size for gradient descent. Best value empirically determined to be 0.001
        :param epsilon: The small step for calculating the gradient of the cost function numerically. Best value
        empirically determined to be 1e-7
        :param batch_size: mini-batch size. Best value empirically determined to be 32
        """

        self.x, self.y = zip(*samples)  # Splitting the feature vectors and the labels of the samples

        """
        self.x: An m x n matrix where m is the number of samples in the training set and n is the number of features
        in the sample.
        """
        self.x = np.array(self.x)

        self.y = np.array(self.y)  # An m x 1 vector where m is the number of samples in the training set.
        num_samples = len(self.y)  # Number of samples in the dataset
        self.x_bar = np.hstack((np.ones([num_samples, 1]), self.x))  # Front-padding ones column to x matrix
        self.weights = np.zeros([self.x_bar.shape[1]])  # Initializing weights vector to 0 with a bias weight as well

        """
         After many experiments we decided to omit the lambda regularization parameter for the regularized cross-entropy
         cost function as it was reducing test accuracy. Thus we set it to 0.
        """
        self.lam = 0

        self.eps = epsilon
        self.alpha = learning_rate

        self.iterations = 100000  # Choosing large maximum of iterations since the code runs quite fast. Never hits this.
        self.stop_criterion = 1e-5  # Found to be a good stopping criterion for test accuracy and speed
        self.iteration_display = 100  # How often to display how training is performing. Too small will lag code
        self.batch_size = batch_size

        """
        The following lines handle the creation of mini-batches. Note that if the batch size does not divide evenly into
        the size of the dataset, up to "batch_size" samples will be rejected from training due to the remainder after
        division.
        """
        self.num_batches = int(num_samples / self.batch_size)  # Finding how many batches there will be in training
        total_rows = self.num_batches * self.batch_size  # Total number of samples to keep for mini-batch training
        temp_x_bar = self.x_bar[:total_rows, :].copy()  # Keeping the calculated number of samples
        temp_y = self.y[:total_rows].copy()  # Keeping the corresponding number of labels
        self.x_bar_batches = np.split(temp_x_bar, self.num_batches,
                                      axis=0)  # Splitting samples evenly into mini-batches
        self.y_batches = np.split(temp_y, self.num_batches, axis=0)  # Splitting labels evenly into mini-batches

        self.gradient_descent()  # Perform gradient descent

    def gradient_descent(self):
        """
        Performing the mini-batch adaptive step size gradient descent using a closed-form gradient of the cross-entropy
        cost function.
        :return: No explicit return value, but the instance of the class will set its weights to those corresponding to the lowest cost
        found during training.
        """

        total_start = time.time()  # Storing start time to determine total execution time of the gradient descent algo
        start = time.time()  # Intermediate start time for display purposes
        history = []  # List for storing the weights and cost histories

        # Loop for iteratively finding the gradients of the cost function to convergence
        for i in range(self.iterations):

            """
            The following two lines are performed to both visualize how training is going after iterating over all
            mini-batches in an complete descent iteration. This cost is appended to the history after each complete
            descent iteration.
            """
            preds = sigmoid(np.dot(self.x_bar, self.weights))  # Using sigmoid to predict sample classes
            cost = regularized_cross_entropy_cost(self.y, self.weights, preds, self.lam)  # Cost of full batch

            # For visualization purposes only. How the training is performing after "self.iteration_display" iterations.
            if i % self.iteration_display == 0:
                end = time.time()
                duration = end - start
                print('Iteration:', i)
                print('Cost:', cost)
                print('Execution time:', duration)
                start = time.time()

            for batch in range(self.num_batches):  # Iterating over each mini-batch
                gradient = find_closed_form_gradient(self.weights, self.x_bar_batches[batch],
                                                     self.y_batches[batch],
                                                     self.eps,
                                                     self.lam)  # Finding the gradient for a given mini-batch
                gradient_norm = np.linalg.norm(gradient)  # Taking the L2-norm of the gradient

                """
                Updating the weights for the model. Note the division by (i + 1) is the adaptive step size, which
                decreases the step size as the training continues.
                """
                self.weights -= self.alpha / (i + 1) * gradient / gradient_norm

            history.append((cost, self.weights, i))  # After running through all mini-batches, save the cost and weights

            if i > 0:  # Making sure at least 2 iterations have occurred before calculating stop criterion

                # Absolute difference check between current and previous iteration to see if stop criteria is met
                if np.abs(history[i][0] - history[i - 1][0]) < self.stop_criterion:
                    break

        print('Total execution for', len(history), 'iterations:', time.time() - total_start)

        # Following lines for plotting purposes. Uncomment to see plot of cost vs iteration, but not recommended.
        # costs, weights, indices = zip(*history)
        # plt.plot(range(len(history)), costs)
        # plt.xlabel('Iteration')
        # plt.ylabel('Cost')
        # plt.title('Cost vs Iterations')
        # plt.show()

        history.sort(key=lambda tup: tup[0])  # Sorting the history by cost to see which cost is lowest
        print('Best weights after iteration', history[0][2], 'with a cost of', history[0][0])
        self.weights = history[0][1]  # Finalizing the internally stored weights of the model
        x = 1

    def accuracy(self, test_samples):
        """
        Method for measuring the model accuracy on a set of samples.
        :param test_samples: A tuple of test feature vectors and label vector
        :return: 2 x 2 confusion matrix, where rows corresponds to the true class of a sample and the columns correspond
        to the predicted class of a sample.
        """
        x_tests, y_tests = zip(*test_samples)  # Splitting the feature vectors from the label vector
        x_bar_tests = np.hstack((np.ones([len(y_tests), 1]), x_tests))  # Front padding feature vectors with a 1
        predictions = sigmoid(np.dot(x_bar_tests, self.weights))  # Predicting the labels of the input feature vectors
        return Model.confusion_matrix(y_tests, predictions)

    @staticmethod
    def confusion_matrix(actuals, predictions):
        """
        Function for generating the 2 x 2 confusion matrix.
        :param actuals: Vector of actual labels for a set of samples.
        :param predictions: Vector of label predictions for a set of samples.
        :return:
        """
        thresh = predictions.round()  # Rounding predicted label, i.e. thresholding by 0.5 to get either 0 or 1
        c11 = np.sum(np.logical_not(np.logical_or(actuals, thresh)))  # Finding true predictions of class 0
        c12 = np.sum(np.logical_and(np.logical_not(actuals), thresh))  # Finding false positives of class 0
        c21 = np.sum(np.logical_and(actuals, np.logical_not(thresh)))  # Finding false positives of class 1
        c22 = np.sum(np.logical_and(actuals, thresh))  # Finding true predictions of class 1
        matrix = np.array([[c11, c12], [c21, c22]])  # Assembling the confusion matrix
        return matrix


def main():
    """
    The main executing function. This function is responsible for finding the path to our dataset, creating an instance
    of our dataset, creating an instance of our model, and running the cross-validation of our model.
    """
    notebook_path = os.path.abspath("ENGR_518_group_2.ipynb")
    dataset_path = os.path.join(os.path.dirname(notebook_path), "Greyscale Dataset")

    """
    # "runs" controls how many times the model is trained and tested. This should be performed multiple times for
    evaluating model accuracy since the training and testing sets are randomly generated with a roughly 50/50 split of
    non-flat and flat images in each set. This ensemble cross-validation method of evaluation uses the law of large
    numbers to find the true general accuracy of the final model.
    """
    runs = 100  # Large enough that average accuracy will have a decent confidence based on variance of model
    accuracies = []  # List to store test accuracies for cross validation
    start = time.time()  # Start time for timing the total execution of the multiple runs of the program
    for run in range(runs):  # Each run is an independent evaluation of performance
        print('Run', run + 1)
        dataset = DataLoader(dataset_path)  # Creating an instance of the dataset
        random.shuffle(dataset.samples)  # Randomizing the order of the dataset to ensure order invariance
        train_test_split = 0.75  # We selected a 75% training 25% testing for the model training and evaluation
        split_index = round(len(dataset.samples) * train_test_split)  # Determining where to split the dataset
        training_set = dataset.samples[:split_index]  # Extracting the training set from the shuffled dataset
        test_set = dataset.samples[split_index:]  # Extracting the testing set from the shuffled dataset

        """
        The following block of code is for visualizing how many images are flat and how many are non-flat for a training
        set. This was to see if the model is biased towards the class with more examples. This did not seem to be too
        much of a problem in testing.
        """
        flat_label_count = 0
        not_flat_label_count = 0
        for sample in training_set:
            if sample[1] == 1:
                flat_label_count += 1
            else:
                not_flat_label_count += 1
        print(flat_label_count, 'flat images and', not_flat_label_count, 'non-flat images in training set.')

        classifier = Model(training_set)  # Creating an instance of the classifier. This initiates model training.

        """
        Code block below checks the number of flat and non-flat images in the test set.
        """
        flat_label_count = 0
        not_flat_label_count = 0
        for sample in test_set:
            if sample[1] == 1:
                flat_label_count += 1
            else:
                not_flat_label_count += 1
        print(flat_label_count, 'flat images and', not_flat_label_count, 'non-flat images in test set.')

        confusion_matrix = classifier.accuracy(training_set)  # Getting the confusion matrix of the training_set
        accuracy = 100 * np.trace(confusion_matrix) / np.sum(confusion_matrix)  # Calculating accuracy of training set
        print(confusion_matrix)
        print('Training Accuracy:', accuracy, '%')

        confusion_matrix = classifier.accuracy(test_set)  # Getting the confusion matrix of the test set
        print(confusion_matrix)
        accuracy = 100 * np.trace(confusion_matrix) / np.sum(confusion_matrix)  # Calculating accuracy of test set
        accuracies.append(accuracy)  # Appending only the test accuracy to the list of accuracy for cross-validation
        print('Testing Accuracy:', accuracy, '%')

    accuracies = np.array(accuracies)  # Converting python list to numpy array for accuracies
    mean_accuracy = np.mean(accuracies)
    var_accuracy = np.var(accuracies)  # Variance of accuracies
    std_accuracy = np.sqrt(var_accuracy)  # Standard deviation of accuracies
    print('Average of Accuracy:', mean_accuracy, '%')
    print('Variance of Accuracy:', var_accuracy)
    print('Standard Deviation of Accuracy:', std_accuracy)
    print('Runtime for', runs, 'runs:', round(time.time() - start, 2), 'seconds')  # Complete runtime for the runs


if __name__ == '__main__':  # Python convention for ensuring recursive calling of main function does not occur
    main()

Run 1
1506 flat images and 1494 non-flat images in training set.
Iteration: 0
Cost: 0.6931471785599431
Execution time: 0.7737665176391602
Total execution for 80 iterations: 0.9589338302612305
Best weights after iteration 79 with a cost of 0.3361330175073247
494 flat images and 506 non-flat images in test set.
[[1321  173]
 [ 277 1229]]
Training Accuracy: 85.0 %
[[451  55]
 [ 82 412]]
Testing Accuracy: 86.3 %
Run 2
1478 flat images and 1522 non-flat images in training set.
Iteration: 0
Cost: 0.6931471785599431
Execution time: 0.0
Total execution for 48 iterations: 0.12411332130432129
Best weights after iteration 47 with a cost of 0.32806378991151675
522 flat images and 478 non-flat images in test set.
[[1348  174]
 [ 257 1221]]
Training Accuracy: 85.63333333333334 %
[[417  61]
 [ 96 426]]
Testing Accuracy: 84.3 %
Run 3
1501 flat images and 1499 non-flat images in training set.
Iteration: 0
Cost: 0.6931471785599431
Execution time: 0.0010004043579101562
Total execution for 17 iterations: 

KeyboardInterrupt: 