In [None]:
import tensorflow as tf
import cv2
from matplotlib import pyplot as plt
import glob
import os
import random
from random import randint
import math
import numpy as np
import keras
from keras import backend as K
from tensorflow.python.framework import graph_util
from tensorflow.python.framework import graph_io
from keras.models import load_model
from keras.models import Model, Sequential
from keras.layers import Dense, Flatten, Dropout, Input, GlobalAveragePooling2D
from keras.layers import Conv2D, MaxPooling2D, Activation, Lambda, concatenate, ELU
from keras.callbacks import ModelCheckpoint, EarlyStopping, Callback
from keras import optimizers
from keras.optimizers import Adam
from keras.preprocessing.image import ImageDataGenerator

# This is needed to display the images.
%matplotlib inline

In [None]:
image_types = ["red", "green", "yellow"]
input_img_x = 32
input_img_y = 32

full_set = []
for im_type in image_types:
    for ex in glob.glob(os.path.join("../dataset/", im_type, "*")):
        im = cv2.imread(ex)
        if not im is None:
            # Resize image
            im = cv2.resize(im, (input_img_x, input_img_y))

            # Convert to RGB
            im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB) 
            
            # Create an array representing our classes and set it
            one_hot_array = [0] * len(image_types)
            one_hot_array[image_types.index(im_type)] = 1
            assert(im.shape == (input_img_x, input_img_y, 3))

            # Add it to our set
            full_set.append((im, one_hot_array, ex))

# Shuffle the data
random.shuffle(full_set)
print(len(full_set))

In [None]:
# Define split and batch size
train_test_split_ratio = 0.9
batch_size = 32

# We split our data into a training and test set here
split_index = int(math.floor(len(full_set) * train_test_split_ratio))
print(split_index)
train_set = full_set[:split_index]
print(len(train_set))
test_set = full_set[split_index:]

# We ensure that our training and test sets are a multiple of batch size
train_set_offset = len(train_set) % batch_size
test_set_offset = len(test_set) % batch_size
train_set = train_set[: len(train_set) - train_set_offset]
test_set = test_set[: len(test_set) - test_set_offset]

# Split
train_x, train_y, train_z = zip(*train_set)
test_x, test_y, test_z = zip(*test_set)

# Convert to numpy array
train_x = np.array(train_x)
train_y = np.array(train_y)
test_x = np.array(test_x)
test_y = np.array(test_y)
print("Training samples: {}".format(len(train_y)))
print("Test samples: {}".format(len(test_y)))

In [None]:
def get_class_weights(y):
    unique, counts = np.unique(y, return_counts=True)
    majority = max(counts)
    return  {cls: float(majority/count) for cls, count in zip(unique, counts)}

train_y_dec = np.argmax(train_y, axis=1)
class_weights = get_class_weights(train_y_dec)
print(class_weights)

In [None]:
def fire_module(x, fire_id, squeeze=16, expand=64): 
    # Squeeze part with 1x1 conv
    x = Conv2D(squeeze, (1, 1), padding='valid', name='fire' + str(fire_id) + '/' + "squeeze1x1")(x)
    x = Activation('elu', name='fire' + str(fire_id) + '/elu_squeeze1x1')(x)
    # Expand part with 1x1 conv
    left = Conv2D(expand, (1, 1), padding='valid', name='fire' + str(fire_id) + '/' + "expand1x1")(x)
    left = Activation('elu', name='fire' + str(fire_id) + '/elu_expand1x1')(left)
    # Expand part with 3x3 conv
    right = Conv2D(expand, (3, 3), padding='same', name='fire' + str(fire_id) + '/' + "expand3x3")(x)
    right = Activation('elu', name='fire' + str(fire_id) + '/elu_expand3x3')(right)
    # Fuse left and right expansion together
    x = concatenate([left, right], axis=3, name='fire' + str(fire_id) + '/' + 'concat')
    return x

