In [10]:
# necessary libraries, functions, and constants
import csv
import itertools
import math
import matplotlib.pyplot as plt
import matplotlib.mlab as mlab
import numpy as np
import pandas as pd
import random
import tensorflow as tf
from PIL import Image
from scipy.stats import norm
from sklearn import discriminant_analysis
from sklearn import linear_model
from sklearn import metrics
from sklearn import model_selection
from sklearn import neighbors
from timeit import default_timer as timer

classes = ['ocean', 'ship', 'shore', 'sky']
binary_classes = ['non-obstacle', 'obstacle']

# options
oversample = False


def prepare_data(input_file, description, oversample=False):
    """
    prepares the data, also prints some information about it
    """
    y = []
    X = []
    y_numeric = []
    y_binary = []
    y_binary_numeric = []
    
    print("\n##############\n%s Data summary:" % (description))
    with open(input_file, 'r') as csvfile:
        data_reader = csv.reader(csvfile, dialect='excel')
        for row in data_reader:
            if len(row) > 0:
                y.append(row[0])
                X_float = [ float(x) for x in row[1:] ]
                X.append(X_float)
                if row[0] in ('ship', 'shore'):
                    y_binary.append('obstacle')
                else:
                    y_binary.append('non-obstacle')

    # create a y_numeric for use with tensorflow
    for obs in y:
        y_numeric.append(classes.index(obs))
        
    
    # create a y_binary_numeric for binary classification in TF
    binary_classes = ['non-obstacle', 'obstacle']
    for obs in y_binary:
        y_binary_numeric.append(binary_classes.index(obs))

    assert len(X) == len(y) == len(y_numeric) == len(y_binary) == len(y_binary_numeric)

    # convert to np.array objects
    y = np.array(y)
    y_numeric = np.array(y_numeric)
    X = np.array(X)
    y_binary = np.array(y_binary)
    y_binary_numeric = np.array(y_binary_numeric)

    # how many features?
    num_features = len(X[0])
    print("%s features" % (num_features))

    # count the classes
    largest_class = "none"
    largest_class_count = -1

    unique, counts = np.unique(y, return_counts=True)
    class_counts = dict(zip(unique, counts))

    for key in class_counts.keys():
        count = class_counts[key]
        if count > largest_class_count:
            largest_class_count = count
            largest_class = key

    # raw data stats            
    print("Raw observations:")
    print("%s observations" % (len(y)))
    print("Class counts:")
    for cl in class_counts.keys():
        print("%s - %s" % (cl, class_counts[cl]))

    # if desired, use oversampling for any class that has less than 75% of the observations
    # of the largest class
    if oversample == True:
        print("\nOversampling enabled")
        print("Largest class is " + largest_class + " with %s observations" % (largest_class_count))

        for cl in class_counts.keys():
            if class_counts[cl] < 0.8 * largest_class_count:
                # oversample
                X, y = oversample(X, y, cl, largest_class_count)

        class_counts = {}
        for cl in classes:
            class_counts[cl] = y.count(cl)

        print("\nObservations after oversampling:")
        for cl in class_counts.keys():
            print("%s - %s" % (cl, class_counts[cl]))
    
    return X, y, y_numeric, y_binary, y_binary_numeric


