https://github.com/vrakesh/CIFAR-10-Classifier

In [None]:
import pickle

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.tools.graph_transforms import TransformGraph

from utensor_cgen.utils import prepare_meta_graph

In [None]:
# Show which TensorFlow version is installed. The cells below rely on the
# graph-mode API (tf.placeholder, tf.Session, tf.train.Saver).
print(tf.__version__)

# Define Graph

In [None]:
# Load the pretrained weights. The graph-building cell indexes this object
# first by layer name and then by variable name,
# e.g. pretrain_weights['conv2d']['conv2d/kernel:0'].
# NOTE(security): pickle.load can execute arbitrary code embedded in the file;
# only load 'cnn_weights.pkl' from a trusted source.
with open('cnn_weights.pkl', 'rb') as fid:
    pretrain_weights = pickle.load(fid)

In [None]:
from functools import reduce

In [None]:
def conv_layer(in_fmap, kernel, bias, act_fun=None, name=None):
    '''Build a stride-1, SAME-padded 2-D convolution layer with a bias term.

    Args:
        in_fmap: input feature map tensor passed to ``tf.nn.conv2d``.
        kernel: initial value of the convolution filter variable.
        bias: initial value of the bias variable.
        act_fun: optional callable applied to the biased output; it is
            called as ``act_fun(tensor, name='activation')``.
        name: optional name scope; falls back to ``'conv'`` when omitted.

    Returns:
        The output feature map (after ``act_fun`` when one is given).
    '''
    with tf.name_scope(name, 'conv'):
        tf_kernel = tf.Variable(kernel,
                                dtype=tf.float32,
                                name='kernel')
        tf_bias = tf.Variable(bias,
                              dtype=tf.float32,
                              name='bias')
        # Convolve with the *variable*, not the raw initial value: otherwise
        # the filter is baked into the graph as a constant, the variable is
        # dead, and the kernel can be neither trained nor restored from a
        # checkpoint.
        conv = tf.nn.conv2d(in_fmap, tf_kernel,
                            strides=[1, 1, 1, 1],
                            padding='SAME')
        out_fmap = tf.add(conv, tf_bias, name='fmap')
        if act_fun:
            out_fmap = act_fun(out_fmap, name='activation')
    return out_fmap

In [None]:
def fc_layer(in_fmap, weight, bias, act_fun=None, name=None):
    '''Build a fully connected layer: ``in_fmap @ weight + bias``.

    Args:
        in_fmap: input tensor fed to ``tf.matmul`` as the left operand.
        weight: initial value of the weight variable.
        bias: initial value of the bias variable.
        act_fun: optional callable applied to the result; it is called as
            ``act_fun(tensor, name='activation')``.
        name: optional name scope; falls back to ``'fc'`` when omitted.

    Returns:
        The tensor named ``'logits'``, or the activation output when
        ``act_fun`` is given.
    '''
    with tf.name_scope(name, 'fc'):
        w_var = tf.Variable(weight, dtype=tf.float32, name='weight')
        b_var = tf.Variable(bias, dtype=tf.float32, name='bias')
        projected = tf.matmul(in_fmap, w_var)
        affine = tf.add(projected, b_var, name='logits')
        if not act_fun:
            return affine
        return act_fun(affine, name='activation')

In [None]:
def cross_entropy_loss(logits, labels, name=None, axis=-1):
    '''Summed categorical cross-entropy between ``labels`` and softmax(logits).

    Follows the Keras TensorFlow backend implementation:
    https://github.com/keras-team/keras/blob/master/keras/backend/tensorflow_backend.py#L3171

    Args:
        logits: unnormalised scores; softmax is taken along ``axis``.
        labels: target distribution, multiplied element-wise with the
            negative log-probabilities.
        name: optional name scope; falls back to ``'cross_entropy'``.
        axis: axis along which the softmax is computed.

    Returns:
        A scalar tensor: the sum (not the mean) over every element.
    '''
    eps = 1e-7
    with tf.name_scope(name, 'cross_entropy'):
        softmax_out = tf.nn.softmax(logits=logits, axis=axis)
        # Clip away exact 0 and 1 so the log below stays finite.
        clipped = tf.clip_by_value(softmax_out, eps, 1 - eps)
        neg_labels = -labels
        log_prob = tf.log(clipped)
        summed = tf.reduce_sum(neg_labels * log_prob, name='total_loss')
    return summed

