In [34]:

from __future__ import print_function
import import_ipynb
import keras
from keras.layers import Dense, Conv2D, BatchNormalization, Activation
from keras.layers import AveragePooling2D, Input, Flatten
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint, LearningRateScheduler
from keras.callbacks import ReduceLROnPlateau
from keras.preprocessing.image import ImageDataGenerator
from keras.regularizers import l2
from keras import backend as K
from keras.models import Model

#from models import  resnet_v1, resnet_v2, mobilenets, inception_v3, inception_resnet_v2, densenet
import efficientnet.tfkeras as efn
#from utils import lr_schedule
import numpy as np
import os


In [4]:
# Load the TensorBoard notebook extension

%load_ext tensorboard

In [35]:
# Training parameters
# Batch size passed to both flow_from_directory generators.
batch_size = 16
# Number of epochs passed to the training call.
epochs = 10
# Number of units in the final softmax Dense layer.
num_classes = 8
# Only used to derive the step count for predict_generator; the evaluation
# cells re-assign it before use.
# NOTE(review): presumably the size of the validation subset — confirm.
num_of_test_samples =4260 


In [36]:
import tensorflow as tf
class SpatialAttentionModule(tf.keras.layers.Layer):
    """Spatial attention gate producing a one-channel sigmoid mask.

    The input is reduced along axis 3 with a mean and a max, the two maps are
    stacked into a 2-channel tensor, and four same-padded, bias-free
    convolutions (64 -> 32 -> 16 -> 1 filters) turn it into the mask. The
    first three convolutions use relu6, the last one a sigmoid.

    Args:
        kernel_size: kernel size shared by all four convolutions.
        **kwargs: standard Keras layer arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, kernel_size=3, **kwargs):
        # Forward **kwargs so Keras can pass name/dtype/trainable when the
        # layer is re-created from its config.
        super(SpatialAttentionModule, self).__init__(**kwargs)
        self.kernel_size = kernel_size
        self.conv1 = self._conv(64, tf.nn.relu6)
        self.conv2 = self._conv(32, tf.nn.relu6)
        self.conv3 = self._conv(16, tf.nn.relu6)
        self.conv4 = self._conv(1, tf.math.sigmoid)

    def _conv(self, filters, activation):
        """Create one bias-free, same-padded, he_normal-initialised Conv2D."""
        return tf.keras.layers.Conv2D(filters,
                                      kernel_size=self.kernel_size,
                                      use_bias=False,
                                      kernel_initializer='he_normal',
                                      strides=1, padding='same',
                                      activation=activation)

    def call(self, inputs):
        """Return the attention mask for ``inputs``.

        Assumes a channels-last 4-D tensor (axis 3 = channels) — TODO confirm
        against callers that use a different data format.
        """
        avg_out = tf.reduce_mean(inputs, axis=3)
        max_out = tf.reduce_max(inputs, axis=3)
        # Re-introduce axis 3 with two "channels": [mean, max].
        x = tf.stack([avg_out, max_out], axis=3)
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = conv(x)
        return x

    def get_config(self):
        """Return the constructor arguments so the layer can be serialised."""
        config = super(SpatialAttentionModule, self).get_config()
        config.update({'kernel_size': self.kernel_size})
        return config
    
# A custom layer
class ChannelAttentionModule(tf.keras.layers.Layer):
    """Channel attention gate: one sigmoid weight per input channel.

    Global average- and max-pooled descriptors are passed through a shared
    two-layer 1x1-convolution bottleneck, summed, and squashed with a
    sigmoid. The result has shape (batch, 1, 1, channels) so it broadcasts
    against the input feature map.

    NOTE(review): ``conv2`` applies ReLU, so the summed value is never
    negative and the sigmoid gate is always >= 0.5. Left unchanged so that
    existing trained weights keep their meaning — confirm this is intended.

    Args:
        ratio: channel reduction factor of the bottleneck.
        **kwargs: standard Keras layer arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, ratio=8, **kwargs):
        # Forward **kwargs so Keras can pass name/dtype/trainable when the
        # layer is re-created from its config.
        super(ChannelAttentionModule, self).__init__(**kwargs)
        self.ratio = ratio
        self.gapavg = tf.keras.layers.GlobalAveragePooling2D()
        self.gmpmax = tf.keras.layers.GlobalMaxPooling2D()

    def build(self, input_shape):
        """Create the shared bottleneck once the channel count is known."""
        channels = input_shape[-1]
        # Keep at least one filter when the input has fewer channels than `ratio`.
        self.conv1 = tf.keras.layers.Conv2D(max(1, channels // self.ratio),
                                            kernel_size=1,
                                            strides=1, padding='same',
                                            use_bias=True, activation=tf.nn.relu)

        self.conv2 = tf.keras.layers.Conv2D(channels,
                                            kernel_size=1,
                                            strides=1, padding='same',
                                            use_bias=True, activation=tf.nn.relu)
        super(ChannelAttentionModule, self).build(input_shape)

    def call(self, inputs):
        """Return the (batch, 1, 1, channels) attention weights for ``inputs``."""
        # Pool to (batch, channels), then restore two singleton spatial axes
        # with plain indexing instead of building new Reshape layers per call.
        gapavg = self.gapavg(inputs)[:, tf.newaxis, tf.newaxis, :]
        gmpmax = self.gmpmax(inputs)[:, tf.newaxis, tf.newaxis, :]
        # Both descriptors go through the same two convolutions.
        gapavg_out = self.conv2(self.conv1(gapavg))
        gmpmax_out = self.conv2(self.conv1(gmpmax))
        return tf.math.sigmoid(gapavg_out + gmpmax_out)

    def get_output_shape_for(self, input_shape):
        # Legacy alias kept for callers that still use the old hook name.
        return self.compute_output_shape(input_shape)

    def compute_output_shape(self, input_shape):
        """Shape of the value returned by ``call``: (batch, 1, 1, channels)."""
        return (input_shape[0], 1, 1, input_shape[3])

    def get_config(self):
        """Return the constructor arguments so the layer can be serialised."""
        config = super(ChannelAttentionModule, self).get_config()
        config.update({'ratio': self.ratio})
        return config

class AttentionWeightedAverage2D(tf.keras.layers.Layer):
    """Attention-weighted average over the two spatial axes of a 4-D tensor.

    A single learned vector of length ``channels`` scores every spatial
    position; the scores are turned into weights with a softmax over all
    positions, and the input is summed over axes 1 and 2 using those
    weights. The output has shape (batch, channels).
    """

    def __init__(self, **kwargs):
        # Initialise the base layer before attaching any attribute to it.
        super(AttentionWeightedAverage2D, self).__init__(**kwargs)
        self.init = tf.keras.initializers.get('uniform')

    def build(self, input_shape):
        """Create the scoring vector W of shape (channels, 1)."""
        self.input_spec = [tf.keras.layers.InputSpec(ndim=4)]
        assert len(input_shape) == 4
        # add_weight registers W as a trainable weight of the layer, so the
        # private `_trainable_weights` list does not need to be overwritten.
        self.W = self.add_weight(shape=(input_shape[3], 1),
                                 name='{}_W'.format(self.name),
                                 initializer=self.init)
        super(AttentionWeightedAverage2D, self).build(input_shape)

    def call(self, x):
        """Return the attention-weighted spatial average of ``x``."""
        # One score per spatial position: the trailing size-1 axis left by
        # the matmul is dropped by the reshape.
        logits  = tf.linalg.matmul(x, self.W)
        x_shape = tf.shape(x)
        logits  = tf.reshape(logits, (x_shape[0], x_shape[1], x_shape[2]))
        # Softmax over all positions; subtracting the max keeps exp() stable.
        ai      = tf.exp(logits - tf.reduce_max(logits, axis=[1,2], keepdims=True))

        att_weights    = ai / (tf.reduce_sum(ai, axis=[1,2], keepdims=True) + tf.keras.backend.epsilon())
        weighted_input = x * tf.expand_dims(att_weights, axis=-1)
        result         = tf.reduce_sum(weighted_input, axis=[1,2])
        return result

    def get_output_shape_for(self, input_shape):
        # Legacy alias kept for callers that still use the old hook name.
        return self.compute_output_shape(input_shape)

    def compute_output_shape(self, input_shape):
        """Shape of the value returned by ``call``: (batch, channels)."""
        output_len = input_shape[3]
        return (input_shape[0], output_len)

In [None]:
from tensorflow.keras.applications import EfficientNetB1


#build the model
# ImageNet-pretrained EfficientNetB1 backbone without its classification head.
effnet = EfficientNetB1(weights='imagenet', 
                        include_top=False, 
                        input_shape=(224, 224, 3))

x = effnet.output
CAN  = ChannelAttentionModule()
SPN = SpatialAttentionModule()
AWG  = AttentionWeightedAverage2D()

# Channel gate first, then the spatial gate on the channel-refined features.
canx   = CAN(x)*x
spnx   = SPN(canx)*canx
# The former `spny = SPN(canx)` branch was never connected to the model
# output, so it has been dropped.

gapx = tf.keras.layers.GlobalAveragePooling2D()(spnx)
gapy = tf.keras.layers.GlobalAveragePooling2D()(spnx)

# NOTE(review): gapx and gapy pool the same tensor with the same operation,
# so this Average is numerically equal to gapx. Kept as-is so the graph
# matches existing checkpoints — confirm whether a different second branch
# (e.g. max pooling) was intended.
avg = tf.keras.layers.Average()([gapx, gapy])

# Second descriptor: attention-weighted average of the raw backbone features.
awg = AWG(x)
x = tf.keras.layers.Add()([avg, awg])
x = tf.keras.layers.BatchNormalization()(x)

# Classification head.
x = tf.keras.layers.Dense(512, activation = 'relu')(x)
x = tf.keras.layers.Dense(num_classes,activation='softmax')(x)

model = tf.keras.Model(inputs=effnet.input,outputs=x )

model.summary()

In [None]:

# Visualize the model
from tensorflow.keras.utils import plot_model
plot_model(model, to_file='model.png', show_shapes=True)

In [7]:

import time
import datetime
from datetime import datetime  # NOTE: rebinds `datetime` from the module to the class


# One timestamped log directory per run so runs do not overwrite each other.
logdir="logs/fit/" + datetime.now().strftime("%Y%m%d-%H%M%S")

# The model is built with tf.keras, so the callback comes from tf.keras too
# instead of the standalone `keras` package.
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)