# this function taken from:
# http://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html
def plot_confusion_matrix(cm, classes,
                          normalize=True,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    thresh = cm.max() * 0.75
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, "{0:.4f}".format(cm[i, j]),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True class')
    plt.xlabel('Predicted class')

    
def oversample(X, y, cl, largest_class):
    """
    returns new X and y lists with oversampling
    """
    X_new = list(X)
    y_new = list(y)
    
    # first count each class in y
    class_count = y.count(cl)
    obs_add = largest_class - class_count
    print("oversample - %s samples with %s class in y, adding %s observations" % (class_count, cl, obs_add))
    
    # keep adding samples for the class
    obs_added = 0
    class_index = 0
    all_in_class = [i for i, x in enumerate(y) if x == cl ] # => [1, 4, 6], all indexes for the class
    assert len(all_in_class) > 0
    
    # take observations from the class sequentially, looping around when off the end
    while obs_added < obs_add:
        index = all_in_class[class_index]
        
        y_new.append(y[index])
        X_new.append(X[index])
        
        obs_added += 1
        
        class_index += 1
        if class_index >= len(all_in_class):
            class_index = 0
    
    return X_new, y_new


def next_batch(X, y, offset, step):
    """
    returns a batch of observations and new offset, given offset and step
    if the batch will run off the end, loops back around
    """
    X_batch = []
    y_batch = []
    
    assert len(X) == len(y)
    
    if offset + step >= len(X):
        new_offset = offset + step - len(X)
        X_batch = list(X[offset:])
        X_batch.extend(list(X[:new_offset]))
        y_batch = list(y[offset:])
        y_batch.extend(list(y[:new_offset]))
    else:
        new_offset = offset + step
        X_batch = X[offset:offset + step]
        y_batch = y[offset:offset + step]
    
    return X_batch, y_batch, new_offset


def one_hot(y, classes):
    """
    takes as input a list of response values as strings, returns
    a one-hot matrix given the class ordering provided, and the map
    to return the index to classes
    """
    class_map = {}
    for i, cl in enumerate(classes):
        class_map[i] = cl
        
    one_hot_matrix = []
    for response in y:
        row = [0] * len(classes)
        row[classes.index(response)] = 1
        one_hot_matrix.append(row)
    
    one_hot_matrix = np.array(one_hot_matrix)
    return one_hot_matrix, class_map
    

In [5]:
# import data from data.csv
y_rgb = []
X_rgb = []
y_rgb_numeric = []
y_rgb_binary = []
r_rgb_binary_numeric = []

y_gray = []
X_gray = []
y_gray_numeric = []
y_gray_binary = []
y_gray_binary_numeric = []

X_rgb, y_rgb, y_rgb_numeric, y_rgb_binary, y_rgb_binary_numeric = prepare_data("data_rgb.csv", "RGB 32x32")
X_gray, y_gray, y_gray_numeric, y_gray_binary, y_gray_binary_numeric = prepare_data("data_gray.csv", "Grayscale 28x28")

feature_sample = {}
# some information about the features
# get 100 samples from each of the classes
for cl in classes:
    sample_red = []
    sample_green = []
    sample_blue = []
    sample_gray = []
    
    i = 0
    while len(sample_red) < 400:
        cla = y_rgb[i]
        if cla == cl:
            # add the average of RGB values for each observation
            sample = X_rgb[i][0::3]
            sample_red.append(sum(sample)/len(sample))
            sample = X_rgb[i][1::3]
            sample_green.append(sum(sample)/len(sample))
            sample = X_rgb[i][2::3]
            sample_blue.append(sum(sample)/len(sample))
            sample = X_gray[i]
            sample_gray.append(sum(sample)/len(sample))
        i += 1
    
    # add samples to the feature_sample
    feature_sample[cl] = [sample_red, sample_green, sample_blue, sample_gray]

print("len of feature_sample = %d, length of sample = %d, length of color sample = %d" % (len(feature_sample), 
                                                                                          len(feature_sample['ship']),
                                                                                          len(feature_sample['ship'][0])))
#print(feature_sample)



##############
RGB 32x32 Data summary:
3072 features
Raw observations:
4140 observations
Class counts:
shore - 793
sky - 496
ocean - 504
ship - 2347

##############
Grayscale 28x28 Data summary:
784 features
Raw observations:
4140 observations
Class counts:
shore - 793
sky - 496
ocean - 504
ship - 2347
len of feature_sample = 4, length of sample = 4, length of color sample = 400


In [93]:
def color_histogram(samples, axis, class_names, super_class_name=''):
    # plot some histograms for colors
    num_bins = 100
    
    
    if len(class_names) == 1:
        class_name = class_names[0]
        # red
        n, bins, patches = plt.hist(samples[class_name][0], num_bins, color='white', histtype='step')
        (mu,sigma) = norm.fit(samples[class_name][0])
        y_plot = mlab.normpdf(bins, mu, sigma)
        axis.plot(bins, y_plot, 'r-', linewidth=2)

        # green
        n, bins, patches = plt.hist(samples[class_name][1], num_bins, color='white', histtype='step')
        (mu,sigma) = norm.fit(samples[class_name][1])
        y_plot = mlab.normpdf(bins, mu, sigma)
        axis.plot(bins, y_plot,'g-', linewidth=2)

        # blue
        n, bins, patches = plt.hist(samples[class_name][2], num_bins, color='white', histtype='step')
        (mu,sigma) = norm.fit(samples[class_name][2])
        y_plot = mlab.normpdf(bins, mu, sigma)
        axis.plot(bins, y_plot,'b-', linewidth=2)

        # gray
        n, bins, patches = plt.hist(samples[class_name][3], num_bins, color='white', histtype='step')
        (mu,sigma) = norm.fit(samples[class_name][3])
        y_plot = mlab.normpdf(bins, mu, sigma)
        axis.plot(bins, y_plot,'gray', linewidth=2)

        # set axis limits
        #axes = axis.gca()
        axis.set_xlim([0,1])
        axis.set_ylim([0,5])
    
        axis.set_title("%s" % (class_name) )
    else:
        # red
        red_data = samples[class_names[0]][0]
        red_data.extend(samples[class_names[1]][0])
        n, bins, patches = plt.hist(red_data, num_bins, color='white', histtype='step')
        (mu,sigma) = norm.fit(red_data)
        y_plot = mlab.normpdf(bins, mu, sigma)
        axis.plot(bins,y_plot,'r-',linewidth=2)

        # green
        green_data = samples[class_names[0]][1]
        green_data.extend(samples[class_names[1]][1])
        n, bins, patches = plt.hist(green_data, num_bins, color='white', histtype='step')
        (mu,sigma) = norm.fit(green_data)
        y_plot = mlab.normpdf(bins, mu, sigma)
        axis.plot(bins,y_plot,'g-',linewidth=2)

        # blue
        green_data = samples[class_names[0]][2]
        green_data.extend(samples[class_names[1]][2])
        n, bins, patches = plt.hist(green_data, num_bins, color='white', histtype='step')
        (mu,sigma) = norm.fit(green_data)
        y_plot = mlab.normpdf(bins, mu, sigma)
        axis.plot(bins,y_plot,'b-',linewidth=2)

        # gray
        gray_data = samples[class_names[0]][3]
        gray_data.extend(samples[class_names[1]][3])
        n, bins, patches = plt.hist(gray_data, num_bins, color='white', histtype='step')
        (mu,sigma) = norm.fit(gray_data)
        y_plot = mlab.normpdf(bins, mu, sigma)
        axis.plot(bins,y_plot,'gray',linewidth=2)
        
        # set axis limits
        #axes = axis.gca()
        axis.set_xlim([0,1])
        axis.set_ylim([0,5])
        
        axis.set_title("%s" % (super_class_name))

    
# color histograms for sub-classes
f, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, sharex='col', sharey='row')
color_histogram(feature_sample, ax1, ['ocean'])
color_histogram(feature_sample, ax2, ['ship'])
color_histogram(feature_sample, ax3, ['shore'])
color_histogram(feature_sample, ax4, ['sky'])
plt.suptitle("Color norms by class - four classes")
plt.xlabel("value")
plt.ylabel("frequency")
plt.show()

