In [1]:
import tensorflow as tf
import tensorflow_probability as tfp
import numpy as np
import pandas as pd
import tensorflow_datasets as tfds
import seaborn as sns
import matplotlib.pyplot as plt
# import albumentations as A
import cv2 as cv
import io
import wandb
import os







In [37]:
from tensorflow.keras.layers import Normalization, Dense, InputLayer,Conv2D, MaxPool2D, Flatten, BatchNormalization,Input,Layer, Dropout, RandomFlip, RandomRotation, Resizing, Rescaling
from tensorflow.keras.losses import MeanSquaredError, Huber, MeanAbsoluteError, BinaryCrossentropy
from tensorflow.keras.metrics import RootMeanSquaredError,binary_accuracy, BinaryAccuracy, FalseNegatives, FalsePositives, TruePositives, TrueNegatives, Precision, Recall, AUC
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import Callback, CSVLogger, EarlyStopping, LearningRateScheduler, ModelCheckpoint
from tensorflow.keras.regularizers import L2
from tensorflow.keras import Sequential
from sklearn.metrics import confusion_matrix, roc_curve
from tensorboard.plugins.hparams import api as hp
from wandb.integration.keras import WandbCallback, WandbMetricsLogger

Wandb Install, Login and Initialization

In [33]:
# CANNOT RUN =))
wandb.login()
wandb.require("core")
run = wandb.init(project='CNN_Malaria.ipynb',entity='khangqn1212-learn')

In [25]:
wandb.config = {
  "LEARNING_RATE": 0.01,
  "N_EPOCHS": 1,
  "BATCH_SIZE": 128,
  "DROPOUT_RATE": 0.0,
  "IM_SIZE": 224,
  "REGULARIZATION_RATE": 0.0,
  "N_FILTERS": 6,
  "KERNEL_SIZE": 3,
  "N_STRIDES": 1,
  "POOL_SIZE": 2,
  "N_DENSE_1": 100,
  "N_DENSE_2": 10,
}
CONFIGURATION = wandb.config

DATA PREPROCESSING

In [5]:
dataset, dataset_info = tfds.load('malaria', with_info=True, as_supervised=True, shuffle_files=True, split=['train'])

DATA AUGMENTATION

In [None]:
# tf.imgae.adjust_brightness
# tf.imgae.adjust_contrast
# tf.imgae.adjust_gamma
# tf.imgae.adjust_saturation
# tf.imgae.flip_left_right
# tf.imgae.flip_up_down
# tf.imgae.rot90

##### INSTEAD OF AUGMENT IMAGE MANUALLY WE CAN USE KERAS LAYER AND PUT IT IN THE MODEL

# tf.keras.layers.RandomContrast
# tf.keras.layers.RandomCrop
# tf.keras.layers.RandomFlip
# tf.keras.layers.RandomRotation
# tf.keras.layers.RandomTranslation
# tf.keras.layers.RandomZoom

In [None]:
def visualize(original, augmented):
    plt.subplot(1,2,1)
    plt.imshow(original)

    plt.subplot(1,2,2)
    plt.imshow(augmented)


In [6]:
#@tf.function : convert function from eager mode to graph mode -> better performance
def splits(dataset, TRAIN_RATIO, VAL_RATIO, TEST_RATIO):
  dataset_size = len(dataset)

  train_dataset = dataset.take(int(TRAIN_RATIO * dataset_size))

  val_test_dataset = dataset.skip(int(TRAIN_RATIO * dataset_size))
  val_dataset = val_test_dataset.take(int(VAL_RATIO * dataset_size))

  test_dataset = val_test_dataset.skip(int(VAL_RATIO * dataset_size))
  return train_dataset, val_dataset, test_dataset

@tf.function
def resize_rescale(image, label):
  image = tf.image.resize(image, (224,224))/255.0
  return image, label

In [7]:
TRAIN_RATIO = 0.8
VAL_RATIO = 0.1
TEST_RATIO = 0.1


train_dataset , val_dataset, test_dataset = splits(dataset[0], TRAIN_RATIO, VAL_RATIO, TEST_RATIO)

In [None]:
original_image, label = next(iter(train_dataset))
# augmented_image = tf.image.flip_left_right(original_image)
# augmented_image = tf.image.random_flip_up_down(original_image)
# augmented_image = tf.image.rot90(original_image)
# augmented_image = tf.image.adjust_brightness(original_image, 0.8)
# augmented_image = tf.image.random_saturation(original_image, lower = 0, upper = 1)
augmented_image = tf.image.central_crop(original_image, 0.8)