In [39]:
# Compile the model: Adam with learning rate 1e-3, categorical cross-entropy
# loss, accuracy as the reported metric.
opt = tf.keras.optimizers.Adam(learning_rate=0.001)
model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
# Use the tf.keras generator so the iterators match the tf.keras model that
# consumes them. No rescaling is configured on the generator.
datagen = tf.keras.preprocessing.image.ImageDataGenerator(validation_split=0.3)

# Training subset (70 % of every class directory); shuffled by default.
train_generator = datagen.flow_from_directory(
    'dataset',
    batch_size=batch_size,
    target_size=(224, 224),
    subset='training',
)

# Validation subset (remaining 30 %); shuffle=False keeps the prediction
# order aligned with validation_generator.classes.
validation_generator = datagen.flow_from_directory(
    'dataset',
    batch_size=batch_size,
    target_size=(224, 224),
    shuffle=False,
    subset='validation',
)

In [None]:

import time
import datetime
from datetime import datetime
start_time = time.perf_counter()
#steps_per_epoch = 66
#validation_steps = 66

# Model.fit accepts the directory iterators directly; fit_generator is the
# deprecated spelling of the same call.
history = model.fit(train_generator,
                    epochs=epochs,
                    validation_data=validation_generator,
                    #callbacks=[tensorboard_callback],
                    verbose=1
                   )