# color histograms for classes
f, (ax1, ax2) = plt.subplots(2, sharex=True)
color_histogram(feature_sample, ax1, ['ocean', 'sky'], 'non-obstacle')
color_histogram(feature_sample, ax2, ['ship', 'shore'], 'obstacle')
plt.suptitle("Color norms by class - two classes")
plt.xlabel("value")
plt.ylabel("frequency")
plt.show()

In [7]:
# create training and test sets
# first method - split data set into two random sets
# randomly select from the original data into training and test sets

# validation sets
# cross-validate the validation set sizes
test_set_sizes = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]

def generate_validation_sets(X, y, test_size):
    """
    generates training and test sets using validation set split, given the test_size variable,
    which is a float between 0 and 1
    """
    print("generating train/test split with test size = %s" % (test_size))
    X_train, X_test, y_train, y_test = model_selection.train_test_split(X, 
                                                                        y, 
                                                                        test_size=test_size, 
                                                                        random_state=0)
    return X_train, y_train, X_test, y_test

# K-Nearest Neighbors
Inputs: number of neighbors, training and test sets

Outputs: accuracy score, confusion matrix

In [11]:
def knn(X_train, y_train, X_test, y_test, class_list):
    """
    Performs K-nearest Neighbors with provided parameters, returns a tuple containing
    k, accuracy, precision, confusion matrix
    """
    # record metrics for the cross-validated value - n-neighbors
    # format of knn_metrics is [index, neighbors, score, confusion matrix]
    # score is the mean accuracy and is provided by the score function of the classifier
    knn_metrics = pd.DataFrame(columns=('neighbors', 'mean_accuracy', 'confusion matrix'))
    
    max_k = int(math.sqrt(len(X_train))/2)
    #max_k = 4
    print("max k = " + str(max_k))

    # TODO - look into parallelizing this
    i = 0
    for k in range(1, max_k + 1):
        start = timer()
        
        classifier = neighbors.KNeighborsClassifier(k, 'distance', n_jobs=-1)
        classifier.fit(X_train, y_train)

        y_prediction = classifier.predict(X_test)
        cm = metrics.confusion_matrix(y_test, y_prediction, labels=class_list)
        mean_accuracy = classifier.score(X_test, y_test)
        #obstacle_accuracy = cm
        
        end = timer()
        
        metric = (k, mean_accuracy, cm)
        if k == 1 or k%5 == 0:
            print("k = %s, metric = %s (%ss)" % (k, metric, end - start))
        knn_metrics.loc[i] = metric
        i += 1
    
    return knn_metrics

knn_metrics_rgb = {}
knn_metrics_gray = {}
knn_metrics_rgb_b = {}
knn_metrics_gray_b = {}