visualize(original_image, augmented_image)


AUGMENTATION FUNCTION

In [None]:
### tf.image augment
@tf.function
def augment(image, label):
    image, label = resize_rescale(image, label)
    image = tf.image.rot90(image, k= tf.random.uniform(shape=[], minval=0,maxval=2,dtype=tf.int32))
    # k = probability
    # image = tf.image.adjust_saturation(image, 0.3) -> may lead to poor performance
    image = tf.image.flip_left_right(image)
    return image, label

AUGMENTATION LAYER

In [9]:
### tf.keras.layer augment
resize_rescale_layers = tf.keras.Sequential([
    Resizing(224,224),
    Rescaling(1.0/255)
])

augment_layers = tf.keras.Sequential([
    RandomRotation(factor = (0.25, 0.2501),),
    RandomFlip(mode = "horizontal",)

])

@tf.function
def augment_layer(image, labels):
    return augment_layers(resize_rescale_layers(image), training = True) , labels

MIXUP DATA AUGMENTATION

In [None]:
train_dataset_1 = train_dataset.shuffle(buffer_size=4096).map(resize_rescale)
train_dataset_2 = train_dataset.shuffle(buffer_size=4096).map(resize_rescale)

mixed_dataset = tf.data.Dataset.zip((train_dataset_1,train_dataset_2))

In [None]:
@tf.function
def mixup(train_dataset_1, train_dataset_2):
    (image_1, label_1) ,(image_2,label_2) = train_dataset_1, train_dataset_2
    
    lamda = tfp.distributions.Beta(0.2, 0.2)
    lamda = lamda.sample(1)[0]

    label_1 = tf.cast(label_1,dtype= tf.dtypes.float32)
    label_2 = tf.cast(label_2,dtype= tf.dtypes.float32)
    image = lamda*image_1 + (1-lamda)*image_2
    label = lamda*label_1 + (1-lamda)*label_2
    return image, label

CUTMIX AUGMENTATION

