<a href="https://colab.research.google.com/github/jincy-p-janardhanan/SPP-Pneumonia-Net/blob/ml/SPP_Pneumonia_Net.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Mount google drive

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Dependencies & Constants

Import dependencies

In [2]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import cv2
import h5py
from tensorflow import keras
from tensorflow.python.client import device_lib
from random import shuffle
from sklearn.model_selection import train_test_split
from keras.preprocessing.image import ImageDataGenerator

In [3]:
!pip install keras-tuner

Collecting keras-tuner
  Downloading keras_tuner-1.0.4-py3-none-any.whl (97 kB)
[?25l[K     |███▍                            | 10 kB 29.5 MB/s eta 0:00:01[K     |██████▊                         | 20 kB 22.0 MB/s eta 0:00:01[K     |██████████                      | 30 kB 16.3 MB/s eta 0:00:01[K     |█████████████▍                  | 40 kB 14.4 MB/s eta 0:00:01[K     |████████████████▊               | 51 kB 5.5 MB/s eta 0:00:01[K     |████████████████████            | 61 kB 6.0 MB/s eta 0:00:01[K     |███████████████████████▍        | 71 kB 5.4 MB/s eta 0:00:01[K     |██████████████████████████▊     | 81 kB 6.1 MB/s eta 0:00:01[K     |██████████████████████████████▏ | 92 kB 6.0 MB/s eta 0:00:01[K     |████████████████████████████████| 97 kB 3.6 MB/s 
Collecting kt-legacy
  Downloading kt_legacy-1.0.4-py3-none-any.whl (9.6 kB)
Installing collected packages: kt-legacy, keras-tuner
Successfully installed keras-tuner-1.0.4 kt-legacy-1.0.4


In [4]:
import keras_tuner as kt

In [5]:
print(device_lib.list_local_devices())
print(tf.__version__)

[name: "/device:CPU:0"
device_type: "CPU"
memory_limit: 268435456
locality {
}
incarnation: 8191562639169356211
, name: "/device:GPU:0"
device_type: "GPU"
memory_limit: 11345264640
locality {
  bus_id: 1
  links {
  }
}
incarnation: 11543936405937431176
physical_device_desc: "device: 0, name: Tesla K80, pci bus id: 0000:00:04.0, compute capability: 3.7"
]
2.6.0


Image shape parameters

In [6]:
input_height = 128
input_width = 128
n_channels = 1

Class labels

In [7]:
class_no=3
labels = {0:'Normal', 1:'Bacterial', 2:'Viral'}

To save models

In [8]:
model_path = '/content/drive/MyDrive/main_project/models/saved_model_'

# Last saved model number
model_count = 80

# Model Architecture

The hp parameter of the model helps in tuning hyperparameters using keras_tuner. <br>
[Keras Tuner Reference](https://www.tensorflow.org/tutorials/keras/keras_tuner) <br>
[Model Reference](https://github.com/SitiRaihanah/SPP-COVID-Net/blob/master/SPP-COVID-Net.py)

In [None]:
def SPPPneumoniaNet(hp):

    input_images=tf.keras.layers.Input(shape=(input_height,input_width,n_channels))

    x = tf.keras.layers.Conv2D(hp.Int('layer1_units', min_value=4, max_value=8, step=1),
                               (3,3),strides=(1,1),padding='same', use_bias=False)(input_images)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)
    x = tf.keras.layers.MaxPooling2D((2,2),strides=(2,2))(x)

    x = tf.keras.layers.Conv2D(hp.Int('layer2_units', min_value=4, max_value=16, step=1),
                               (3,3),strides=(1,1),padding='same', use_bias=False)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)
    x = tf.keras.layers.MaxPooling2D((2,2),strides=(2,2))(x)

    #first triple
    n1 = hp.Int('triple1_units', min_value=32, max_value=64, step=1)
    x = tf.keras.layers.Conv2D(n1,(3,3),strides=(1,1),padding='same', use_bias=False)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)

    x = tf.keras.layers.Conv2D(n1//2,(1,1),strides=(1,1),padding='same',use_bias=False)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)

    x = tf.keras.layers.Conv2D(n1,(3,3),strides=(1,1),padding='same', use_bias=False)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)
    x = tf.keras.layers.MaxPooling2D((2,2),strides=(2,2))(x)

    #second triple
    n2 = hp.Int('triple2_units', min_value=64, max_value=128, step=1)
    x = tf.keras.layers.Conv2D(n2,(3,3),strides=(1,1),padding='same', use_bias=False)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)

    x = tf.keras.layers.Conv2D(n2//2,(1,1),strides=(1,1),padding='same',use_bias=False)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)

    x = tf.keras.layers.Conv2D(n2,(3,3),strides=(1,1),padding='same', use_bias=False)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)
    x = tf.keras.layers.MaxPooling2D((2,2),strides=(2,2))(x)

    #third triple
    n3 = hp.Int('triple3_units', min_value=128, max_value=256, step=1)
    x = tf.keras.layers.Conv2D(n3,(3,3),strides=(1,1),padding='same', use_bias=False)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)

    x = tf.keras.layers.Conv2D(n3//2,(1,1),strides=(1,1),padding='same',use_bias=False)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)

    x = tf.keras.layers.Conv2D(n3,(3,3),strides=(1,1),padding='same', use_bias=False)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)
    x = tf.keras.layers.MaxPooling2D((2,2),strides=(2,2))(x)

    #fourth triple
    n4 = hp.Int('triple4_units', min_value=256, max_value=512, step=32)
    x = tf.keras.layers.Conv2D(n4,(3,3),strides=(1,1),padding='same', use_bias=False)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)

    x = tf.keras.layers.Conv2D(n4//2,(1,1),strides=(1,1),padding='same',use_bias=False)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)

    x = tf.keras.layers.Conv2D(n4,(3,3),strides=(1,1),padding='same', use_bias=False)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)

    #ending network
    L1 = tf.keras.layers.MaxPooling2D((4, 4),strides=(1,1),padding='valid')(x)
    L2 = tf.keras.layers.MaxPooling2D((3, 3),strides=(1,1),padding='valid')(x)
    L3 = tf.keras.layers.MaxPooling2D((2, 2),strides=(1,1),padding='valid')(x)

    FL1 = tf.keras.layers.Flatten()(L1)
    FL2 = tf.keras.layers.Flatten()(L2)
    FL3 = tf.keras.layers.Flatten()(L3)
    
    x = tf.keras.layers.Concatenate(axis=1)([FL1,FL2,FL3])
    x = tf.keras.layers.Dense(class_no,activation='softmax')(x)

    # Create model.
    model=tf.keras.models.Model(inputs=input_images,outputs=x)

    model.compile(optimizer=tf.keras.optimizers.Adagrad(
        hp.Float('learning_rate', 1e-4, 1e-1, sampling='log'),
        hp.Float('initial_accumulator_value', 1e-4, 1e-1, sampling='log'),
        hp.Float('epsilon', 1e-07, 1e-04, sampling='log')), 
        loss='categorical_crossentropy', metrics=['accuracy'])
    
    return model

# Load Data

Load complete dataset from h5 file to arrays x and y

In [None]:
dest_filepath = '/content/drive/MyDrive/main_project/dataseth5/complete_dataseth5_1.h5'

with h5py.File(dest_filepath, "r") as f:
    x = f["input_data"][:]
    y = f["input_labels"][:]
f.close()
x = np.reshape(x, (x.shape[0], 128, 128, 1))
print('x shape =', x.shape, '| y shape =', y.shape)

## Preview Images

Plot some images from the dataset

In [None]:
plt.figure(figsize=(15,10))
for i in range(3):
    for j in range(5):
        idx = 5*i+j
        label_no = y[idx]
        plt.subplot(3,5,idx+1)
        im = np.squeeze(x[idx])
        plt.imshow(im)
        plt.title(labels[label_no])

## Train, validation & test split

train, validation and test split using h5file and sklearn <br>
[reference](https://www.machinecurve.com/index.php/2020/11/16/how-to-easily-create-a-train-test-split-for-your-machine-learning-model/)

> Shuffling (i.e. randomly drawing) samples is applied as part of the fit. Using a random_state, we can seed the random number generator to make its behavior replicable.

In [None]:
random_state = 55
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.25, random_state=random_state)
x_val, x_test, y_val, y_test = train_test_split(x_test, y_test, test_size=0.5, random_state=random_state)
print('train size: ', len(y_train), '\t val size: ', len(y_val), '\t test size: ', len(y_test))

## One-hot encoding / Categorical encoding

One hot encoding for y_train and y_test

In [None]:
y_train = tf.keras.utils.to_categorical(y_train, 3)
y_val = tf.keras.utils.to_categorical(y_val, 3)
y_test = tf.keras.utils.to_categorical(y_test, 3)

## Data Augmentation

In [None]:
datagen = ImageDataGenerator(horizontal_flip=True, width_shift_range=0.1, fill_mode='nearest')

# Hyperparameter Tuning Using Keras Tuner

[Reference](https://www.tensorflow.org/tutorials/keras/keras_tuner)

Create a hyperband tuner instance

In [None]:
tuner = kt.Hyperband(SPPPneumoniaNet, objective='val_accuracy', max_epochs=15, hyperband_iterations=3)

Early stopping callback to stop training if val_loss is not improving within 5 epochs.

In [None]:
stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

Hyperparameter search using hyperband tuner

In [None]:
tuner.search(datagen.flow(x_train, y_train, batch_size=4), steps_per_epoch=x_train.shape[0],
                      verbose=2, validation_data=(x_val, y_val), callbacks=[stop_early], workers=4)

Retrieve the best hyperparameters obtained

In [None]:
best_hyperparameters = tuner.get_best_hyperparameters(1)[0]

Best hyperparameters obtained 

In [None]:
print('layer1_units: ', best_hyperparameters.get('layer1_units'))
print('layer2_units: ', best_hyperparameters.get('layer2_units'))
print('triple1_units: ', best_hyperparameters.get('triple1_units'))
print('triple2_units: ', best_hyperparameters.get('triple2_units'))
print('triple3_units: ', best_hyperparameters.get('triple3_units'))
print('triple4_units: ', best_hyperparameters.get('triple4_units'))
print('learning_rate: ', best_hyperparameters.get('learning_rate'))
print('epsilon: ', best_hyperparameters.get('epsilon'))
print('initial_accumulator_value: ', best_hyperparameters.get('initial_accumulator_value'))

### Restoring previously obtained best hyperparameters

In [None]:
hps = kt.HyperParameters()

In [None]:
hps.values = {
    'layer1_units':  6,
    'layer2_units':  8,
    'triple1_units': 61, 
    'triple2_units': 123, 
    'triple3_units': 199, 
    'triple4_units': 288, 
    'learning_rate': 0.0018016,
    'initial_accumulator_value': 0.00041422, 
    'epsilon': 6.687e-05
}

Create model with the selected hyperparameters

In [None]:
model = tuner.hypermodel.build(hps)

# Training

### Train Model

Save trained model after each epoch using checkpoint

In [None]:
model_count+=1
path = model_path+str(model_count)
print(path)
checkpoint = tf.keras.callbacks.ModelCheckpoint(path+'/checkpoint_{epoch:02d}', save_freq='epoch') 

In [None]:
epoch_no=100
print('Training')
H=model.fit(datagen.flow(x_train, y_train, batch_size=4), steps_per_epoch=x_train.shape[0],
                      epochs=epoch_no, verbose=2, callbacks=[checkpoint, stop_early], 
                      validation_data=(x_val, y_val), workers=4)

In [None]:
N = np.arange(0, len(H.history["val_loss"]))
plt.style.use("ggplot")
plt.figure()
plt.rcParams["figure.figsize"] = (10,10)
plt.plot(N, H.history["val_loss"], label="val_loss")
plt.plot(N, H.history["val_accuracy"], label="val_acc")
plt.plot(N, H.history["loss"], label="train_loss")
plt.plot(N, H.history["accuracy"], label="train_acc")
plt.title("Training and Validation Metrics")
plt.xlabel("Epoch #")
plt.ylabel("Loss/Accuracy")
plt.legend()
plt.savefig(path+"/plot.png")
plt.show(block=False)
best = np.argmax(H["val_accuracy"])
output = "Best model is from checkpoint #" + str(best+1)
output += "\nloss:{:.4f} accuracy:{:.4f} val_loss: {:.4f} val_accuracy:{:.4f}".format(
    H.history["loss"][best], H.history["accuracy"][best], H.history["val_loss"][best], H.history["val_accuracy"][best])

### Test Model

In [None]:
print('Testing performance of model #{} with best validation accuracy'.format(best+1))
model = tf.keras.models.load_model(path+'/checkpoint_{:02d}'.format(best))
loss, acc = model.evaluate(x_test, y_test, batch_size=1,verbose=2)
tf.keras.backend.clear_session()
output += "\n\nTest performance of the model at checkpoint #" + str(best+1)
output += "\nloss:{:.4f} accuracy:{:.4f}".format(loss, acc)

Write complete output to a file

In [None]:
output += "\n\nHistory:\n"+str(H.history)
output += "\n\nTest loss:{} Test accuracy:{}".format(loss, acc)

print('writing output to '+ path + '/output.txt\n\n')
print(output)

with open(path+"/output.txt", "w") as f:
  f.write(output)
f.close()

# Prediction

Load saved model with best accuracy

In [9]:
saved_model_dir = '/content/drive/MyDrive/main_project/models/saved_model_65/checkpoint_09'

In [10]:
restored_model = keras.models.load_model(saved_model_dir)

Load and transform image

In [11]:
test_image_path = '/content/drive/MyDrive/main_project/Viral/Viral Pneumonia-10.png'

In [12]:
img = cv2.imread(test_image_path)
img = cv2.resize(img, (input_height, input_width), interpolation=cv2.INTER_CUBIC)
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) # To convert colormap from BGR to GRAY
#Normalize the image - convert the each pixel value between 0 and 1
img = img / 255
img = np.reshape(img, (1, input_height, input_width, n_channels))

Predict result

In [13]:
prediction = restored_model.predict(img)
result = labels[np.argmax(prediction)]
print(result)

Viral


# Deployment

In [14]:
converter = tf.lite.TFLiteConverter.from_keras_model(restored_model)
tflite_model = converter.convert()

INFO:tensorflow:Assets written to: /tmp/tmphakbfj0u/assets


In [15]:
with open(
    '/content/drive/MyDrive/main_project/tflite/model_1.0.tflite', 
    'wb') as f:
  f.write(tflite_model)