In [1]:
from PIL import Image
import numpy as np
import tensorflow as tf
import glob
import keras
from sklearn.model_selection import train_test_split

Using TensorFlow backend.


# Parameters

In [2]:
# --- Data split / labels ---
TRAIN_SIZE = 0.7    # fraction of the samples handed to the training split
NUM_CLASSES = 3     # one label per dataset folder

# --- Image geometry ---
IMAGE_SIZE = 32     # side length, in pixels, of the square network input
NUM_CHANNELS = 3    # colour channels per pixel
RESIZE_IMAGE_SHAPE = (IMAGE_SIZE, IMAGE_SIZE)   # size every image is resized to

# --- Network / training hyper-parameters ---
BATCH_SIZE = 16     # samples per optimisation step
PATCH_SIZE = 5      # height and width of the convolution filters
DEPTH = 16          # output channels of each convolution layer
NUM_HIDDEN1 = 256   # units in the first fully connected layer
NUM_HIDDEN2 = 64    # units in the second fully connected layer

# View Images

In [3]:
def view_images(image_list):
    """Display every image in ``image_list`` with PIL's ``Image.show()``.

    Args:
        image_list: iterable of (height, width, 3) pixel arrays. Any numeric
            dtype is accepted; values are assumed to be on the 0-255 scale.
    """
    for image in image_list:
        # PIL reads the array's raw bytes, so a float32 array would be
        # misinterpreted as RGB data. Convert to 8-bit first.
        pixels = np.clip(np.asarray(image), 0, 255).astype(np.uint8)
        # A (H, W, 3) uint8 array is recognised as RGB without an explicit mode.
        img = Image.fromarray(pixels)
        img.show()

# Read & Split Data

In [4]:
def read_split_data(random_state=None):
    """Load the three image classes from disk and split them into train/test sets.

    Every ``*.jpg`` under ``Dataset/Compost``, ``Dataset/Landfill`` and
    ``Dataset/Recyclable`` is converted to RGB, resized to
    ``RESIZE_IMAGE_SHAPE`` and labelled 0, 1 and 2 respectively.

    Args:
        random_state: optional seed forwarded to ``train_test_split``. The
            default ``None`` keeps the split random on every call.

    Returns:
        Tuple ``(train_size, test_size, X_train, X_test, y_train, y_test)``
        where the X arrays are float32 pixel arrays and the y arrays are
        one-hot encoded labels with ``NUM_CLASSES`` columns.

    Raises:
        ValueError: if no image is found in any of the class folders.
    """
    # (folder name, integer label); the label is the one-hot column index.
    class_folders = (('Compost', 0), ('Landfill', 1), ('Recyclable', 2))

    image_list = []
    myclass = []
    for folder, label in class_folders:
        # Sorted so the load order does not depend on the filesystem.
        for filename in sorted(glob.glob('Dataset/%s/*.jpg' % folder)):
            # The context manager closes the underlying file handle.
            with Image.open(filename) as source:
                # Force 3 channels so greyscale/CMYK files stack with the rest.
                # LANCZOS is the filter formerly exposed as Image.ANTIALIAS.
                resized = source.convert('RGB').resize(RESIZE_IMAGE_SHAPE, Image.LANCZOS)
                image_list.append(np.array(resized))
            myclass.append(label)

    if not image_list:
        raise ValueError(
            'No .jpg images found under Dataset/Compost, Dataset/Landfill '
            'or Dataset/Recyclable')

    X = np.asarray(image_list, dtype='float32')
    y = keras.utils.to_categorical(np.asarray(myclass), num_classes=NUM_CLASSES)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, train_size=TRAIN_SIZE, random_state=random_state)

    return len(X_train), len(X_test), X_train, X_test, y_train, y_test

# CNN Model

In [5]:
def cnn_model(data, cnn1_W, cnn1_b, cnn2_W, cnn2_b, fc1_W, fc1_b, fc2_W, fc2_b, classifier_W, classifier_b):
    """Forward pass: two conv/ReLU/max-pool stages, two ReLU dense layers, linear classifier.

    Args:
        data: 4-D input tensor fed to the first convolution.
        cnn1_W, cnn1_b: filter and bias of the first convolution layer.
        cnn2_W, cnn2_b: filter and bias of the second convolution layer.
        fc1_W, fc1_b: weights and bias of the first fully connected layer.
        fc2_W, fc2_b: weights and bias of the second fully connected layer.
        classifier_W, classifier_b: weights and bias of the output layer.

    Returns:
        The classifier output before any softmax is applied.
    """
    # First convolution layer with stride = 1, padded so the output keeps the
    # input's spatial size. Apply ReLU and a 2x2 max pool with stride 2.
    conv1 = tf.nn.conv2d(data, cnn1_W, [1, 1, 1, 1], padding='SAME')
    hidden1 = tf.nn.relu(conv1 + cnn1_b)
    pool1 = tf.nn.max_pool(hidden1, [1, 2, 2, 1], [1, 2, 2, 1], padding='SAME')

    # Second convolution layer, same structure as the first.
    conv2 = tf.nn.conv2d(pool1, cnn2_W, [1, 1, 1, 1], padding='SAME')
    hidden2 = tf.nn.relu(conv2 + cnn2_b)
    pool2 = tf.nn.max_pool(hidden2, [1, 2, 2, 1], [1, 2, 2, 1], padding='SAME')

    # Flatten the convolution output to one row per sample. Using -1 lets
    # TensorFlow infer the batch dimension, so an unknown batch size also works.
    shape = pool2.get_shape().as_list()
    reshape = tf.reshape(pool2, [-1, shape[1] * shape[2] * shape[3]])

    # 2 FC hidden layers
    fc1 = tf.nn.relu(tf.matmul(reshape, fc1_W) + fc1_b)
    fc2 = tf.nn.relu(tf.matmul(fc1, fc2_W) + fc2_b)

    # Return the result of the classifier
    return tf.matmul(fc2, classifier_W) + classifier_b