In [None]:
# Build the CNN in its own tf.Graph so that the session, the saver and the
# export step below all work on exactly this set of ops.
# Layout: conv1 -> conv2 -> pool -> conv3 -> pool -> conv4 -> pool
#         -> flatten -> dropout -> fc1 (ReLU) -> dropout -> logits.
graph = tf.Graph()

with graph.as_default():
    # Inputs: batches of 32x32x3 images and 10-wide label rows (the training
    # cell loads the labels with one_hot=True). Batch size is left open.
    tf_image_batch = tf.placeholder(tf.float32, shape=[None, 32, 32, 3])
    tf_labels = tf.placeholder(tf.float32, shape=[None, 10])
    
    # Convolutional stack; every layer is initialised from the pickled
    # pretrained weights and followed by ReLU.
    conv1 = conv_layer(tf_image_batch,
                       pretrain_weights['conv2d']['conv2d/kernel:0'],
                       pretrain_weights['conv2d']['conv2d/bias:0'],
                       act_fun=tf.nn.relu,
                       name='conv1')
    conv2 = conv_layer(conv1,
                       pretrain_weights['conv2d_1']['conv2d_1/kernel:0'],
                       pretrain_weights['conv2d_1']['conv2d_1/bias:0'],
                       act_fun=tf.nn.relu,
                       name='conv2')
    # 2x2 max pooling with stride 2 and no padding.
    pool1 = tf.nn.max_pool(conv2,
                           ksize=[1, 2, 2, 1],
                           strides=[1, 2, 2, 1],
                           padding='VALID')
    # conv3 and conv4 pass no name, so they fall back to conv_layer's default
    # scope name 'conv'.
    conv3 = conv_layer(pool1,
                       pretrain_weights['conv2d_2']['conv2d_2/kernel:0'],
                       pretrain_weights['conv2d_2']['conv2d_2/bias:0'],
                       act_fun=tf.nn.relu)
    pool2 = tf.nn.max_pool(conv3,
                           ksize=[1, 2, 2, 1],
                           strides=[1, 2, 2, 1],
                           padding='VALID')
    conv4 = conv_layer(pool2,
                       pretrain_weights['conv2d_3']['conv2d_3/kernel:0'],
                       pretrain_weights['conv2d_3']['conv2d_3/bias:0'],
                       act_fun=tf.nn.relu)
    pool3 = tf.nn.max_pool(conv4,
                           ksize=[1, 2, 2, 1],
                           strides=[1, 2, 2, 1],
                           padding='VALID')
    # Flatten: collapse every non-batch dimension of pool3 into one axis whose
    # size is the product of those static dimensions.
    flat_pool3 = tf.reshape(pool3,
                            [-1, reduce(lambda x, y: x*y,
                                        pool3.shape.as_list()[1:],
                                        1)])
    # A single keep-probability placeholder drives both dropout layers; the
    # training cell feeds 0.8 for training steps and 1.0 for evaluation.
    tf_keep_prob = tf.placeholder(tf.float32, name='keep_prob')
    dropout1 = tf.nn.dropout(flat_pool3,
                             keep_prob=tf_keep_prob,
                             name='dropout1')
    fc1 = fc_layer(dropout1,
                   pretrain_weights['dense']['dense/kernel:0'],
                   pretrain_weights['dense']['dense/bias:0'],
                   act_fun=tf.nn.relu,
                   name='fc1')
    dropout2 = tf.nn.dropout(fc1,
                             keep_prob=tf_keep_prob,
                             name='dropout2')
    # Output layer: no activation, raw class scores.
    logits = fc_layer(dropout2,
                      pretrain_weights['dense_1']['dense_1/kernel:0'],
                      pretrain_weights['dense_1']['dense_1/bias:0'],
                      name='logits')
    # Predicted class index per sample; named 'pred' because the export cell
    # uses this op as the output node.
    pred_labels = tf.argmax(logits,
                            axis=-1,
                            name='pred')
    # Summed (not averaged) cross-entropy over the fed batch.
    total_loss = cross_entropy_loss(logits=logits, labels=tf_labels)
    
    # NOTE(review): learning_rate=1.0 / epsilon=1e-7 presumably mirror the
    # settings the weights were pretrained with -- confirm.
    train_op = tf.train.AdadeltaOptimizer(learning_rate=1.0, epsilon=1e-7).minimize(total_loss)
    # max_to_keep=None: per the tf.train.Saver docs no checkpoint files are
    # deleted, so every per-epoch checkpoint stays on disk.
    saver = tf.train.Saver(max_to_keep=None)