# serialize weights to HDF5
#model.save("saved_cbam_eff0_model")
#print("Saved model to disk")

# Wall-clock training time in seconds.
print(time.perf_counter()-start_time)


In [42]:
# Ground-truth labels for evaluation, read from the validation generator's
# `classes` attribute. The generator was created with shuffle=False.
# NOTE(review): presumably one integer class index per validation image, in
# generator order — confirm against the Keras iterator docs.
y_test = validation_generator.classes

In [None]:
# Predicted class index per validation image: argmax over axis 1 of the
# model's output for the whole validation generator.
yhat_test = np.argmax(model.predict(validation_generator), axis=1)

In [None]:
y_test

In [None]:
yhat_test

In [46]:
import tensorflow as tf
import os
import random
from tqdm import tqdm
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import cv2
import random
from IPython.display import Image
import imutils   

from sklearn.metrics import accuracy_score, confusion_matrix, plot_confusion_matrix, classification_report

import keras
import tensorflow.keras as K

import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, ImageDataGenerator, array_to_img, img_to_array
from tensorflow.keras.applications import EfficientNetB1
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Flatten, Dense, Conv2D, Dropout, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
import imutils    

In [None]:

# Last Conv2D among the model's top-level layers (searched back to front).
# tf.keras.layers is referenced directly because `K` is bound to two
# different modules in this notebook (keras.backend and tensorflow.keras),
# depending on which import cell ran last.
last_conv_layer = next(layer for layer in reversed(model.layers)
                       if isinstance(layer, tf.keras.layers.Conv2D))