for t in test_set_sizes:
    print("\n####################\ntest set size = %s" % (t))
    # generate validation sets
    X_train_rgb, y_train_rgb, X_test_rgb, y_test_rgb = generate_validation_sets(X_rgb, y_rgb, t)
    X_train_gray, y_train_gray, X_test_gray, y_test_gray = generate_validation_sets(X_gray, y_gray, t)
    X_train_rgb_b, y_train_rgb_b, X_test_rgb_b, y_test_rgb_b = generate_validation_sets(X_rgb, y_rgb_binary, t)
    X_train_gray_b, y_train_gray_b, X_test_gray_b, y_test_gray_b = generate_validation_sets(X_gray, y_gray_binary, t)
    
    # generate metrics for knn for rgb and gray-scale
    print("Color, four classes:")
    knn_metrics_rgb[t] = knn(X_train_rgb, y_train_rgb, X_test_rgb, y_test_rgb, classes)
    print("Color, two classes:")
    knn_metrics_rgb_b[t] = knn(X_train_rgb_b, y_train_rgb_b, X_test_rgb_b, y_test_rgb_b, binary_classes)
    print("Grayscale, four classes:")
    knn_metrics_gray[t] = knn(X_train_gray, y_train_gray, X_test_gray, y_test_gray, classes)
    print("Grayscale, two classes:")
    knn_metrics_gray_b[t] = knn(X_train_gray_b, y_train_gray_b, X_test_gray_b, y_test_gray_b, binary_classes)


####################
test set size = 0.1
generating train/test split with test size = 0.1
generating train/test split with test size = 0.1
generating train/test split with test size = 0.1
generating train/test split with test size = 0.1
Color, four classes:
max k = 30
k = 1, metric = (1, 0.6908212560386473, array([[ 17,   8,   6,  12],
       [ 19, 202,   7,   8],
       [ 18,  10,  30,  10],
       [ 16,   5,   9,  37]])) (5.273974670277767s)
k = 5, metric = (5, 0.70772946859903385, array([[ 19,   9,  10,   5],
       [ 21, 201,  10,   4],
       [ 15,  12,  36,   5],
       [ 16,   9,   5,  37]])) (5.804723996826851s)
k = 10, metric = (10, 0.69565217391304346, array([[ 14,  15,   9,   5],
       [ 17, 202,  13,   4],
       [ 10,  16,  35,   7],
       [ 11,  12,   7,  37]])) (5.439763425851538s)
k = 15, metric = (15, 0.70048309178743962, array([[ 15,  15,  10,   3],
       [ 17, 206,   7,   6],
       [ 11,  17,  33,   7],
       [ 14,  12,   5,  36]])) (5.331760438639549s)
k = 20,

In [67]:
# generate plots for the KNN method
def knn_plots(knn_metrics, test_set_sizes, classes, color):
    """
    generates some plots for the KNN method given the metrics data structure and
    the test-set sizes
    """
    max_accuracy = -1
    max_accuracy_key = -1
    # plot for effect of test size on accuracy - color
    mean_accuracies = pd.DataFrame()
    for key in test_set_sizes:
        accuracies = knn_metrics[key]['mean_accuracy']
        max_acc = max(accuracies)
        
        if max_acc > max_accuracy:
            max_accuracy = max_acc
            max_accuracy_key = key
        
        mean_accuracies = pd.concat([mean_accuracies, accuracies.rename(key)], axis=1)
        #plt.boxplot(mean_accuracies)
    
    mean_accuracies.plot(kind='box')
    plt.title("Effect of test set size on KNN mean accuracy (%s, %s classes)" % (color, len(classes)))
    plt.xlabel("test set size")
    plt.ylabel("mean accuracy")
    plt.show()
    
    # show the confusion matrix for the best results
    best_index = knn_metrics[max_accuracy_key]['mean_accuracy'].idxmax()
    best_metric = knn_metrics[max_accuracy_key].loc[best_index]
    best_cm = best_metric['confusion matrix']
    best_k = best_metric['neighbors']
    plot_confusion_matrix(best_cm, classes=classes, 
                          title="KNN best mean accuracy confusion matrix, k=%d, test size=%s, %s" % (best_k, max_accuracy_key, color))
    plt.show()
    
    return max_accuracy_key

print("\n#########\nFour class, color plots")
best_rgb_test = knn_plots(knn_metrics_rgb, test_set_sizes, classes, 'color')
print("\n#########\nFour class, grayscale plots")
best_gray_test = knn_plots(knn_metrics_gray, test_set_sizes, classes, 'grayscale')
print("\n#########\nTwo class, color plots")
best_rgb_b_test = knn_plots(knn_metrics_rgb_b, test_set_sizes, binary_classes, 'color')
print("\n#########\nTwo class, grayscale plots")
best_gray_b_test = knn_plots(knn_metrics_rgb_b, test_set_sizes, binary_classes, 'grayscale')
    
#print("Color: best test size mean accuracy = %s @ test_size = %s", (highest_mean_accuracy_rgb, best_test_size_rgb))
#print("Grayscale: best test size mean accuracy = %s @ test_size = %s", (highest_mean_accuracy_gray, best_test_size_gray))
#print("COlor: best mean accuracy")

#max_accuracy_rgb = knn_metrics_rgb[best_test_size_rgb].loc[knn_metrics_rgb['mean_accuracy'].idxmax()]
#print("best mean accuracy for RGB = %s @ k = %s" % (max_accuracy_rgb['mean_accuracy'], max_accuracy_rgb['neighbors']))

