In [0]:
!pip install larq

In [0]:
import tensorflow as tf
from tensorflow import keras
import larq as lq
import numpy as np
from keras.utils import np_utils


In [0]:
# Load CIFAR-10 through the Keras dataset loader.
# trainData and testData are each unpacked later as an (images, labels) pair.
trainData, testData = tf.keras.datasets.cifar10.load_data()

In [0]:
#Data Augmentation
def resize_and_flip(image, labels, training):
    """Normalize an image and, when training, apply random augmentation.

    Args:
        image: Image data; it is cast to float32 and rescaled as
            ``x / 127.5 - 1`` (so 0-255 input lands in [-1, 1]).
            The training branch crops to [32, 32, 3], so it presumably
            receives one 32x32 RGB image at a time -- confirm against callers.
        labels: Returned unchanged alongside the image.
        training: Python truth value. When true the image is padded/cropped
            to 40x40, randomly cropped back to 32x32x3 and randomly flipped
            left/right. When false only the normalization is applied.

    Returns:
        Tuple ``(image, labels)`` with the processed image.
    """
    normalized = tf.cast(image, tf.float32) / (255./2.) - 1.
    if not training:
        # Evaluation path: normalization only, no randomness.
        return normalized, labels

    # Pad to 40x40, then take a random 32x32 window (random translation).
    padded = tf.image.resize_with_crop_or_pad(normalized, 40, 40)
    cropped = tf.image.random_crop(padded, [32, 32, 3])
    # Random horizontal mirror.
    flipped = tf.image.random_flip_left_right(cropped)
    return flipped, labels

In [0]:
#Generate batches of data
def create_dataset(data, batch_size, training, num_classes=10, shuffle_buffer=1000):
    """Build an endlessly repeating, batched ``tf.data`` pipeline.

    Args:
        data: ``(images, labels)`` pair. Labels are squeezed and one-hot
            encoded, so they are expected to be integer class ids.
        batch_size: Number of examples per emitted batch.
        training: When true the examples are shuffled and
            ``resize_and_flip`` applies its random augmentation; when false
            the order is kept and only normalization is applied.
        num_classes: Depth of the one-hot label encoding (default 10).
        shuffle_buffer: Size of the shuffle buffer used when ``training``
            is true (default 1000).

    Returns:
        A ``tf.data.Dataset`` yielding ``(image_batch, one_hot_label_batch)``
        forever; callers must bound iteration (e.g. ``steps_per_epoch``).
    """
    images, labels = data
    labels = tf.one_hot(np.squeeze(labels), num_classes)
    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    if training:
        # Shuffle BEFORE repeat: shuffling an already-repeated stream lets the
        # buffer mix examples from adjacent epochs, so an example can be seen
        # twice before another has been seen once. Shuffling first keeps every
        # pass over the data complete and reshuffles on each repetition.
        dataset = dataset.shuffle(shuffle_buffer)
    dataset = dataset.repeat()
    dataset = dataset.map(lambda x, y: resize_and_flip(x, y, training))
    dataset = dataset.batch(batch_size)
    return dataset

In [0]:
# Mini-batch size used by both pipelines and by the step counts in training.
batch_size = 50

# Training pipeline: shuffled and augmented.
train_dataset = create_dataset(trainData, batch_size=batch_size, training=True)
# Evaluation pipeline: original order, normalization only.
test_dataset = create_dataset(testData, batch_size=batch_size, training=False)

In [0]:
#Derived from example at https://larq.dev/

def create_model(model_params):
    """Assemble the quantized convolutional classifier for 32x32x3 inputs.

    The first convolution uses a fixed configuration (``ste_sign`` kernel
    quantizer, ``weight_clip`` constraint, no bias). Every later quantized
    layer is configured through ``model_params``.

    Args:
        model_params: Dict of keyword arguments forwarded to every
            ``QuantConv2D`` / ``QuantDense`` layer except the first one.

    Returns:
        An uncompiled ``tf.keras.models.Sequential`` ending in a softmax over
        10 outputs.
    """
    def batch_norm():
        # Same BatchNormalization settings after every quantized layer.
        return tf.keras.layers.BatchNormalization(momentum=0.999, scale=False)

    def max_pool():
        # 2x2 pooling with stride 2.
        return tf.keras.layers.MaxPool2D(pool_size=(2, 2), strides=(2, 2))

    # Layers are created strictly in network order.
    stack = [
        lq.layers.QuantConv2D(128, 3,
                              kernel_quantizer="ste_sign",
                              kernel_constraint="weight_clip",
                              use_bias=False,
                              input_shape=(32, 32, 3)),
        batch_norm(),
    ]

    # (filters, followed_by_pooling) for the remaining convolutions.
    conv_plan = (
        (128, True),
        (256, False),
        (256, True),
        (512, False),
        (512, True),
    )
    for filters, pooled in conv_plan:
        stack.append(lq.layers.QuantConv2D(filters, 3, padding="same", **model_params))
        if pooled:
            stack.append(max_pool())
        stack.append(batch_norm())

    stack.append(tf.keras.layers.Flatten())

    # Two hidden dense layers followed by the 10-way output layer.
    for units in (1024, 1024, 10):
        stack.append(lq.layers.QuantDense(units, **model_params))
        stack.append(batch_norm())

    stack.append(tf.keras.layers.Activation("softmax"))

    return tf.keras.models.Sequential(stack)