# Train

In [None]:
from cifar import read_data_sets

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [None]:
import sys

In [None]:
# Mini-batch size requested from the augmenting data generator.
batch_size = 32
# Number of training steps (mini-batches) executed per epoch in the training loop.
num_iter_per_epoch = 1500

In [None]:
!rm -rf ckpt && mkdir -p ckpt/cnn

# This will take a long time to complete when running on a CPU.
# Load the dataset through the project-local `cifar` helper: one-hot labels,
# images kept un-flattened (reshape=False).
cifar = read_data_sets('./data', one_hot=True, reshape=False)
# Augmentation: random horizontal/vertical shifts (range 0.1) and random
# horizontal flips.
img_gen = ImageDataGenerator(width_shift_range=0.1,
                             height_shift_range=0.1,
                             horizontal_flip=True)
# NOTE(review): per the Keras docs fit() only matters for the featurewise /
# ZCA options, none of which are enabled here -- confirm whether this call is
# needed.
img_gen.fit(cifar.train.images)
# Endless iterator yielding augmented (images, labels) mini-batches.
batch_gen = img_gen.flow(cifar.train.images,
                         cifar.train.labels,
                         batch_size=batch_size)

with tf.Session(graph=graph) as sess:
    # Variables start from the pretrained values they were constructed with.
    tf.global_variables_initializer().run()
    # compute original loss
    # Baseline: evaluate the whole test set in one run, with dropout disabled
    # (keep_prob=1.0), before any training step.
    l, p_labels = sess.run([total_loss, pred_labels],
                           feed_dict={tf_image_batch: cifar.test.images,
                                      tf_labels: cifar.test.labels,
                                      tf_keep_prob: 1.0})
    # total_loss is a sum over the fed batch; divide by the number of test
    # images to report a per-sample mean.
    l /= cifar.test.images.shape[0]
    # Accuracy: fraction of predictions equal to the argmax of the one-hot labels.
    acc = (p_labels == np.argmax(cifar.test.labels, axis=-1)).mean()
    print(f'original loss: {l}')
    print(f'acc on test set: {acc*100:.2f}%')
    
    # Fine-tune for 5 epochs of num_iter_per_epoch mini-batches each.
    for epoch in range(5):
        print(f'epoch {epoch} start')
        for _ in range(num_iter_per_epoch):
            images_batch, labels_batch = next(batch_gen)
            # One optimizer step with dropout active (keep_prob=0.8).
            _ = sess.run(train_op,
                         feed_dict={tf_image_batch: images_batch,
                                    tf_labels: labels_batch,
                                    tf_keep_prob: 0.8})
        # End-of-epoch evaluation on the full test set, dropout disabled.
        test_loss, p_labels = sess.run([total_loss, pred_labels],
                                       feed_dict={tf_image_batch: cifar.test.images,
                                                   tf_labels: cifar.test.labels,
                                                   tf_keep_prob: 1.0})
        test_loss /= cifar.test.images.shape[0]
        acc = (p_labels == np.argmax(cifar.test.labels, axis=-1)).mean()
        print(f'test loss: {test_loss}, {acc*100:0.2f}%')
        # Save one checkpoint per epoch (global_step=epoch). After the loop,
        # `ckpt` holds the path returned for the last one; the export cell
        # reads it.
        ckpt = saver.save(sess, 'ckpt/cnn/model', global_step=epoch)
        print(f'epoch saved {ckpt}')

In [None]:
# Display the checkpoint path returned by the last saver.save() call.
ckpt

In [None]:
!tree ckpt

In [None]:
# Build a GraphDef from the last checkpoint's '.meta' file with utensor_cgen's
# prepare_meta_graph, using the op behind `pred_labels` as the only output node.
# NOTE(review): presumably this also freezes variables into constants --
# confirm against the utensor_cgen implementation.
graph_def = prepare_meta_graph(ckpt+'.meta', output_nodes=[pred_labels.op.name])

In [None]:
# Write the serialized GraphDef to 'cifar10_cnn.pb' as binary protobuf bytes.
with open('cifar10_cnn.pb', 'wb') as fid:
    fid.write(graph_def.SerializeToString())