last_conv_layer.name

In [48]:
#https://github.com/gkeechin/vizgradcam/blob/main/gradcam.py

def VizGradCAM(model, image, interpolant=0.5, plot_results=True):
    """Compute a Grad-CAM heatmap for the model's top predicted class.

    The gradient of the top class score with respect to the output of the
    last top-level Conv2D layer is averaged per channel and used to weight
    that layer's feature maps.

    Parameters:
    model: Keras model; must contain at least one top-level Conv2D layer.
    image: H x W x C array-like, fed to the model as a batch of one without
           any further preprocessing.
    interpolant (float): weight of the original image in the overlay,
                         strictly between 0 and 1.
    plot_results (boolean): True  - draw the overlay with matplotlib and
                                    return None
                            False - return the RGB heatmap as a float array
                                    (no overlay, nothing is drawn)

    Raises:
    AssertionError: if interpolant is not strictly between 0 and 1.
    StopIteration: if the model has no top-level Conv2D layer.
    """
    #sanity check
    assert (interpolant > 0 and interpolant < 1), "Heatmap Interpolation Must Be Between 0 - 1"

    def _unit_range(values):
        """Min-max scale to [0, 1]; a constant array becomes zeros, not NaN."""
        low, high = values.min(), values.max()
        if high <= low:
            return np.zeros_like(values, dtype=np.float32)
        return (values - low) / (high - low)

    #STEP 1: Preprocess the image and find the top predicted class
    original_img = np.asarray(image, dtype = np.float32)
    #add the batch axis
    img = np.expand_dims(original_img, axis=0)
    prediction_idx = np.argmax(model.predict(img))

    #STEP 2: Model that exposes the last conv feature map next to the prediction
    target_layer = next(layer for layer in reversed(model.layers)
                        if isinstance(layer, tf.keras.layers.Conv2D))
    gradient_model = tf.keras.Model([model.inputs], [target_layer.output, model.output])

    #compute gradient of the top predicted class w.r.t. the feature map
    with tf.GradientTape() as tape:
        conv2d_out, prediction = gradient_model(img)
        loss = prediction[:, prediction_idx]
    gradients = tape.gradient(loss, conv2d_out)

    #drop the batch axis: [1 x H x W x CHANNEL] -> [H x W x CHANNEL]
    output = conv2d_out[0]

    #one weight per channel: spatial mean of the gradients
    weights = tf.reduce_mean(gradients[0], axis=(0, 1))

    #channel-weighted sum of the feature maps in a single vectorised step
    activation_map = tf.reduce_sum(output * weights, axis=-1).numpy()
    #resize to image size (cv2 expects (width, height))
    activation_map = cv2.resize(activation_map,
                                (original_img.shape[1],
                                 original_img.shape[0]))
    #ensure no negative number
    activation_map = np.maximum(activation_map, 0)
    #convert class activation map to 0 - 255
    activation_map = np.uint8(255 * _unit_range(activation_map))

    #convert to heatmap (OpenCV colour maps are BGR)
    heatmap = cv2.applyColorMap(activation_map, cv2.COLORMAP_JET)

    #superimpose heatmap onto image
    original_img = np.uint8(_unit_range(original_img) * 255)
    cvt_heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)
    cvt_heatmap = img_to_array(cvt_heatmap)

    #enlarge plot (note: changes the global matplotlib setting)
    plt.rcParams["figure.dpi"] = 100

    if plot_results:
        plt.imshow(np.uint8(original_img * interpolant + cvt_heatmap * (1 - interpolant)))
    else:
        return cvt_heatmap