In [None]:
IM_SIZE = 224
@tf.function
def box(lamda):
    r_x = tf.cast(tfp.distributions.Uniform(0, IM_SIZE ).sample(1)[0], dtype= tf.int32)
    r_y = tf.cast(tfp.distributions.Uniform(0, IM_SIZE ).sample(1)[0], dtype= tf.int32)

    r_w = tf.cast(IM_SIZE*tf.math.sqrt(1-lamda), dtype = tf.int32)
    r_h = tf.cast(IM_SIZE*tf.math.sqrt(1-lamda), dtype = tf.int32)

    r_x = tf.clip_by_value(r_x - r_w//2, 0, IM_SIZE)
    r_y = tf.clip_by_value(r_y - r_h//2, 0, IM_SIZE)

    x_b_r = tf.clip_by_value(r_x + r_w//2, 0, IM_SIZE)
    y_b_r = tf.clip_by_value(r_y + r_h//2, 0, IM_SIZE)

    r_w = x_b_r - r_x
    if(r_w == 0):
        r_w = 1
    r_h = y_b_r - r_y
    if(r_h ==0):
        r_h = 1
    return r_y, r_x, r_h, r_w

In [None]:
@tf.function
def cutmixup(train_dataset_1, train_dataset_2):
    (image_1, label_1) ,(image_2,label_2) = train_dataset_1, train_dataset_2
    
    lamda = tfp.distributions.Beta(0.2,0.2)
    lamda = lamda.sample(1)[0]

    r_y, r_x , r_h, r_w = box(lamda)
    cut_cat = tf.image.crop_to_bounding_box(image_1, r_y, r_x, r_h, r_w)
    cut_dog = tf.image.crop_to_bounding_box(image_2,  r_y, r_x, r_h, r_w)
    pad_dog = tf.image.pad_to_bounding_box(cut_dog,  r_y, r_x, IM_SIZE,IM_SIZE)
    pad_cat = tf.image.pad_to_bounding_box(cut_cat,  r_y, r_x, IM_SIZE,IM_SIZE)
    image = image_2 - pad_dog + pad_cat
    lamda = tf.cast(1- (r_w*r_h)/(IM_SIZE*IM_SIZE), dtype = tf.float32)
    label = lamda*tf.cast(label_1, dtype = tf.float32) + (1-lamda)*tf.cast(label_2, dtype = tf.float32)
    return image, label

In [None]:
# ### WORK FOR OTHER PROBLEM NOT THIS ONE SO IT LEAD TO POOR PERFORMANCE
# train_dataset = mixed_dataset.shuffle(
#     buffer_size=8,
#     reshuffle_each_iteration=True).map(mixup).batch(32).prefetch(tf.data.AUTOTUNE)
# train_dataset = mixed_dataset.shuffle(
#     buffer_size=8,
#     reshuffle_each_iteration=True).map(cutmixup).batch(32).prefetch(tf.data.AUTOTUNE)


In [None]:
original ,label = next(iter(train_dataset))
print(label)
plt.imshow(original[0])

AUGMENTATION WITH ALBUMENTATION

In [None]:
transforms = A.Compose(
    [
      A.Resize(IM_SIZE, IM_SIZE),

      A.OneOf([A.HorizontalFlip(),
                A.VerticalFlip(),], p = 0.3),
      
      A.RandomRotate90(),   
      A.RandomBrightnessContrast(brightness_limit=0.2,
                                contrast_limit=0.2,
                                always_apply=False, p=0.5),
      A.Sharpen(alpha=(0.2, 0.5), lightness=(0.5, 1.0), always_apply=False, p=0.5),
])

In [None]:
@tf.function
def aug_albument(image):
    data = {'image':image}
    image = transforms(**data)
    image = image["image"]
    image = tf.cast(image/255, tf.float32)
    return image

In [None]:
@tf.function
def process_data(image, label):
    aug_img = tf.numpy.function(func = aug_albument, inp=[image], Tout= tf.float32)
    return aug_img, label

In [None]:
train_dataset = train_dataset.shuffle(
    buffer_size=8,
    reshuffle_each_iteration=True).map(process_data).batch(32).prefetch(tf.data.AUTOTUNE)

PREPARE TRAIN AND VAL DATASET

In [10]:
train_dataset = train_dataset.shuffle(
    buffer_size=8,
    reshuffle_each_iteration=True).map(augment_layer).batch(32).prefetch(tf.data.AUTOTUNE)

val_dataset = val_dataset.shuffle(
    buffer_size=8,
    reshuffle_each_iteration=True).map(resize_rescale).batch(32).prefetch(tf.data.AUTOTUNE)

test_dataset = test_dataset.map(resize_rescale)

# instead of doing augmentation using map as above
# we can embedd the aumgnet_layer into the model itself
# then we can remove the map(augmnet_layer, resize_rescale) above
# remember to do this for test_dataset also (cause the layer already in the model)
# and one important thing if we use keras.layer augment we use batch_size = 1 -> .batch(1) not 32
# and an extra thing we can add probability that if the image is augmented at some random epochs or not

SEQUENTIAL API (+ DROPOUT LAYER & REGULARIZATION)

In [None]:
# add dropout
# add regularizer (in conv2D layer, dense layer) -> kernel_regularizer
# add augmentation layer
IM_SIZE = CONFIGURATION['IM_SIZE']
DROPOUT_RATE = CONFIGURATION['DROPOUT_RATE']
REGULARIZATION_RATE = CONFIGURATION['REGULARIZATION_RATE']
N_FILTERS = CONFIGURATION['N_FILTERS']
KERNEL_SIZE = CONFIGURATION['KERNEL_SIZE']
POOL_SIZE = CONFIGURATION['POOL_SIZE']
N_STRIDES = CONFIGURATION['N_STRIDES']
N_DENSE_1 = CONFIGURATION['N_DENSE_1']
N_DENSE_2 = CONFIGURATION['N_DENSE_2']

lenet_model = Sequential([
    InputLayer(input_shape = (224,224,3)),
    
    # resize_rescale_layers,
    # augment_layers,

    tf.keras.layers.Conv2D(filters = 6, kernel_size=3, strides=1, padding="valid",activation ='relu',kernel_regularizer=L2(0.01)),
    BatchNormalization(),
    tf.keras.layers.MaxPool2D(pool_size=2,strides=2),
    Dropout(rate = 0.3),

    tf.keras.layers.Conv2D(filters = 16, kernel_size=3, strides=1, padding="valid",activation ='relu',kernel_regularizer=L2(0.01)),
    BatchNormalization(),
    tf.keras.layers.MaxPool2D(pool_size=2,strides=2),

    Flatten(),

    Dense(100, activation ="relu",kernel_regularizer=L2(0.01)),
    BatchNormalization(),
    Dropout(rate = 0.3),
    Dense(10, activation ="relu",kernel_regularizer=L2(0.01)),
    BatchNormalization(),
    Dense(1, activation ="sigmoid")])
lenet_model.summary()

CUSTOM LOSS METHOD (WITHOUT PARAMETERS)

In [None]:
@tf.function
def custom_bce(y_true, y_pred):
  bce = BinaryCrossentropy()
  return bce(y_true, y_pred)

CUSTOM LOSS METHOD (WITH PARAMETERS)

In [None]:
FACTOR = 1
@tf.function
def custom_bce(FACTOR):
    def loss(y_true, y_pred):
        bce = BinaryCrossentropy()
        return bce(y_true, y_pred)*FACTOR
    return loss

CUSTOM LOSS CLASS

In [None]:
FACTOR = 1
class CustomBCE(tf.keras.losses.Loss):
    def __init__(self, FACTOR):
        super(CustomBCE,self).__init__()
        self.FACTOR = FACTOR
    def call(self, y_true, y_pred):
        bce = BinaryCrossentropy()
        return bce(y_true, y_pred)*self.FACTOR

CUSTOM METRIC METHOD (WITHOUT PARAMETERS)

In [None]:
@tf.function
def custom_accuracy(y_true, y_pred):
    return binary_accuracy(y_true, y_pred)

CUSTOM METRIC METHOD (with PARAMETERS)

In [None]:
FACTOR = 1
@tf.function
def custom_accuracy(FACTOR):
    def metric(y_true, y_pred):
        return binary_accuracy(y_true, y_pred)* FACTOR
    return metric

CUSTOM METRIC CLASS

In [None]:
FACTOR = 1
class CustomAccuracy(tf.keras.metrics.Metric):
    def __init__(self, name = "Custom_Accuracy" ,FACTOR = 1):
        super(CustomAccuracy,self).__init__()
        self.FACTOR = FACTOR
        self.accuracy = self.add_weight(name= name, initializer= 'zeros')
    
    def update_state(self, y_true, y_pred, sample_weight = None):
        self.accuracy = binary_accuracy(tf.cast(y_true,dtype = tf.float32), y_pred)* self.FACTOR
    def result(self):
        return self.accuracy
    def reset(self):
        self.accuracy.assign(0.)


In [None]:
lenet_model.compile(optimizer = Adam(learning_rate=0.01),
            loss = CustomBCE(FACTOR),
            metrics= [CustomAccuracy()]
)

In [None]:
history = lenet_model.fit(train_dataset, validation_data = val_dataset, epochs = 5, verbose = 1)

CUSTOM TRAINING LOOP

In [None]:
CUSTOMTRAIN_DIR = './logs/custom/train'
CUSTOMVAL_DIR = './logs/custom/val'

custom_train_writer = tf.summary.create_file_writer(CUSTOMTRAIN_DIR)
custom_val_writer = tf.summary.create_file_writer(CUSTOMVAL_DIR)

In [None]:
OPTIMIZER = Adam(learning_rate=0.01)
EPOCHS = range(3)
METRIC = BinaryAccuracy()
METRIC_VAL = BinaryAccuracy()


In [None]:
@tf.function
def training_block(x_batch, y_batch):
    with tf.GradientTape() as recorder:
        y_pred = lenet_model(x_batch, training = True)
        loss = custom_bce(y_batch, y_pred)
        
    partial_derivatives = recorder.gradient(loss, lenet_model.trainable_weights)
    OPTIMIZER.apply_gradients(zip(partial_derivatives, lenet_model.trainable_weights))
    METRIC.update_state(y_batch, y_pred)
    return loss

@tf.function
def val_block(x_batch_val, y_batch_val):
    y_pred_val = lenet_model(x_batch_val, training = False)
    loss_val = custom_bce(y_batch_val, y_pred_val)

    METRIC_VAL.update_state(y_batch_val, y_pred_val)
    return loss_val

In [None]:
def neuralearn(model,loss_fucntion,METRIC, METRIC_VAL,OPTIMIZER,train_dataset, val_dataset, EPOCHS):
    for epoch in EPOCHS:
        print("Training starts for epoch number {}".format(epoch))
        for step, (x_batch , y_batch) in enumerate(train_dataset):
            loss = training_block(x_batch, y_batch)

        print("Training loss", loss)
        print("The accuracy is", METRIC.result())
        with custom_train_writer.as_default():
            tf.summary.scalar('Training Loss',data = loss, step = epoch)
        with custom_train_writer.as_default():
            tf.summary.scalar('Training Accuracy',data = METRIC.result(), step = epoch)

        METRIC.reset_state()
        for (x_batch_val, y_batch_val) in val_dataset:
            loss_val = val_block(x_batch_val, y_batch_val)
        print("Validation loss {}".format(loss_val))
        print("The accuracy is", METRIC_VAL.result())

        with custom_val_writer.as_default():
            tf.summary.scalar('Val Loss',data = loss_val, step = epoch)
        with custom_val_writer.as_default():
            tf.summary.scalar('Val Accuracy',data = METRIC_VAL.result(), step = epoch)
        METRIC_VAL.reset_state()


In [None]:
neuralearn(lenet_model,custom_bce, METRIC, METRIC_VAL,OPTIMIZER, train_dataset, val_dataset,EPOCHS)

In [None]:
# test_dataset =test_dataset.batch(1)
lenet_model.evaluate(test_dataset)

In [None]:
lenet_model.save('C:/Users/Admin/Desktop/Tensor/lenet.h5')

FUNCTIONAL API

In [None]:
func_input = Input(shape = (224,224,3), name='input image')

x = tf.keras.layers.Conv2D(filters = 6, kernel_size=3, strides=1, padding="valid",activation ='relu')(func_input)
x = BatchNormalization()(x)
x = tf.keras.layers.MaxPool2D(pool_size=2,strides=2)(x)

x = tf.keras.layers.Conv2D(filters = 16, kernel_size=3, strides=1, padding="valid",activation ='relu')(x)
x = BatchNormalization()(x)
output = tf.keras.layers.MaxPool2D(pool_size=2,strides=2)(x)


In [None]:
feature_extract_model = Model(func_input, output,name='Feature_Extractor')
feature_extract_model.summary()

In [None]:
func_input = Input(shape = (224,224,3), name='input image')

x = feature_extract_model(func_input)

x = Flatten()(x)

x = Dense(100, activation ="relu")(x)
x = BatchNormalization()(x)
x = Dense(10, activation ="relu")(x)
x = BatchNormalization()(x)
func_output = Dense(1, activation ="sigmoid")(x)

lenet_model1 = Model(func_input, func_output, name= 'Lenet_Model')
lenet_model1.summary()

In [None]:
lenet_model1.compile(optimizer = Adam(learning_rate=0.01)
            loss = BinaryCrossentropy(),
            metrics= ['accuracy']
)
history = lenet_model1.fit(train_dataset, validation_data = val_dataset, epochs = 5, verbose = 1)

MODEL SUBCLASSING

In [None]:
class FeatureExtractor(Layer):
    def __init__(self, filters, kernel_size, strides, padding, activation, pool_size,):
        super(FeatureExtractor, self).__init__()

        self.conv_1 = Conv2D(filters = 8, kernel_size = 3, strides = 1, padding = 'valid', activation = 'relu')
        self.batch_1 = BatchNormalization()
        self.pool_1 = MaxPool2D (pool_size = 2, strides= 2)

        self.conv_2 = Conv2D(filters = 16, kernel_size = 3, strides = 1, padding = 'valid', activation = 'relu')
        self.batch_2 = BatchNormalization()
        self.pool_2 = MaxPool2D (pool_size = 2, strides= 2)
    def call(self, x):
        x = self.conv_1(x)
        x = self.batch_1(x)
        x = self.pool_1(x)

        x = self.conv_2(x)
        x = self.batch_2(x)
        x = self.pool_2(x)

        return x
    
func_input = tf.keras.Input(shape = (224,224,3), name='input image')

feature_sub_classed = FeatureExtractor(8,3,1,"valid","relu",2)

x = feature_sub_classed(func_input)

x = Flatten()(x)

x = Dense(100, activation ="relu")(x)
x = BatchNormalization()(x)
x = Dense(10, activation ="relu")(x)
x = BatchNormalization()(x)
func_output = Dense(1, activation ="sigmoid")(x)

lenet_subclass_model = Model(func_input, func_output, name= 'Lenet_Model2')
lenet_subclass_model.summary()

In [None]:
lenet_subclass_model.compile(optimizer = Adam(learning_rate=0.01),
            loss = BinaryCrossentropy(),
            metrics= ['accuracy']
)
history = lenet_subclass_model.fit(train_dataset, validation_data = val_dataset, epochs = 5, verbose = 1)

In [None]:
class LenetModel(Model):
    def __init__(self):
        super(LenetModel,self).__init__()
        self.feature_extractor = FeatureExtractor(8, 3, 1, "valid", "relu", 2)

        self.flatten = Flatten()

        self.dense_1 = Dense(100, activation = "relu")
        self.batch_1 = BatchNormalization()

        self.dense_2 = Dense(10, activation = "relu")
        self.batch_2 = BatchNormalization()

        self.dense_3 = Dense(1, activation = "sigmoid")

    def call(self, x):
        x = self.feature_extractor(x)
        x = self.flatten(x)
        x = self.dense_1(x)
        x = self.batch_1(x)
        x = self.dense_2(x)
        x = self.batch_2(x)
        x = self.dense_3(x)
       
        return x
lenet_sub_classed = LenetModel()
lenet_sub_classed(tf.random.normal(
    [1,224,224,3],
    mean=0.0,
    stddev=0.5,
    dtype=tf.dtypes.float32,
))
# lenet_sub_classed.summary()


In [None]:
lenet_sub_classed.compile(optimizer = Adam(learning_rate=0.01),
            loss = BinaryCrossentropy(),
            metrics= ['accuracy']
)

In [None]:
lenet_sub_classed.fit(train_dataset, validation_data = val_dataset, epochs = 5, verbose = 1)

CUSTOM LAYER


In [None]:
class NeuralearnDense(Layer):
    def __init__(self, output_units, activation):
        super(NeuralearnDense, self).__init__()
        self.outout_units = output_units
        self.activation = activation

    def build(self, input_features_shape):
        self.w = self.add_weight(shape = (input_features_shape[-1], self.outout_units), initializer = "random_normal", trainable = True) 
        self.b = self.add_weight(shape= (self.outout_units,), initializer = "random_normal", trainable = True)
    
    def call(self, input_features):

        pre_output = tf.matmul(input_features,self.w) + self.b
        if(self.activation == "relu"):
            return tf.nn.relu(pre_output)
        elif (self.activation == "sigmoid"):
            return tf.math.sigmoid(pre_output)
        else:
            return pre_output

In [None]:
lenet_custom_model = tf.keras.Sequential([
    InputLayer(input_shape = (224,224,3)),
    tf.keras.layers.Conv2D(filters = 6, kernel_size=3, strides=1, padding="valid",activation ='relu'),
    BatchNormalization(),
    tf.keras.layers.MaxPool2D(pool_size=2,strides=2),

    tf.keras.layers.Conv2D(filters = 16, kernel_size=3, strides=1, padding="valid",activation ='relu'),
    BatchNormalization(),
    tf.keras.layers.MaxPool2D(pool_size=2,strides=2),

    Flatten(),

    NeuralearnDense(100, activation ="relu"),
    BatchNormalization(),
    NeuralearnDense(10, activation ="relu"),
    BatchNormalization(),
    NeuralearnDense(1, activation ="sigmoid")])
lenet_custom_model.summary()

In [None]:
lenet_custom_model.compile(optimizer = Adam(learning_rate=0.01),
            loss = BinaryCrossentropy(),
            metrics= ['accuracy']
)

In [None]:
lenet_custom_model.fit(train_dataset, validation_data = val_dataset, epochs = 5, verbose = 1)

EVALUATING

In [28]:
metrics = [TruePositives(name='tp'), FalsePositives(name='fp'), TrueNegatives(name='tn') ,FalseNegatives(name='fn'),BinaryAccuracy(name='accuracy')
           ,Precision(name='precision'), Recall(name = 'recall'), AUC(name='auc')]

In [None]:
abc = tf.keras.models.load_model('lenet.h5')
abc.compile(optimizer = Adam(learning_rate=0.01),
            loss = BinaryCrossentropy(),
            metrics= metrics
)

In [None]:
test_dataset = test_dataset.batch(1)

In [None]:
abc.evaluate(test_dataset)

VISUALIZING CONFUSION MATRIX

In [None]:
labels = []
inp = []

for x,y in test_dataset.as_numpy_iterator():
    inp.append(x)
    labels.append(y)


In [None]:
print(np.array(inp).shape)
print(np.array(inp)[:,0,...].shape)

In [None]:
labels = np.array([i[0] for i in labels])
print(labels)

In [None]:
predicted = abc.predict(np.array(inp)[:,0,...])
print(predicted[:,0])

In [None]:
threshold = 0.999998945
cm = confusion_matrix(labels, predicted > threshold)
print(cm)
plt.figure(figsize=(4,4))

sns.heatmap(cm, annot= True)
plt.title('Confusion_Matrix - {}'.format(threshold))
plt.ylabel('Actual')
plt.xlabel('Predicted')

ROC PLOT

In [None]:
fp, tp, thresholds = roc_curve(labels, predicted)
# print(len(fp), len(tp), len(thresholds))

plt.figure(figsize=(10,10))
plt.plot(fp, tp)
plt.xlabel('False_positive')
plt.ylabel('True_positives')

plt.grid()

skip = 20
for i in range(0, len(thresholds), skip):
    plt.text(fp[i], tp[i], thresholds[i])
plt.show

CALL BACK

In [None]:
class LossCallback(Callback):
    def on_epoch_end(self,epoch, logs):
        print(" \n Epoch Number {} the model has a loss {}".format(epoch, logs['loss']))
    def on_batch_end(self,batch, logs):
        print(" \n Batch Number {} the model has a loss {}".format(batch+1, logs))


In [None]:
csv_callback = CSVLogger(
    'logs.csv', separator=',', append = False
)

EARLY STOPPING

In [None]:
es_callback = EarlyStopping(
    monitor='val_loss',
    min_delta=0,
    patience=0,
    verbose=1,
    mode='auto',
    baseline=None,
    restore_best_weights=False
)

LEARNINGRATESCHEDULER

In [None]:
def scheduler(epoch , lr):
    if epoch < 10:
        learning_rate = lr
    else:
        learning_rate = lr * tf.math.exp(-0.1)
    with train_writer.as_default():
        tf.summary.scalar('Learning Rate',data = learning_rate, step = epoch)
    return learning_rate
lr_scheduler_callback = LearningRateScheduler(
    schedule = scheduler,
    verbose = 1
)

ModelCheckpointing

In [None]:
checkpoint_callback = ModelCheckpoint(
    'checkpoints.keras', monitor = 'val_loss', verbose = 1,
    save_best_only = True,
    save_weights_only = False,
    mode = "auto",
    save_freq = 'epoch'
)

ReduceLROnPlateau

In [None]:
reducelr_callback= tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_accuracy', factor = 0.1, patience=2,
    verbose=1
)

TENSOR BOARD INTEGRATION

In [None]:
METRIC_DIR = './logs/metrics'
train_writer = tf.summary.create_file_writer(METRIC_DIR)

In [None]:
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir = './logs', histogram_freq= 1, profile_batch='100,132')

In [None]:
class LogImageCallback(Callback):
    def on_epoch_end(self, epoch, logs):
        labels = []
        inp = []

        for x,y in test_dataset.as_numpy_iterator():
            inp.append(x)
            labels.append(y)
        labels = np.array([i[0] for i in labels])
        predicted = abc.predict(np.array(inp)[:,0,...])
        threshold = 0.999998945
        cm = confusion_matrix(labels, predicted > threshold)
        print(cm)
        plt.figure(figsize=(4,4))

        sns.heatmap(cm, annot= True)
        plt.title('Confusion_Matrix - {}'.format(threshold))
        plt.ylabel('Actual')
        plt.xlabel('Predicted')

        buffer = io.BytesIO()
        plt.savefig(buffer, format = 'png')

        image = tf.image.decode_png(buffer.getvalue(), channels=3)

        IMAGE_DIR = './logs/images'
        image_writer = tf.summary.create_file_writer(IMAGE_DIR)
        image = tf.expand_dims(image, axis = 0)

        with image_writer.as_default():
            tf.summary.image('Heat Map',image, epoch)
    

    

TRAIN THE MODEL

In [36]:
run.log(CONFIGURATION)
abc = tf.keras.models.load_model('lenet.h5')
abc.compile(optimizer = Adam(learning_rate=CONFIGURATION['LEARNING_RATE']),
            loss = BinaryCrossentropy(),
            metrics= metrics
)
abc.fit(train_dataset, validation_data = val_dataset, epochs = 1, verbose = 1, callbacks=[WandbMetricsLogger()])




AttributeError: property 'model' of 'WandbCallback' object has no setter

In [32]:
run.finish()

0,1
BATCH_SIZE,▁
DROPOUT_RATE,▁
IM_SIZE,▁
KERNEL_SIZE,▁
LEARNING_RATE,▁
N_DENSE_1,▁
N_DENSE_2,▁
N_EPOCHS,▁
N_FILTERS,▁
N_STRIDES,▁

0,1
BATCH_SIZE,128.0
DROPOUT_RATE,0.0
IM_SIZE,224.0
KERNEL_SIZE,3.0
LEARNING_RATE,0.01
N_DENSE_1,100.0
N_DENSE_2,10.0
N_EPOCHS,1.0
N_FILTERS,6.0
N_STRIDES,1.0


VISUALIZATON

In [None]:
# USING Ctrl+Shift+P -> Tensorboard if using vscode
# otherwise ruse command  tensorboard --logdir = './logs'

Hyperparameter Tuning

In [None]:
# add dropout
# add regularizer (in conv2D layer, dense layer) -> kernel_regularizer
# add augmentation layer
# using hp api of tensorboard
def model_tune(hparams):
    lenet_model = tf.keras.Sequential([
        InputLayer(input_shape = (224,224,3)),
        
        # resize_rescale_layers,
        # augment_layers,

        tf.keras.layers.Conv2D(filters = 6, kernel_size=3, strides=1, padding="valid",activation ='relu',kernel_regularizer=L2(hparams[HP_REGULARIZATION_RATE])),
        BatchNormalization(),
        tf.keras.layers.MaxPool2D(pool_size=2,strides=2),
        Dropout(rate = hparams[HP_DROPOUT]),

        tf.keras.layers.Conv2D(filters = 16, kernel_size=3, strides=1, padding="valid",activation ='relu',kernel_regularizer=L2(hparams[HP_REGULARIZATION_RATE])),
        BatchNormalization(),
        tf.keras.layers.MaxPool2D(pool_size=2,strides=2),

        Flatten(),

        Dense(hparams[NUM_UNIT_1], activation ="relu",kernel_regularizer=L2(0.01)),
        BatchNormalization(),
        Dropout(rate = 0.3),
        Dense(hparams[NUM_UNIT_2], activation ="relu",kernel_regularizer=L2(0.01)),
        BatchNormalization(),
        Dense(1, activation ="sigmoid")])

    lenet_model.compile(
        optimizer = Adam(learning_rate=hparams[HP_LEARNING_RATE]),
        loss = BinaryCrossentropy(),
        metrics= ['accuracy']
    )
    lenet_model.fit(train_dataset, validation_data = val_dataset, epochs = 1, verbose = 1, callbacks=[tensorboard_callback, lr_scheduler_callback,LogImageCallback()])
    _, accuracy = lenet_model.evaluate(val_dataset)
    return accuracy

In [None]:
NUM_UNIT_1 = hp.HParam('num_unit_1', hp.Discrete([100]))
NUM_UNIT_2 = hp.HParam('num_unit_2', hp.Discrete([10]))
HP_DROPOUT = hp.HParam('dropout', hp.Discrete([0.3]))
HP_REGULARIZATION_RATE = hp.HParam('regularization_rate', hp.Discrete([0.01]))
HP_LEARNING_RATE = hp.HParam('learning_rate', hp.Discrete([0.01]))

In [None]:
run_number = 0
for num_unit_1 in NUM_UNIT_1.domain.values:
    for num_unit_2 in NUM_UNIT_2.domain.values:
        for dropout_rate in HP_DROPOUT.domain.values:
            for regularization_rate in HP_REGULARIZATION_RATE.domain.values:
                for learning_rate in HP_LEARNING_RATE.domain.values:
                    hparams = {
                        NUM_UNIT_1 : num_unit_1,
                        NUM_UNIT_2 : num_unit_2,
                        HP_DROPOUT : dropout_rate,
                        HP_REGULARIZATION_RATE: regularization_rate,
                        HP_LEARNING_RATE: learning_rate
                    }
                    file_writer = tf.summary.create_file_writer('./logs' +str(run_number))
                    with file_writer.as_default():
                        hp.hparams(hparams)
                        accuracy = model_tune(hparams)
                        tf.summary.scalar('accuracy',accuracy,run_number)
                    print('OUR HPARAM of run_number ', run_number ,hparams)
                    run_number += 1