#max_accuracy_gray = knn_metrics_gray[best_test_size_gray].loc[knn_metrics_gray['mean_accuracy'].idxmax()]
#print("best mean accuracy for grayscale = %s @ k = %s" % (max_accuracy_gray['score'], max_accuracy_gray['neighbors']))



#########
Four class, color plots
Normalized confusion matrix
[[ 0.44186047  0.30232558  0.13953488  0.11627907]
 [ 0.08898305  0.85169492  0.04661017  0.01271186]
 [ 0.17647059  0.19117647  0.54411765  0.08823529]
 [ 0.20895522  0.13432836  0.10447761  0.55223881]]

#########
Four class, grayscale plots
Normalized confusion matrix
[[ 0.23255814  0.3255814   0.30232558  0.13953488]
 [ 0.03813559  0.90677966  0.03813559  0.01694915]
 [ 0.22058824  0.29411765  0.38235294  0.10294118]
 [ 0.11940299  0.29850746  0.10447761  0.47761194]]

#########
Two class, color plots
Normalized confusion matrix
[[ 0.71428571  0.28571429]
 [ 0.15205149  0.84794851]]

#########
Two class, grayscale plots
Normalized confusion matrix
[[ 0.71428571  0.28571429]
 [ 0.15205149  0.84794851]]


In [95]:
print(knn_metrics_rgb[0.1])

    neighbors  mean_accuracy  \
0         1.0       0.690821   
1         2.0       0.690821   
2         3.0       0.700483   
3         4.0       0.695652   
4         5.0       0.707729   
5         6.0       0.710145   
6         7.0       0.700483   
7         8.0       0.698068   
8         9.0       0.693237   
9        10.0       0.695652   
10       11.0       0.693237   
11       12.0       0.683575   
12       13.0       0.681159   
13       14.0       0.683575   
14       15.0       0.700483   
15       16.0       0.695652   
16       17.0       0.690821   
17       18.0       0.690821   
18       19.0       0.695652   
19       20.0       0.685990   
20       21.0       0.681159   
21       22.0       0.681159   
22       23.0       0.683575   
23       24.0       0.681159   
24       25.0       0.683575   
25       26.0       0.683575   
26       27.0       0.693237   
27       28.0       0.693237   
28       29.0       0.700483   
29       30.0       0.690821   

       

In [68]:
rgb_plot, = plt.plot(knn_metrics_rgb[best_rgb_test]['neighbors'], knn_metrics_rgb[best_rgb_test]['mean_accuracy'], 
                     label='color accuracy', color='blue')
gray_plot, = plt.plot(knn_metrics_gray[best_gray_test]['neighbors'], knn_metrics_gray[best_gray_test]['mean_accuracy'], 
                      label='grayscale accuracy', color='red')
plt.title("Four-class mean accuracy vs k, test size = %s" % (best_rgb_test))
plt.xlabel("k")
plt.ylabel("mean accuracy")

# show a legend
plt.legend(handles=[rgb_plot, gray_plot], loc=5)

plt.show()

rgb_plot, = plt.plot(knn_metrics_rgb[best_rgb_b_test]['neighbors'], knn_metrics_rgb[best_rgb_b_test]['mean_accuracy'], 
                     label='color accuracy', color='blue')
gray_plot, = plt.plot(knn_metrics_gray[best_gray_b_test]['neighbors'], knn_metrics_gray[best_gray_b_test]['mean_accuracy'], 
                      label='grayscale accuracy', color='red')
plt.title("Two-class mean accuracy vs k, test size = %s" % (best_rgb_b_test))
plt.xlabel("k")
plt.ylabel("mean accuracy")

# show a legend
plt.legend(handles=[rgb_plot, gray_plot], loc=5)

plt.show()

# Logistic Regression
Variables: N/A

Output: accuracy score, confusion matrix

In [74]:
# binary classifier
# test different validation set sizes
two_scores = []
for t in test_set_sizes:
    X_train_rgb, y_train_rgb, X_test_rgb, y_test_rgb = generate_validation_sets(X_rgb, y_rgb_binary, t)
    classifier = linear_model.LogisticRegression(n_jobs=-1)
    classifier.fit(X_train_rgb, y_train_rgb)
    y_pred = classifier.predict(X_test_rgb)
    cm = metrics.confusion_matrix(y_test_rgb, y_pred, labels=binary_classes)
    score = classifier.score(X_test_rgb, y_test_rgb)
    two_scores.append([score, cm])
    print("test size = %f, score = %f" % (t,score))

generating train/test split with test size = 0.1
test size = 0.100000, score = 0.746377
generating train/test split with test size = 0.2
test size = 0.200000, score = 0.743961
generating train/test split with test size = 0.3
test size = 0.300000, score = 0.760870
generating train/test split with test size = 0.4
test size = 0.400000, score = 0.757850
generating train/test split with test size = 0.5
test size = 0.500000, score = 0.760870
generating train/test split with test size = 0.6
test size = 0.600000, score = 0.758052
generating train/test split with test size = 0.7
test size = 0.700000, score = 0.767771
generating train/test split with test size = 0.8
test size = 0.800000, score = 0.746981
generating train/test split with test size = 0.9
test size = 0.900000, score = 0.752013