In [None]:
import cv2
import matplotlib.pyplot as plt

# Read an image
img = cv2.imread('test_gradcam/Picture8.png')
# cv2.imread returns None instead of raising when the file cannot be read.
if img is None:
    raise FileNotFoundError("Could not read image: test_gradcam/Picture8.png")
width = 224
height = 224
dim = (width, height)

# OpenCV decodes to BGR, while flow_from_directory feeds the model RGB
# images. Convert once here so both the model input and the plot use RGB.
resized_img = cv2.cvtColor(cv2.resize(img, dim, interpolation = cv2.INTER_AREA),
                           cv2.COLOR_BGR2RGB)

# Display the image
plt.imshow(resized_img)
plt.show()


In [None]:
#apply function
VizGradCAM(model, img_to_array(resized_img), plot_results=True)

In [None]:
import matplotlib.pyplot as plt

# Accuracy per epoch. The leading None shifts the curves so that the first
# epoch is drawn at x = 1.
f, ax = plt.subplots()
for history_key, line_style in (('accuracy', 'o-'), ('val_accuracy', 'x-')):
    ax.plot([None] + history.history[history_key], line_style)
# loc = 0 lets matplotlib pick the best legend position.
ax.legend(['Train acc', 'Validation acc'], loc = 0)
ax.set(title='Training/Validation acc per Epoch', xlabel='Epoch', ylabel='Accuracy')

# Loss per epoch, same layout; legend pinned to the upper right (loc = 1).
f, ax = plt.subplots()
for history_key, line_style in (('loss', 'o-'), ('val_loss', 'x-')):
    ax.plot([None] + history.history[history_key], line_style)
ax.legend(['Train loss', "Val loss"], loc = 1)
ax.set(title='Training/Validation Loss per Epoch', xlabel='Epoch', ylabel='Loss')
plt.show()

In [None]:
import numpy as np
from sklearn.metrics import classification_report 
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.metrics import classification_report, confusion_matrix

# Take the sample count from the generator instead of hard-coding it.
num_of_test_samples = validation_generator.samples
target_names = ["Coughing","Face_Mask","No_mask","Nose_picking","sneezing","spitting","wrong_mask","yawn"] 

# Confusion matrix and classification report.
# predict() walks the whole generator once, so no manual step count is
# needed; predict_generator is the deprecated spelling of the same call.
Y_pred = model.predict(validation_generator)
y_pred = np.argmax(Y_pred, axis=1)
y_true = validation_generator.classes
cm = confusion_matrix(y_true, y_pred)
print('Classification Report')
print(classification_report(y_true, y_pred, target_names=target_names))