# Train CNN

In [6]:
def train_cnn(train_size, test_size, X_train, X_test, y_train, y_test, num_steps=20001):
    """Build the CNN graph and train it with Adam on mini-batches.

    Every 500 steps the mini-batch loss, the mini-batch accuracy and the
    accuracy on the full test set are printed.

    Args:
        train_size: number of training images; used to reshape ``X_train``.
        test_size: number of test images; used to reshape ``X_test``.
        X_train: training pixels, reshapeable to
            (train_size, IMAGE_SIZE, IMAGE_SIZE, NUM_CHANNELS).
        X_test: test pixels, reshapeable to
            (test_size, IMAGE_SIZE, IMAGE_SIZE, NUM_CHANNELS).
        y_train: one-hot training labels, one row per training image.
        y_test: one-hot test labels, one row per test image.
        num_steps: number of optimisation steps to run (default 20001).

    Raises:
        ValueError: if there are fewer than ``BATCH_SIZE`` training samples,
            because the training placeholders have a fixed batch dimension.
    """
    # NOTE(review): pixel values are fed to the network exactly as received;
    # no rescaling is done here. Confirm whether inputs should be normalised.
    train_dataset = X_train.reshape((train_size, IMAGE_SIZE, IMAGE_SIZE, NUM_CHANNELS)).astype(np.float32)
    test_dataset = X_test.reshape((test_size, IMAGE_SIZE, IMAGE_SIZE, NUM_CHANNELS)).astype(np.float32)

    # Fail fast: a short batch cannot be fed into the fixed-size placeholders.
    num_train = y_train.shape[0]
    if num_train < BATCH_SIZE:
        raise ValueError(
            'Need at least BATCH_SIZE=%d training samples, got %d'
            % (BATCH_SIZE, num_train))
    # Number of distinct window start positions used by the batch slicing below.
    offset_span = num_train - BATCH_SIZE

    graph = tf.Graph()

    with graph.as_default():

        # Define the training dataset and labels
        tf_train_dataset = tf.placeholder(tf.float32, shape=(BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, NUM_CHANNELS))
        tf_train_labels = tf.placeholder(tf.float32, shape=(BATCH_SIZE, NUM_CLASSES))

        # Validation/test dataset. Use the reshaped float32 copy so the
        # constant always matches the dtype and layout the model expects.
        tf_test_dataset = tf.constant(test_dataset)

        # CNN layer 1 with filter (num_channels, depth) (3, 16)
        cnn1_W = tf.Variable(tf.truncated_normal([PATCH_SIZE, PATCH_SIZE, NUM_CHANNELS, DEPTH], stddev=0.1))
        cnn1_b = tf.Variable(tf.zeros([DEPTH]))

        # CNN layer 2 with filter (depth, depth) (16, 16)
        cnn2_W = tf.Variable(tf.truncated_normal([PATCH_SIZE, PATCH_SIZE, DEPTH, DEPTH], stddev=0.1))
        cnn2_b = tf.Variable(tf.constant(1.0, shape=[DEPTH]))

        # Flattened size of the second pooling output. The stride-1 'SAME'
        # convolutions keep the spatial size; each stride-2 'SAME' max pool
        # produces ceil(side / 2), so apply that rounding once per pool.
        pooled_side = IMAGE_SIZE
        for _ in range(2):
            pooled_side = (pooled_side + 1) // 2
        size = pooled_side * pooled_side * DEPTH

        # FC1 (size, num_hidden1) (size, 256)
        fc1_W = tf.Variable(tf.truncated_normal([size, NUM_HIDDEN1], stddev=np.sqrt(2.0 / size)))
        fc1_b = tf.Variable(tf.constant(1.0, shape=[NUM_HIDDEN1]))

        # FC2 (num_hidden1, num_hidden2) (256, 64)
        fc2_W = tf.Variable(tf.truncated_normal([NUM_HIDDEN1, NUM_HIDDEN2], stddev=np.sqrt(2.0 / NUM_HIDDEN1)))
        fc2_b = tf.Variable(tf.constant(1.0, shape=[NUM_HIDDEN2]))

        # Classifier (num_hidden2, num_classes) (64, 3)
        classifier_W = tf.Variable(tf.truncated_normal([NUM_HIDDEN2, NUM_CLASSES], stddev=np.sqrt(2.0 / NUM_HIDDEN2)))
        classifier_b = tf.Variable(tf.constant(1.0, shape=[NUM_CLASSES]))

        # Training computation.
        logits = cnn_model(tf_train_dataset, cnn1_W, cnn1_b, cnn2_W, cnn2_b,
                           fc1_W, fc1_b, fc2_W, fc2_b, classifier_W, classifier_b)
        loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(labels=tf_train_labels, logits=logits))

        # Optimizer.
        optimizer = tf.train.AdamOptimizer(0.0005).minimize(loss)

        # Predictions for the training batch and the test data. Both reuse
        # the same variables, so the test output tracks the trained weights.
        train_prediction = tf.nn.softmax(logits)
        test_prediction = tf.nn.softmax(cnn_model(tf_test_dataset, cnn1_W, cnn1_b, cnn2_W, cnn2_b,
                                                  fc1_W, fc1_b, fc2_W, fc2_b, classifier_W, classifier_b))

    with tf.Session(graph=graph) as session:
        tf.global_variables_initializer().run()
        print('Initialized')
        for step in range(num_steps):
            # Slide a BATCH_SIZE window over the training set, wrapping around.
            # With exactly BATCH_SIZE samples the only valid offset is 0, which
            # also avoids a modulo-by-zero.
            offset = (step * BATCH_SIZE) % offset_span if offset_span > 0 else 0
            batch_data = train_dataset[offset:(offset + BATCH_SIZE), :, :, :]
            batch_labels = y_train[offset:(offset + BATCH_SIZE), :]
            feed_dict = {tf_train_dataset : batch_data, tf_train_labels : batch_labels}
            _, l, predictions = session.run(
              [optimizer, loss, train_prediction], feed_dict=feed_dict)
            if (step % 500 == 0):
                print('Minibatch loss at step %d: %f' % (step, l))
                print('Minibatch accuracy: %.1f%%' % accuracy(predictions, batch_labels))
                print('Test accuracy: %.1f%%' % accuracy(session.run(test_prediction), y_test))

