### Custom Shallow Network - miniResNet with default RMSProp

In [None]:
%load_ext autoreload
%autoreload 2
%load_ext tensorboard

In [None]:
import os
import numpy as np
from glob import glob
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, img_to_array, ImageDataGenerator
from tensorflow.keras.models import Model 
from tensorflow.keras.layers import Conv2D, MaxPooling2D, GlobalAveragePooling2D, BatchNormalization
from tensorflow.keras.layers import Input, Activation, Dropout, Flatten, Dense, add
from tensorflow.keras.utils  import to_categorical
from tensorflow.keras.regularizers import l2
import matplotlib.pyplot as plt 

%matplotlib inline

In [None]:
# dimensions of our images.
img_width, img_height = 224, 224

train_data_dir = '/home/sanchit/Documents/Projects/datasets/fire_and_smoke_data/train/'
validation_data_dir = '/home/sanchit/Documents/Projects/datasets/fire_and_smoke_data/val/'
nb_train_samples = 2400
nb_validation_samples = 490
epochs = 100
batch_size = 32

In [None]:
physical_devices = tf.config.list_physical_devices('GPU')
tf.config.experimental.set_memory_growth(physical_devices[0], True)

In [None]:
class MiniResNet:
    """ mini version of ResNet architecture """
    
    def __init__(self, img_width, img_height):
        self.img_width = img_width
        self.img_height = img_height
        
    def __residual_block(self, x, filters, is_reduce_dim=False, reg=0.0001):
        shortcut = x 
        
        # pre-activation block, act1 goes as an input to conv1
        # first part of the block are 1x1 CONVs with 1/4th of original number of filters
        bn1 = BatchNormalization()(x)
        act1 = Activation("relu")(bn1)
        
        if is_reduce_dim:
            # convolve (1,1) on the input x which stays as a shortcut to incorporate reduction in dim 
            shortcut = Conv2D(filters, (1,1), strides=(2,2), use_bias=False, kernel_initializer='he_normal', 
                              padding="same", kernel_regularizer=l2(reg))(x)
            
            # first convolution on act1 
            conv1 = Conv2D(int(filters*0.25), (1,1), strides=(2,2), use_bias=False, kernel_initializer='he_normal', 
                           padding="same", kernel_regularizer=l2(reg))(act1)
            
        else:
            shortcut = Conv2D(filters, (1,1), use_bias=False, kernel_initializer='he_normal', 
                              padding="same", kernel_regularizer=l2(reg))(x)
            
            conv1 = Conv2D(int(filters*0.25), (1,1), use_bias=False, kernel_initializer='he_normal', 
                           padding="same", kernel_regularizer=l2(reg))(act1) 
        
        # second part of the block are 3x3 CONVs with 1/4th original number of filters
        bn2 = BatchNormalization()(conv1)
        act2 = Activation("relu")(bn2)
        conv2 = Conv2D(int(filters*0.25), (3,3), use_bias=False, kernel_initializer='he_normal', 
                       padding="same", kernel_regularizer=l2(reg))(act2)
        
        # third part of the block are 1x1 CONVs with original number of filters
        bn3 = BatchNormalization()(conv2)
        act3 = Activation("relu")(bn3)
        conv3 = Conv2D(filters, (1,1), use_bias=False, kernel_initializer='he_normal', 
                       padding="same", kernel_regularizer=l2(reg))(act3)
        
        # lastly add the shortcut and last output 
        out = add([shortcut, conv3])
        return out
        
    def build(self):
        
        input_img = Input(shape=(self.img_width, self.img_height, 3), name='input_image')
        
        x = Conv2D(16, kernel_size=(3, 3), strides=1, padding='same', 
                   kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(input_img)
        x = BatchNormalization()(x)
        x = Activation("relu")(x)
        
        x = self.__residual_block(x, 16, is_reduce_dim=False) # 224
        x = self.__residual_block(x, 16, is_reduce_dim=True) # 112 
        x = self.__residual_block(x, 16, is_reduce_dim=False) # 112
        
        x = self.__residual_block(x, 32, is_reduce_dim=True) # 56
        x = self.__residual_block(x, 32, is_reduce_dim=False) # 56
        
        x = self.__residual_block(x, 64, is_reduce_dim=True) # 28
        x = self.__residual_block(x, 64, is_reduce_dim=False) # 28
        
        x = self.__residual_block(x, 128, is_reduce_dim=True) # 14
        x = self.__residual_block(x, 128, is_reduce_dim=False) # 14
        
        x = self.__residual_block(x, 256, is_reduce_dim=True) # 7
        x = self.__residual_block(x, 256, is_reduce_dim=False) # 7
              
        x = BatchNormalization()(x)
        x = Activation("relu", name="last_conv_relu")(x)
        x = GlobalAveragePooling2D()(x)
            
        # classifier 
        x = Flatten()(x)
        x = Dense(64, kernel_regularizer=l2(1e-4))(x)
        x = Activation("relu", name="last_relu")(x)
        x = Dense(1)(x)
        output = Activation("sigmoid")(x)
        
        model = Model(inputs=input_img, outputs=output)
        return model

In [None]:
miniResNet = MiniResNet(img_width, img_height)
model = miniResNet.build()

model.compile(loss='binary_crossentropy',
              optimizer='rmsprop',
              metrics=['accuracy'])

In [None]:
model.summary()

In [None]:
def load_data(dir_path="", class_mode='binary', classes=None):
    """
    loads all the images for all the classes (sub-dirs) provided in the input directory.
    :param dir_path -  input directory which contains for each class a sub-directory. 
    :param class_mode - binary (for usage in binary_crossentropy loss) and categorical (categorical_crossentropy loss)
    :params classes - a list of classes'names 
    """
    X = []
    y = []
    data_kind = dir_path.split("/")[-2] # either training or validation or testing
    directories=[d for d in os.listdir(dir_path) if os.path.isdir(d) or (not d.startswith("."))]
    
    for label, class_name in enumerate(directories):
        print(f"loading {data_kind} data for class: {class_name}")
        class_dir = os.path.join(dir_path, class_name)
        for img_path in tqdm(glob(class_dir + '/*.jpg')):
            img = load_img(img_path, target_size=(img_width, img_height))
            img = img_to_array(img)

            # save the image and its corresponding label
            X.append(img)
            y.append(label)
                
    X = np.asarray(X, dtype=np.float32) / 255.0
    y = np.asarray(y)
    
    if class_mode == "categorical":
        y = to_categorical(y_train, num_classes=classes)
        
    print(f"total number of images loaded: {X.shape[0]} and of shape: {X.shape[1:]}")
    
    return X, y

In [None]:
X_train, y_train = load_data(dir_path=train_data_dir)
X_val, y_val = load_data(dir_path=validation_data_dir)

In [None]:
def get_random_eraser(p=0.5, s_l=0.02, s_h=0.4, r_1=0.3, r_2=1/0.3, v_l=0, v_h=255, pixel_level=False):
    def eraser(input_img):
        img_h, img_w, img_c = input_img.shape
        p_1 = np.random.rand()

        if p_1 > p:
            return input_img

        while True:
            s = np.random.uniform(s_l, s_h) * img_h * img_w
            r = np.random.uniform(r_1, r_2)
            w = int(np.sqrt(s / r))
            h = int(np.sqrt(s * r))
            left = np.random.randint(0, img_w)
            top = np.random.randint(0, img_h)

            if left + w <= img_w and top + h <= img_h:
                break

        if pixel_level:
            c = np.random.uniform(v_l, v_h, (h, w, img_c))
        else:
            c = np.random.uniform(v_l, v_h)

        input_img[top:top + h, left:left + w, :] = c

        return input_img

    return eraser

In [None]:
# create a train generator
train_datagen = ImageDataGenerator(
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True, 
    vertical_flip=True,
    rotation_range=30,
    fill_mode="wrap",
    height_shift_range=0.15,
    width_shift_range=0.15)

# preprocessing_function=get_random_eraser(v_l=0, v_h=1)

# create a test/val generator
test_datagen = ImageDataGenerator()

In [None]:
train_generator = train_datagen.flow(X_train, y_train, batch_size=batch_size, shuffle=True)

validation_generator = test_datagen.flow(X_val, y_val, batch_size=batch_size, shuffle=False)

In [None]:
H = model.fit_generator(
    train_generator,
    steps_per_epoch=nb_train_samples // batch_size,
    epochs=epochs,
    validation_data=validation_generator,
    validation_steps=nb_validation_samples // batch_size, 
    workers = 8)

In [None]:
N = np.arange(0, epochs)
plt.style.use("ggplot")
plt.figure()
plt.plot(N, H.history["loss"], label="train_loss")
plt.plot(N, H.history["val_loss"], label="val_loss")
plt.plot(N, H.history["accuracy"], label="train_acc")
plt.plot(N, H.history["val_accuracy"], label="val_acc")
plt.title("Training Loss and Accuracy")
plt.xlabel("Epoch #")
plt.ylabel("Loss/Accuracy")
plt.legend()
plt.show()