# Normalise each ROW by its total. keepdims=True keeps the totals as a
# column vector so the division broadcasts across rows; a flat vector would
# divide column j by the total of row j. Rows without samples stay at 0.
row_totals = cm.sum(axis=1, keepdims=True)
cmn = cm.astype('float') / np.maximum(row_totals, 1)

fig, ax = plt.subplots(figsize=(20,7))
sns.heatmap(cmn, center=0, annot=True, fmt='.2f', linewidths=1,
            xticklabels=target_names, yticklabels=target_names, ax=ax)
ax.set_ylabel('Actual')
ax.set_xlabel('Predicted')
plt.show()

# Inference: reload the saved model and repeat the evaluation

In [22]:
# Load through tf.keras, the same API the model is built and compiled with,
# rather than the standalone `keras` package.
# NOTE(review): the path says "eff0" while the notebook builds an
# EfficientNetB1 model — confirm this is the intended checkpoint.
reconstructed_model = tf.keras.models.load_model("saved_cbam_eff0_model")

In [None]:
reconstructed_model.summary()

In [24]:
# Re-compile the reloaded model with the same settings used for training:
# Adam (learning rate 1e-3), categorical cross-entropy, accuracy metric.
opt = tf.keras.optimizers.Adam(learning_rate=0.001)
reconstructed_model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
# Predicted class index per validation image from the reloaded model:
# argmax over axis 1 of its output for the whole validation generator.
# Overwrites the yhat_test computed earlier with the in-memory model.
yhat_test = np.argmax(reconstructed_model.predict(validation_generator), axis=1)

In [None]:
y_test

In [None]:
yhat_test

In [None]:
import numpy as np
from sklearn.metrics import classification_report 
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.metrics import classification_report, confusion_matrix

# Take the sample count from the generator instead of hard-coding it.
num_of_test_samples = validation_generator.samples
target_names = ["Coughing","Face_Mask","No_mask","Nose_picking","sneezing","spitting","wrong_mask","yawn"] 

# Confusion matrix and classification report for the reloaded model.
# predict() walks the whole generator once, so no manual step count is
# needed; predict_generator is the deprecated spelling of the same call.
Y_pred = reconstructed_model.predict(validation_generator)
y_pred = np.argmax(Y_pred, axis=1)
y_true = validation_generator.classes
cm = confusion_matrix(y_true, y_pred)
print('Classification Report')
print(classification_report(y_true, y_pred, target_names=target_names))

# Normalise each ROW by its total. keepdims=True keeps the totals as a
# column vector so the division broadcasts across rows; a flat vector would
# divide column j by the total of row j. Rows without samples stay at 0.
row_totals = cm.sum(axis=1, keepdims=True)
cmn = cm.astype('float') / np.maximum(row_totals, 1)

fig, ax = plt.subplots(figsize=(20,7))
sns.heatmap(cmn, center=0, annot=True, fmt='.2f', linewidths=1,
            xticklabels=target_names, yticklabels=target_names, ax=ax)
ax.set_ylabel('Actual')
ax.set_xlabel('Predicted')
plt.show()

In [None]:
# Last Conv2D among the reloaded model's top-level layers (searched back to
# front). tf.keras.layers is referenced directly because `K` is bound to two
# different modules in this notebook (keras.backend and tensorflow.keras),
# depending on which import cell ran last.
last_conv_layer = next(layer for layer in reversed(reconstructed_model.layers)
                       if isinstance(layer, tf.keras.layers.Conv2D))
last_conv_layer.name

In [None]:
#https://github.com/gkeechin/vizgradcam/blob/main/gradcam.py
# NOTE: this cell re-defines VizGradCAM; a function with the same name is
# also defined earlier in the notebook, and whichever cell runs last wins.