In [0]:
# Keyword arguments that create_model forwards to every quantized layer
# after the first: sign-quantized inputs, a linear (pass-through) activation
# as the kernel quantizer, no kernel constraint and no bias term.
model_params = {
    "input_quantizer": "ste_sign",
    "kernel_quantizer": tf.keras.layers.Activation("linear"),
    "kernel_constraint": None,
    "use_bias": False,
}

In [0]:
#Return suitable optimizer bop or adam
def optimizer_fn(bop=True):
    """Create the optimizer used to compile the model.

    Args:
        bop: When true, return ``lq.optimizers.Bop`` wrapping an Adam
            optimizer (learning rate 1e-2) with threshold 1e-8 and a gamma
            that starts at 1e-4 and is multiplied by 0.1 every
            ``decay_step`` optimizer steps (staircase decay). When false,
            return a plain Adam optimizer with learning rate 1e-3.

    Returns:
        The configured optimizer instance.
    """
    initial_lr = 1e-2 if bop else 1e-3
    # Use the documented `learning_rate` argument; `lr` is only a deprecated
    # alias and is rejected by newer Keras releases.
    adam = tf.keras.optimizers.Adam(learning_rate=initial_lr)
    if not bop:
        return adam

    threshold_val = 1e-8
    gamma_val = 1e-4
    gamma_decay = 0.1
    # (50000 / 50) steps * 100: presumably training-set size / batch size,
    # i.e. decay gamma once every 100 epochs -- keep in sync with batch_size.
    decay_step = int((50000 / 50) * 100)
    gamma_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        gamma_val, decay_step, gamma_decay, staircase=True)
    return lq.optimizers.Bop(fp_optimizer=adam,
                             threshold=threshold_val,
                             gamma=gamma_schedule)
   


In [0]:
def lr_schedule(epoch):
    """Step-decay learning rate: start at 1e-3, multiply by 0.1 every 100 epochs.

    Args:
        epoch: Zero-based epoch index.

    Returns:
        The learning rate to use for ``epoch``.
    """
    base_lr = 1e-3
    decay_factor = 0.1
    completed_stages = epoch // 100
    return base_lr * decay_factor ** completed_stages


In [0]:
def train_model(bop=True):
    """Build, compile, train and save the model.

    Training metrics are written to ``cifar.log`` via ``CSVLogger``. When
    ``bop`` is false, a ``LearningRateScheduler`` driven by ``lr_schedule`` is
    added. The trained model is saved to ``cifar_bop.h5`` or
    ``cifar_baseline.h5`` depending on ``bop``.

    Args:
        bop: Selects the optimizer (see ``optimizer_fn``), whether the
            learning-rate scheduler is attached, and the output file name.

    Returns:
        The ``History`` object produced by ``model.fit`` (previously this
        value was computed and discarded).
    """
    model = create_model(model_params)
    model.compile(
        optimizer=optimizer_fn(bop),
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )

    callbacks = [keras.callbacks.CSVLogger('cifar.log')]
    if not bop:
        # Only the Adam baseline uses the step-decay learning-rate schedule.
        callbacks.append(tf.keras.callbacks.LearningRateScheduler(lr_schedule))

    # Both datasets repeat forever, so the step counts bound each epoch to
    # one pass over the respective split.
    history = model.fit(
        train_dataset,
        epochs=500,
        steps_per_epoch=trainData[1].shape[0] // batch_size,
        validation_data=test_dataset,
        validation_steps=testData[1].shape[0] // batch_size,
        verbose=1,
        callbacks=callbacks
    )

    model.save('cifar_bop.h5' if bop else 'cifar_baseline.h5')
    return history

    

In [0]:
def test_model(bop=True):
    """Rebuild the model, load the saved weights and print test accuracy.

    Args:
        bop: Selects the optimizer used for compilation and which weight
            file is loaded (``cifar_bop.h5`` or ``cifar_baseline.h5``).

    Returns:
        None. The accuracy (in percent) is printed.
    """
    model = create_model(model_params)
    model.compile(
        optimizer=optimizer_fn(bop),
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )
    model.load_weights('cifar_bop.h5' if bop else 'cifar_baseline.h5')

    raw_images, raw_labels = testData
    # The training/validation pipelines feed images normalized by
    # resize_and_flip. Evaluating on the raw 0-255 uint8 pixels would give the
    # network inputs on a completely different scale, so apply the same
    # preprocessing here (training=False => normalization only, no augmentation).
    normalized_images, _ = resize_and_flip(raw_images, raw_labels, False)
    # One-hot encode with tf.keras (already imported) rather than the
    # standalone keras.utils.np_utils helper.
    y_new = tf.keras.utils.to_categorical(raw_labels, 10)
    scores = model.evaluate(np.asarray(normalized_images), y_new, verbose=1)
    print("Accuracy of model is {}".format(scores[1]*100.0))

In [0]:
# Train with the Bop optimizer, then evaluate the weights that run saved.
# For the Adam baseline, pass bop=False to BOTH calls so that the matching
# weight file is loaded.
train_model(bop=True)
test_model(bop=True)