In [75]:
# multi-class classifier
# test different validation set sizes
four_scores = []
for t in test_set_sizes:
    X_train_rgb, y_train_rgb, X_test_rgb, y_test_rgb = generate_validation_sets(X_rgb, y_rgb, t)
    classifier = linear_model.LogisticRegression(n_jobs=-1)
    classifier.fit(X_train_rgb, y_train_rgb)
    y_pred = classifier.predict(X_test_rgb)
    cm = metrics.confusion_matrix(y_test_rgb, y_pred, labels=classes)
    score = classifier.score(X_test_rgb, y_test_rgb)
    four_scores.append([score, cm])
    print("test size = %f, score = %f" % (t,score))

generating train/test split with test size = 0.1
test size = 0.100000, score = 0.676329
generating train/test split with test size = 0.2
test size = 0.200000, score = 0.669082
generating train/test split with test size = 0.3
test size = 0.300000, score = 0.673108
generating train/test split with test size = 0.4
test size = 0.400000, score = 0.681159
generating train/test split with test size = 0.5
test size = 0.500000, score = 0.668599
generating train/test split with test size = 0.6
test size = 0.600000, score = 0.680757
generating train/test split with test size = 0.7
test size = 0.700000, score = 0.676674
generating train/test split with test size = 0.8
test size = 0.800000, score = 0.661836
generating train/test split with test size = 0.9
test size = 0.900000, score = 0.651637


In [81]:
print(two_scores)
print(four_scores)

two_accs = []


two_plot, = plt.plot(test_set_sizes, [a for a,b in two_scores], label='two-class accuracy', color='red')
four_plot, = plt.plot(test_set_sizes, [a for a,b in four_scores], label='four-class accuracy', color='blue')
plt.title("Logistic regression accuracy vs. test size, color")
plt.xlabel("test set size")
plt.ylabel("mean accuracy")
plt.legend(handles=[two_plot, four_plot], loc=5)
axes = plt.gca()
axes.set_xlim([0,1])
plt.show()

[[0.74637681159420288, array([[ 48,  62],
       [ 43, 261]])], [0.7439613526570048, array([[ 97, 126],
       [ 86, 519]])], [0.76086956521739135, array([[147, 173],
       [124, 798]])], [0.75785024154589375, array([[ 186,  227],
       [ 174, 1069]])], [0.76086956521739135, array([[ 245,  287],
       [ 208, 1330]])], [0.75805152979066026, array([[ 273,  355],
       [ 246, 1610]])], [0.76777087646652864, array([[ 330,  391],
       [ 282, 1895]])], [0.7469806763285024, array([[ 380,  444],
       [ 394, 2094]])], [0.75201288244766507, array([[ 412,  505],
       [ 419, 2390]])]]