def VizGradCAM(model, image, interpolant=0.5, plot_results=True):

    """VizGradCAM - Displays GradCAM based on Keras / TensorFlow models
    using the gradients from the last convolutional layer. This function
    should work with all Keras Application listed here:
    https://keras.io/api/applications/
    Parameters:
    model (keras.model): Compiled Model with Weights Loaded; must contain at
                         least one top-level Conv2D layer
    image: Image to Perform Inference On (H x W x C array-like, fed to the
           model as a batch of one without further preprocessing)
    interpolant (float): Weight of the original image in the overlay,
                         strictly between 0 and 1
    plot_results (boolean): True - Function Plots using PLT, returns None
                            False - Returns Heatmap Array
    Returns:
    RGB heatmap as a float array when plot_results is False, otherwise None.
    Raises:
    AssertionError: if interpolant is not strictly between 0 and 1.
    StopIteration: if the model has no top-level Conv2D layer.
    """
    #sanity check
    assert (interpolant > 0 and interpolant < 1), "Heatmap Interpolation Must Be Between 0 - 1"

    def _unit_range(values):
        """Min-max scale to [0, 1]; a constant array becomes zeros, not NaN."""
        low, high = values.min(), values.max()
        if high <= low:
            return np.zeros_like(values, dtype=np.float32)
        return (values - low) / (high - low)

    #STEP 1: Preprocess the image and find the top predicted class
    original_img = np.asarray(image, dtype = np.float32)
    #add the batch axis
    img = np.expand_dims(original_img, axis=0)
    prediction_idx = np.argmax(model.predict(img))

    #STEP 2: Model that exposes the last conv feature map next to the prediction
    target_layer = next(layer for layer in reversed(model.layers)
                        if isinstance(layer, tf.keras.layers.Conv2D))
    gradient_model = tf.keras.Model([model.inputs], [target_layer.output, model.output])

    #compute gradient of the top predicted class w.r.t. the feature map
    with tf.GradientTape() as tape:
        conv2d_out, prediction = gradient_model(img)
        loss = prediction[:, prediction_idx]
    gradients = tape.gradient(loss, conv2d_out)

    #drop the batch axis: [1 x H x W x CHANNEL] -> [H x W x CHANNEL]
    output = conv2d_out[0]

    #one weight per channel: spatial mean of the gradients
    weights = tf.reduce_mean(gradients[0], axis=(0, 1))

    #channel-weighted sum of the feature maps in a single vectorised step
    activation_map = tf.reduce_sum(output * weights, axis=-1).numpy()
    #resize to image size (cv2 expects (width, height))
    activation_map = cv2.resize(activation_map,
                                (original_img.shape[1],
                                 original_img.shape[0]))
    #ensure no negative number
    activation_map = np.maximum(activation_map, 0)
    #convert class activation map to 0 - 255
    activation_map = np.uint8(255 * _unit_range(activation_map))

    #convert to heatmap (OpenCV colour maps are BGR)
    heatmap = cv2.applyColorMap(activation_map, cv2.COLORMAP_JET)

    #superimpose heatmap onto image
    original_img = np.uint8(_unit_range(original_img) * 255)
    cvt_heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)
    cvt_heatmap = img_to_array(cvt_heatmap)

    #enlarge plot (note: changes the global matplotlib setting)
    plt.rcParams["figure.dpi"] = 100

    if plot_results:
        plt.imshow(np.uint8(original_img * interpolant + cvt_heatmap * (1 - interpolant)))
    else:
        return cvt_heatmap

In [None]:
import cv2
import matplotlib.pyplot as plt

# Read an image
img = cv2.imread('dataset/Coughing/3.jpg')
# cv2.imread returns None instead of raising when the file cannot be read.
if img is None:
    raise FileNotFoundError("Could not read image: dataset/Coughing/3.jpg")
width = 224
height = 224
dim = (width, height)

# OpenCV decodes to BGR, while flow_from_directory feeds the model RGB
# images. Convert once here so both the model input and the plot use RGB.
resized_img = cv2.cvtColor(cv2.resize(img, dim, interpolation = cv2.INTER_AREA),
                           cv2.COLOR_BGR2RGB)

# Display the image
plt.imshow(resized_img)
plt.show()

In [None]:
#apply function
VizGradCAM(reconstructed_model, img_to_array(resized_img), plot_results=True)