def squeeze_model():
    # Shape of our input image is 32x32x3
    input_img = Input(shape=(32, 32, 3))
    
    # This is our normalization from 0 to 1
    x = Lambda(lambda x: x/255.0, input_shape=(32, 32, 3))(input_img)
    
    # Use two fire modules with 16 filter squeeze and 64 filter expand
    x = fire_module(x, fire_id=1, squeeze=16, expand=64)
    x = fire_module(x, fire_id=2, squeeze=16, expand=64)
    x = MaxPooling2D(pool_size=(3, 3), strides=(2, 2))(x)
    
    # Use two fire modules with 32 filter squeeze and 128 filter expand
    x = fire_module(x, fire_id=3, squeeze=32, expand=128)
    x = fire_module(x, fire_id=4, squeeze=32, expand=128)
    x = MaxPooling2D(pool_size=(3, 3), strides=(2, 2))(x)
    
    # Use two fire modules with 48 filter squeeze and 192 filter expand
    x = fire_module(x, fire_id=5, squeeze=48, expand=192)
    x = fire_module(x, fire_id=6, squeeze=48, expand=192)
    x = MaxPooling2D(pool_size=(3, 3), strides=(2, 2))(x)
    
    # Dropout with drop rate 20%
    x = Dropout(0.2)(x)  
    
    # Last conv layer
    x = Conv2D(100, (1, 1), padding='valid')(x)
    x = Activation('elu')(x)
    
    # Do a global average pooling
    x = GlobalAveragePooling2D()(x)
    
    # This is our output layer for the three classes with softmax
    out = Dense(3, name='loss', activation="softmax")(x)
    
    # Build our model
    model = Model(inputs=input_img, outputs=[out])     
    return model

model = squeeze_model()
model.summary()

In [None]:
# Maximum number of epochs for training
epochs = 2

# Let's train the model using Adam optimizer
model.compile(loss='categorical_crossentropy', optimizer=Adam(lr=1e-4), metrics=['accuracy'])

# Callbacks for checkpoints, visualisation and early stop
check_point = ModelCheckpoint('../checkpoints/model-e{epoch:03d}.h5', monitor='val_loss', verbose=0, save_best_only=False, save_weights_only=False, mode='auto')
early_stop = EarlyStopping(monitor='val_loss', patience=4, verbose=0, mode='min')
        
history = model.fit(x=train_x,y=train_y,verbose=2,batch_size=batch_size,epochs=epochs,
                    validation_data = (test_x, test_y), callbacks=[early_stop, check_point])

# Save the model to file
model.save('../models/model.h5')

# Print message
print("Model trained!")

In [None]:
acc = history.history['acc']
val_acc = history.history['val_acc']
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(len(acc))
plt.plot(epochs, acc, 'bo')
plt.plot(epochs, val_acc, 'b')
plt.title('Training and validation accuracy')
plt.figure()
plt.plot(epochs, loss, 'bo')
plt.plot(epochs, val_loss, 'b')
plt.title('Training and validation loss')
plt.show()

In [None]:
K.set_learning_phase(0)
model = load_model("../models/model.h5")
print("Model loaded!")

In [None]:
def predict(image):
    sfmax = list(model.predict(image, verbose=0)[0])
    sf_ind = sfmax.index(max(sfmax))
    predicted_label = image_types[sf_ind]
    return predicted_label

image = test_x[randint(0, len(test_x))]
plt.imshow(image)
img = np.reshape(image, (1, 32, 32, 3))
print(predict(img))

In [None]:
pred = [None]
pred_node_names = [None]
pred_node_names[0] = 'output_'+str(0)
pred[0] = tf.identity(model.output[0], name=pred_node_names[0])
sess = K.get_session()
constant_graph = graph_util.convert_variables_to_constants(sess, sess.graph.as_graph_def(), pred_node_names)
graph_io.write_graph(constant_graph, 'model', 'model_classification.pb', as_text=False)
print('Keras model converted to TensorFlow model!')

In [None]:
# Function to load a graph from a protobuf file
def load_graph(graph_file):
    with tf.Session(graph=tf.Graph(), config=config) as sess:
        gd = tf.GraphDef()
        with tf.gfile.Open(graph_file, 'rb') as f:
            data = f.read()
            gd.ParseFromString(data)
        tf.import_graph_def(gd, name='')
        return sess.graph

# Enable optimizations
config = tf.ConfigProto()
jit_level = tf.OptimizerOptions.ON_1
config.graph_options.optimizer_options.global_jit_level = jit_level    

# Load graph from protobuf file
graph = load_graph(os.path.join('model', 'model_classification.pb'))

# Get input and output tensors
in_graph = graph.get_tensor_by_name('input_1_1:0')
out_graph = graph.get_tensor_by_name('output_0:0')

In [None]:
# Pick a random image from the test set
image = test_x[randint(0, len(test_x))]
plt.imshow(image)

# Do the prediction here
with tf.Session(graph=graph, config=config) as sess:   
    sfmax = list(sess.run(tf.nn.softmax(out_graph.eval(feed_dict={in_graph: [image]}))))
    sf_ind = sfmax.index(max(sfmax))
    predicted_label = image_types[sf_ind]
    print(predicted_label)