[[0.67632850241545894, array([[  7,  18,  14,   4],
       [ 20, 202,   4,  10],
       [  4,  16,  40,   8],
       [  7,  21,   8,  31]])], [0.66908212560386471, array([[ 22,  46,  36,   6],
       [ 26, 383,  20,  16],
       [ 14,  37,  96,  13],
       [ 14,  28,  18,  53]])], [0.67310789049919484, array([[ 35,  55,  47,  18],
       [ 39, 595,  34,  23],
       [ 29,  44, 133,  25],
       [ 19,  49, 

# Softmax Regression (precursor to CNN)

This section is largely adapted from:

https://www.tensorflow.org/get_started/mnist/pros


In [58]:
sess = tf.InteractiveSession()

# setup inputs
# x has 3072 features since it consists of 32x32 pixels
# y_ is a one-hot multi-dimensional vector the size of the number
# of classes
num_features = len(X_train_rgb[0])

x = tf.placeholder(tf.float32, [None, num_features])
y_ = tf.placeholder(tf.int32, [None, len(classes)])

# define weights (W) and biases (b)
W = tf.Variable(tf.zeros([num_features, len(classes)]))
b = tf.Variable(tf.zeros([len(classes)]))

# initialize variables
tf.global_variables_initializer().run()

In [59]:
# implement the regression model
model = tf.matmul(x, W) + b

# specify the loss function
cross_entropy = tf.reduce_mean(
    tf.nn.softmax_cross_entropy_with_logits(labels=y_, logits=model))

# use steepest gradient descent to train the model
train_step = tf.train.GradientDescentOptimizer(0.5).minimize(cross_entropy)

In [60]:
# now repeatedly run train_step to perform gradient descent
offset = 0
batch_size = 100
for _ in range(1000):
    X_batch, y_batch, offset = next_batch(X_train_rgb, y_train_rgb, offset, batch_size)
    # generate one-hot encoding for the response
    y_one_hot, class_map = one_hot(y_batch, classes)
    sess.run(train_step, feed_dict={x: X_batch, y_: y_one_hot})

In [62]:
# evaluate the model
y_true = tf.argmax(y_, 1)
y_pred = tf.argmax(model, 1)

predictions = y_pred.eval(feed_dict={ x: X_test_rgb })

# define metric
correct_prediction = tf.equal(y_pred, y_true)
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

y_one_hot, _ = one_hot(y_test_rgb, classes)
#print(accuracy.eval(feed_dict={x: X_test, y_: y_one_hot}))
accuracy_metric = sess.run(accuracy, feed_dict={x: X_test_rgb, y_: y_one_hot})
print("accuracy = %s" % (accuracy_metric))

sess.close()

# calculate confusion matrix
# return the predictions to class names
class_predictions = []
for pred in predictions:
    class_predictions.append(class_map[pred])
cm = metrics.confusion_matrix(y_test_rgb, np.array(class_predictions))
plot_confusion_matrix(cm, classes, title="Softmax Regression Confusion Matrix")
plt.show()

accuracy = 0.700119
Normalized confusion matrix
[[  4.78087649e-02   9.48207171e-01   3.98406375e-03]
 [  4.29922614e-03   9.94840929e-01   8.59845228e-04]
 [  7.69230769e-02   9.11538462e-01   1.15384615e-02]]


# Convolutional Neural Network

This code adapted from https://www.tensorflow.org/get_started/mnist/pros

In [56]:
# generate a confusion matrix
def tf_confusion_matrix(predictions, labels, classes):
    """
    produces and returns a confusion matrix given the predictions generated by
    tensorflow (in one-hot format), and string labels.
    """

    y_pred_strings = []
    labels = labels.tolist()
    
    for p in predictions:
        max_value = max(p)
        max_index = p.tolist().index(max_value)
        y_pred_strings.append(classes[max_index])
    
    #print(type(y_pred_strings))
    #print(type(labels))
    
    cm = metrics.confusion_matrix(labels, y_pred_strings)
    
    return cm

# tensorflow cnn function
def tensorflow_cnn(X, y, classes, test_set_sizes, iterations=5000):
    """
    creates a tensor flow cnn and executes the computational graph,
    returns a dict containing test set size: accuracy pairs
    """
    # CNN Setup
    sess = tf.InteractiveSession()

    num_features = len(X[0])
    num_classes = len(classes)

    # placeholders
    x = tf.placeholder(tf.float32, [None, num_features])
    y_ = tf.placeholder(tf.int32, [None, num_classes])

    # initialize weights with small amount of noise
    def weight_variable(shape):
        initial = tf.truncated_normal(shape, stddev=0.1)
        return tf.Variable(initial)

    # give the neurons a slightly positive bias to avoid dead neurons
    def bias_variable(shape):
        initial = tf.constant(0.1, shape=shape)
        return tf.Variable(initial)

    # conv2d uses stride of one and are zero-padded - output
    # is the same size as the input
    def conv2d(x, W):
        return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

    # pooling is max pooling over 2x2 blocks
    def max_pool_2x2(x):
        return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
                             strides=[1, 2, 2, 1], padding='SAME')
    
    # first convolutional layer
    # convolution , followed by max pooling
    W_conv1 = weight_variable([5, 5, 1, 32])
    b_conv1 = bias_variable([32])

    # reshape x to a 4d tensor
    x_image = tf.reshape(x, [-1, 28, 28, 1])

    # reshape x_image with weight tensor, add the bias, apply ReLU function
    # finally max pool
    # max_pool_2x2 reduces image to 14x14
    h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
    h_pool1 = max_pool_2x2(h_conv1)
    
    # second convolutional layer
    # 64 features for each 5x5 patch
    W_conv2 = weight_variable([5, 5, 32, 64])
    b_conv2 = bias_variable([64])

    # max_pool_2x2 reduces image size to 7x7
    h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
    h_pool2 = max_pool_2x2(h_conv2)
    
    # densely connected layer
    # fully-connected layer with 1024 neurons
    # reshape the tensor from the pooling layer into a batch of vectors
    # multiply by weight matrix, add a bias, and apply ReLU
    W_fc1 = weight_variable([7 * 7 * 64, 1024])
    b_fc1 = bias_variable([1024])

    h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*64])
    h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
    
    # dropout - reduces overfitting
    # turned on during training, turned off during testing, controlled by the keep_prob placeholder
    keep_prob = tf.placeholder(tf.float32)
    h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)
    
    # readout layer
    W_fc2 = weight_variable([1024, num_classes])
    b_fc2 = bias_variable([num_classes])

    y_conv = tf.matmul(h_fc1_drop, W_fc2) + b_fc2

    cross_entropy = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=y_, logits=y_conv))
    train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
    true_positives = tf.metrics.true_positives(y_, y_conv)
    correct_prediction = tf.equal(tf.argmax(y_conv, 1), tf.argmax(y_, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    
    # perform classification for all of the test sizes
    tests = {}
    best_accuracy = -1
    best_test_size = -1
    batch_size = 50

    for t in test_set_sizes:
        offset = 0
        sess.run(tf.global_variables_initializer())
        print("test_set_size = %g" % t)
        X_train, y_train, X_test, y_test = generate_validation_sets(X, y, t)

        for i in range(iterations):
            X_batch, y_batch, offset = next_batch(X_train, y_train, offset, batch_size)
            y_batch_one_hot, _ = one_hot(y_batch, classes)
            if i%200 == 0:
                train_accuracy = accuracy.eval(feed_dict={x: X_batch, y_: y_batch_one_hot, keep_prob: 1.0 })
                print("step %d, training accuracy %g" % (i, train_accuracy))
            train_step.run(feed_dict={ x: X_batch, y_: y_batch_one_hot, keep_prob: 0.5})

        y_test_one_hot, _ = one_hot(y_test, classes)

        # score is mean accuracy of the classifier
        score = accuracy.eval(feed_dict={ x: X_test, y_: y_test_one_hot, keep_prob: 1.0})

        # create the confusion matrix
        feed_dict = {x: X_test, keep_prob: 1.0}
        classification = y_conv.eval(feed_dict)
        cm = tf_confusion_matrix(classification, y_test, classes)
        
        tests[t] = (score, cm)

        print("test accuracy %g\n" % (tests[t][0]))

    return tests

In [58]:
# classification for four class and two class schemes
results_four = tensorflow_cnn(X_gray, y_gray, classes, test_set_sizes, iterations=20000)
results_two = tensorflow_cnn(X_gray, y_gray_binary, binary_classes, test_set_sizes, iterations=20000)

test_set_size = 0.1
generating train/test split with test size = 0.1
step 0, training accuracy 0.18
step 200, training accuracy 0.62
step 400, training accuracy 0.6
step 600, training accuracy 0.7
step 800, training accuracy 0.74
step 1000, training accuracy 0.84
step 1200, training accuracy 0.8
step 1400, training accuracy 0.78
step 1600, training accuracy 0.8
step 1800, training accuracy 0.82
step 2000, training accuracy 0.86
step 2200, training accuracy 0.88
step 2400, training accuracy 0.86
step 2600, training accuracy 0.94
step 2800, training accuracy 0.9
step 3000, training accuracy 0.96
step 3200, training accuracy 0.92
step 3400, training accuracy 0.86
step 3600, training accuracy 1
step 3800, training accuracy 0.94
step 4000, training accuracy 0.88
step 4200, training accuracy 0.94
step 4400, training accuracy 0.98
step 4600, training accuracy 1
step 4800, training accuracy 1
step 5000, training accuracy 0.96
step 5200, training accuracy 1
step 5400, training accuracy 0.96
ste

In [96]:
print(results_four)
print(results_two)

results_four_acc = []
results_two_acc = []
for t in test_set_sizes:
    results_four_acc.append(results_four[t][0])
    results_two_acc.append(results_two[t][0])


four_plot, = plt.plot(test_set_sizes, results_four_acc, label='4-class accuracy', color='blue')
two_plot, = plt.plot(test_set_sizes, results_two_acc, label='2-class accuracy', color='red')
plt.title("CNN accuracy vs. test size")
plt.xlabel("test set size")
plt.ylabel("mean accuracy")
plt.legend(handles=[two_plot, four_plot], loc=5)
axes = plt.gca()
axes.set_xlim([0,1])
plt.show()

# plot the confusion matrix for the best results
plot_confusion_matrix(results_four[0.1][1], classes, title='Four class CNN confusion matrix, test size=0.1')
plt.show()
plot_confusion_matrix(results_two[0.1][1], binary_classes, title='Two class CNN confusion matrix, test size=0.1')
plt.show()

{0.1: (0.76328504, array([[ 14,   7,  16,   6],
       [  3, 220,   9,   4],
       [  9,  13,  42,   4],
       [  5,   8,  14,  40]])), 0.2: (0.74758452, array([[ 38,  16,  48,   8],
       [  7, 413,  16,   9],
       [ 17,  32,  99,  12],
       [ 14,  13,  17,  69]])), 0.3: (0.7616747, array([[ 60,  25,  46,  24],
       [ 12, 638,  28,  13],
       [ 29,  46, 142,  14],
       [ 17,  25,  17, 106]])), 0.6: (0.73067635, array([[ 117,   64,   87,   42],
       [  36, 1285,   60,   18],
       [  95,   94,  238,   30],
       [  44,   52,   47,  175]])), 0.8: (0.69776571, array([[ 132,   84,  162,   34],
       [  48, 1617,  173,   15],
       [ 121,  124,  357,   33],
       [  55,   75,   77,  205]])), 0.5: (0.74106282, array([[ 107,   53,   81,   27],
       [  31, 1071,   48,   14],
       [  59,   78,  213,   24],
       [  31,   49,   41,  143]])), 0.7: (0.72981364, array([[ 138,   63,  120,   45],
       [  41, 1474,   96,   14],
       [ 103,  112,  305,   32],
       [  46,