# Accuracy

In [7]:
def accuracy(predictions, labels):
    """Return the percentage of rows whose predicted class matches the label.

    Both arguments are 2-D arrays with one row per sample and one column per
    class. The class of a row is the column index of its largest value, so
    this works for softmax outputs and one-hot labels alike.
    """
    predicted_class = np.argmax(predictions, 1)
    true_class = np.argmax(labels, 1)
    num_correct = np.sum(predicted_class == true_class)
    num_samples = predictions.shape[0]
    return 100.0 * num_correct / num_samples

# Main

In [8]:
# Load the images from disk and split them into train/test arrays.
train_size, test_size, X_train, X_test, y_train, y_test = read_split_data()

# Optional visual check (disabled): shows every test image via Image.show().
# view_images(X_test)

# Build the graph and train; progress is printed every 500 steps.
train_cnn(train_size, test_size, X_train, X_test, y_train, y_test)



Initialized
Minibatch loss at step 0: 153.656219
Minibatch accuracy: 31.2%
Test accuracy: 22.2%
Minibatch loss at step 500: 0.000011
Minibatch accuracy: 100.0%
Test accuracy: 11.1%
Minibatch loss at step 1000: 0.000003
Minibatch accuracy: 100.0%
Test accuracy: 22.2%
Minibatch loss at step 1500: 0.000001
Minibatch accuracy: 100.0%
Test accuracy: 33.3%
Minibatch loss at step 2000: 0.000001
Minibatch accuracy: 100.0%
Test accuracy: 22.2%
Minibatch loss at step 2500: 0.000000
Minibatch accuracy: 100.0%
Test accuracy: 22.2%
Minibatch loss at step 3000: 0.000000
Minibatch accuracy: 100.0%
Test accuracy: 22.2%
Minibatch loss at step 3500: 0.000000
Minibatch accuracy: 100.0%
Test accuracy: 22.2%
Minibatch loss at step 4000: 0.000000
Minibatch accuracy: 100.0%
Test accuracy: 33.3%
Minibatch loss at step 4500: 0.000000
Minibatch accuracy: 100.0%
Test accuracy: 33.3%
Minibatch loss at step 5000: 0.000000
Minibatch accuracy: 100.0%
Test accuracy: 55.6%
Minibatch loss at step 5500: 0.000000
Minibat