In [None]:
import os, shutil, zipfile
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import models, layers, optimizers
# from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.applications.inception_v3 import InceptionV3
import matplotlib.pyplot as plt
import time

In [None]:
# zipfile processing
# The archive location defaults to the author's local path. Set the
# CHEST_XRAY_ZIP environment variable to use another location
# (e.g. '/datasets/chest_xray/chest_xray.zip' on the VM machine).
data_zip_dir = os.environ.get('CHEST_XRAY_ZIP',
                              '/Users/hsyoon/Desktop/ANN/chest_xray.zip')
data_dir = './datasets/'

# init: extract the archive only when the dataset folder is not present yet
if not os.path.isdir(os.path.join(data_dir, 'chest_xray')):
    # Remove stale or partial contents before extracting. The directory does
    # not exist on a fresh checkout, and an unguarded shutil.rmtree would
    # raise FileNotFoundError there.
    if os.path.isdir(data_dir):
        shutil.rmtree(data_dir)
    with zipfile.ZipFile(data_zip_dir, 'r') as z:
        z.extractall(data_dir)

In [None]:
def getDatasetDir(data_dir):
    """Return the (train, val, test) directories located under `<data_dir>/chest_xray`."""
    dataset_root = os.path.join(data_dir, 'chest_xray')
    return tuple(os.path.join(dataset_root, split) for split in ('train', 'val', 'test'))

def getResultDir(dataset_dir):
    """Return the per-class sub-directories of one dataset split.

    Args:
        dataset_dir: directory that contains the 'NORMAL' and 'PNEUMONIA' folders.

    Returns:
        Tuple `(normal_dir, pneumonia_dir)`.
    """
    # Build the paths from the argument; the previous version ignored it and
    # silently used the notebook-level `data_dir` instead.
    normal_dir = os.path.join(dataset_dir, 'NORMAL')
    pneumonia_dir = os.path.join(dataset_dir, 'PNEUMONIA')
    return normal_dir, pneumonia_dir

def dataGenerator(resolution:tuple, batch_size:int):
    """Create the train / validation / test directory iterators.

    Reads the dataset location from the notebook-level `data_dir`.

    Args:
        resolution: (height, width) every image is resized to.
        batch_size: number of images per batch.

    Returns:
        Tuple `(train_generator, validation_generator, test_generator)` yielding
        rescaled images with binary labels. Only the training data is augmented.
    """
    # set image generators
    (train_dir, val_dir, test_dir) = getDatasetDir(data_dir)

    train_datagen = ImageDataGenerator(rescale=1./255,
                                    rotation_range=20, shear_range=0.1,
                                    width_shift_range=0.1, height_shift_range=0.1,
                                    zoom_range=0.1, horizontal_flip=True, fill_mode='nearest')
    validation_datagen = ImageDataGenerator(rescale=1./255)
    test_datagen = ImageDataGenerator(rescale=1./255)

    train_generator = train_datagen.flow_from_directory(
            train_dir,
            target_size=resolution,
            batch_size=batch_size,
            class_mode='binary')
    # Evaluation data must not be shuffled: code that compares predictions with
    # `generator.classes` relies on the batches following the file order.
    validation_generator = validation_datagen.flow_from_directory(
            val_dir,
            target_size=resolution,
            batch_size=batch_size,
            class_mode='binary',
            shuffle=False)
    test_generator = test_datagen.flow_from_directory(
            test_dir,
            target_size=resolution,
            batch_size=batch_size,
            class_mode='binary',
            shuffle=False)
    return train_generator, validation_generator, test_generator

In [None]:
def build_model_1(input_shape=(299, 299, 3)):
    """Build a binary classifier on top of a frozen, ImageNet-pretrained InceptionV3.

    Args:
        input_shape: (height, width, channels) of the network input. Defaults to
            InceptionV3's native 299x299 RGB size; pass the generator resolution
            (e.g. `resolution + (3,)`) so the model matches the data it is fed.
            NOTE(review): Keras documents a minimum of 75x75 for InceptionV3.

    Returns:
        A compiled `Model` (RMSprop, lr=1e-5, binary cross-entropy, accuracy).
    """
    base_model = InceptionV3(weights='imagenet', input_shape=input_shape, include_top=False)

    base_model.trainable = False     # freezing model
    x = layers.GlobalAveragePooling2D()(base_model.output)
    x = layers.Dense(512)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.Dense(128)(x)
    y = layers.Dense(1, activation='sigmoid')(x)
    model = models.Model(inputs=base_model.input, outputs=y)
    model.compile(optimizer=optimizers.RMSprop(learning_rate=1e-5),
                  loss='binary_crossentropy', metrics=['accuracy'])
    return model

def unfreeze_model(model, prefix='block5', after_layer='mixed8'):
    """Unfreeze the top of the convolutional base for fine-tuning and recompile.

    Args:
        model: model whose backbone layers were frozen.
        prefix: layers whose name starts with this prefix are made trainable
            (VGG16 names its last convolutional block 'block5_*').
        after_layer: fallback used when no layer matches `prefix`. Every layer
            that follows the layer with this name is made trainable. The
            default 'mixed8' targets the last InceptionV3 inception blocks,
            whose layers have no 'block5' prefix.

    The model is modified in place; nothing is returned.
    """
    import warnings

    to_unfreeze = [layer for layer in model.layers if layer.name.startswith(prefix)]
    if not to_unfreeze:
        # Backbones without `prefix`-named layers (e.g. InceptionV3) would
        # otherwise stay fully frozen, making the fine-tuning phase a no-op.
        layer_names = [layer.name for layer in model.layers]
        if after_layer in layer_names:
            to_unfreeze = model.layers[layer_names.index(after_layer) + 1:]
    if not to_unfreeze:
        warnings.warn("unfreeze_model: no layer matched prefix %r or boundary %r; "
                      "the backbone stays frozen" % (prefix, after_layer))

    for layer in to_unfreeze:
        layer.trainable = True
    # Recompile so the changed `trainable` flags take effect.
    model.compile(optimizer=optimizers.RMSprop(learning_rate=1e-5),
                    loss='binary_crossentropy', metrics=['accuracy'])

# w/ Dropout Layer
def build_model_2(input_shape=(299, 299, 3)):
    """Build the frozen-InceptionV3 binary classifier with a dropout-regularised head.

    Args:
        input_shape: (height, width, channels) of the network input. Defaults to
            InceptionV3's native 299x299 RGB size; pass the generator resolution
            (e.g. `resolution + (3,)`) so the model matches the data it is fed.
            NOTE(review): Keras documents a minimum of 75x75 for InceptionV3.

    Returns:
        A compiled `Model` (RMSprop, lr=1e-5, binary cross-entropy, accuracy).
    """
    base_model = InceptionV3(weights='imagenet', input_shape=input_shape, include_top=False)

    base_model.trainable = False     # freezing model
    x = layers.GlobalAveragePooling2D()(base_model.output)
    x = layers.Dropout(0.25)(x)
    x = layers.Dense(512)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.Dropout(0.25)(x)
    x = layers.Dense(128)(x)
    x = layers.Dropout(0.25)(x)
    y = layers.Dense(1, activation='sigmoid')(x)
    model = models.Model(inputs=base_model.input, outputs=y)
    model.compile(optimizer=optimizers.RMSprop(learning_rate=1e-5),
                  loss='binary_crossentropy', metrics=['accuracy'])
    return model


def plot_acc_loss(h):
    """Plot training vs. validation accuracy and loss side by side.

    Args:
        h: object whose `history` dict holds the 'accuracy', 'val_accuracy',
            'loss' and 'val_loss' series (one value per epoch).
    """
    plt.figure(figsize=(15.6, 4.8), dpi=100)
    panels = (('accuracy', 'Accuracy'), ('loss', 'Loss'))
    for position, (metric, label) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(h.history[metric])
        plt.plot(h.history['val_' + metric])
        plt.title(label)
        plt.ylabel(label)
        plt.xlabel('Epoch')
        plt.legend(['Training', 'Validation'], loc=0)
    plt.show()

# MAIN LOOP

## Q1

In [None]:
resolution = (128, 128)
(train_generator, val_generator, test_generator) = dataGenerator(resolution, batch_size=20)

# training before FT
num_epochs = 100
starttime=time.time()
model = build_model_1()
history = model.fit_generator(train_generator, epochs=num_epochs,
                              validation_data=val_generator)
print("1st learning elapsed time (in sec): ", time.time()-starttime)

# evalution before FT
model.summary()
plot_acc_loss(history)
model.save('./qe3_1_beforeFT.h5')
train_loss, train_acc = model.evaluate_generator(train_generator)
test_loss, test_acc = model.evaluate_generator(test_generator)
print('train_acc (Before FT):', train_acc)
print('test_acc (Before FT):', test_acc)
print('train_loss (Before FT):', train_loss)
print('test_loss (Before FT):', test_loss)

In [None]:
# Loading and unfreezing pre-trained model
model = models.load_model('./qe3_1_beforeFT.h5')
unfreeze_model(model)

# training after FT
num_epochs = 50
starttime=time.time()
history = model.fit_generator(train_generator, epochs=num_epochs, validation_data=val_generator)
print("2nd learning elapsed time (in sec): ", time.time()-starttime)

# evalution after FT
model.summary()
plot_acc_loss(history)
model.save('./qe3_1_afterFT.h5')
train_loss, train_acc = model.evaluate_generator(train_generator)
test_loss, test_acc = model.evaluate_generator(test_generator)
print('train_acc (After FT):', train_acc)
print('test_acc: (After FT)', test_acc)
print('train_loss (After FT):', train_loss)
print('test_loss: (After FT)', test_loss)

## Q2

In [None]:
# training before FT
num_epochs = 100
starttime=time.time()
model = build_model_2()
history = model.fit_generator(train_generator, epochs=num_epochs, validation_data=val_generator)
print("1st learning elapsed time (in sec): ", time.time()-starttime)

# evalution before FT
model.summary()
plot_acc_loss(history)
model.save('./qe3_2_beforeFT.h5')
train_loss, train_acc = model.evaluate_generator(train_generator)
test_loss, test_acc = model.evaluate_generator(test_generator)
print('train_acc (Before FT):', train_acc)
print('test_acc (Before FT):', test_acc)
print('train_loss (Before FT):', train_loss)
print('test_loss (Before FT):', test_loss)

In [None]:
# Loading and unfreezing pre-trained model
model = models.load_model('./qe3_2_beforeFT.h5')
unfreeze_model(model)

# training after FT
num_epochs = 100
starttime=time.time()
history = model.fit_generator(train_generator, epochs=num_epochs, validation_data=val_generator)
print("2nd learning elapsed time (in sec): ", time.time()-starttime)

# evalution after FT
model.summary()
plot_acc_loss(history)
model.save('./qe3_2_afterFT.h5')
train_loss, train_acc = model.evaluate_generator(train_generator)
test_loss, test_acc = model.evaluate_generator(test_generator)
print('train_acc (After FT):', train_acc)
print('test_acc: (After FT)', test_acc)
print('train_loss (After FT):', train_loss)
print('test_loss: (After FT)', test_loss)

## Q3

### for [256, 256]

In [None]:
resolution = (256, 256)
(train_generator, val_generator, test_generator) = dataGenerator(resolution, batch_size=20)

# NOTE(review): build_model_2() builds its network for 299x299 inputs while the
# generators above yield `resolution`-sized images -- confirm this is intended.

# training before FT
num_epochs = 100
starttime=time.time()
model = build_model_2() # w/ dropout layer
# Model.fit / Model.evaluate accept generators directly; the *_generator
# variants are deprecated.
history = model.fit(train_generator, epochs=num_epochs, validation_data=val_generator)
print("1st learning elapsed time (in sec): ", time.time()-starttime)

# evaluation before FT
model.summary()
plot_acc_loss(history)
model.save('./qe3_3_beforeFT_256.h5')
train_loss, train_acc = model.evaluate(train_generator)
test_loss, test_acc = model.evaluate(test_generator)
print('train_acc (Before FT):', train_acc)
print('test_acc (Before FT):', test_acc)
print('train_loss (Before FT):', train_loss)
print('test_loss (Before FT):', test_loss)

In [None]:
# Loading and unfreezing pre-trained model
# (uses the 256x256 generators created in the previous cell)
model = models.load_model('./qe3_3_beforeFT_256.h5')
unfreeze_model(model)

# training after FT
num_epochs = 100
starttime=time.time()
# Model.fit / Model.evaluate accept generators directly; the *_generator
# variants are deprecated.
history = model.fit(train_generator, epochs=num_epochs, validation_data=val_generator)
print("2nd learning elapsed time (in sec): ", time.time()-starttime)

# evaluation after FT
model.summary()
plot_acc_loss(history)
model.save('./qe3_3_afterFT_256.h5')
train_loss, train_acc = model.evaluate(train_generator)
test_loss, test_acc = model.evaluate(test_generator)
print('train_acc (After FT):', train_acc)
print('test_acc: (After FT)', test_acc)
print('train_loss (After FT):', train_loss)
print('test_loss: (After FT)', test_loss)

### for [512, 512]

In [None]:
resolution = (512, 512)
# smaller batch than the other runs (10 instead of 20) for the larger images
(train_generator, val_generator, test_generator) = dataGenerator(resolution, batch_size=10)

# NOTE(review): build_model_2() builds its network for 299x299 inputs while the
# generators above yield `resolution`-sized images -- confirm this is intended.

# training before FT
num_epochs = 100
starttime=time.time()
model = build_model_2() # w/ dropout layer
# Model.fit / Model.evaluate accept generators directly; the *_generator
# variants are deprecated.
history = model.fit(train_generator, epochs=num_epochs, validation_data=val_generator)
print("1st learning elapsed time (in sec): ", time.time()-starttime)

# evaluation before FT
model.summary()
plot_acc_loss(history)
model.save('./qe3_3_beforeFT_512.h5')
train_loss, train_acc = model.evaluate(train_generator)
test_loss, test_acc = model.evaluate(test_generator)
print('train_acc (Before FT):', train_acc)
print('test_acc (Before FT):', test_acc)
print('train_loss (Before FT):', train_loss)
print('test_loss (Before FT):', test_loss)

In [None]:
# Loading and unfreezing pre-trained model
# (uses the 512x512 generators created in the previous cell)
model = models.load_model('./qe3_3_beforeFT_512.h5')
unfreeze_model(model)

# training after FT
num_epochs = 100
starttime=time.time()
# Model.fit / Model.evaluate accept generators directly; the *_generator
# variants are deprecated.
history = model.fit(train_generator, epochs=num_epochs, validation_data=val_generator)
print("2nd learning elapsed time (in sec): ", time.time()-starttime)

# evaluation after FT
model.summary()
plot_acc_loss(history)
model.save('./qe3_3_afterFT_512.h5')
train_loss, train_acc = model.evaluate(train_generator)
test_loss, test_acc = model.evaluate(test_generator)
print('train_acc (After FT):', train_acc)
print('test_acc: (After FT)', test_acc)
print('train_loss (After FT):', train_loss)
print('test_loss: (After FT)', test_loss)

## Q4

In [None]:
# Extra imports for the Grad-CAM section (Q4).
# from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input, decode_predictions
from tensorflow.keras.applications.inception_v3 import InceptionV3, preprocess_input, decode_predictions
from tensorflow.keras import backend as K 
from tensorflow.keras.preprocessing import image 
import cv2
import numpy as np
import tensorflow as tf	
# Switch TensorFlow to graph (non-eager) mode.
# NOTE(review): presumably needed because gradCAM() in this notebook relies on
# K.gradients / K.function -- confirm. The switch is global, so it likely also
# affects every cell executed after this one in the same kernel session.
tf.compat.v1.disable_eager_execution()

In [None]:
def deprocess_image(X):
    """Convert an arbitrary-range array into a displayable uint8 image.

    The values are standardised (zero mean, unit variance), squeezed to a
    standard deviation of 0.1 around 0.5, clipped to [0, 1] and finally
    scaled to the 0..255 range.
    """
    standardized = (X - X.mean()) / (X.std() + 1e-5)   # epsilon avoids division by zero
    recentered = 0.1 * standardized + 0.5
    unit_range = np.clip(recentered, 0, 1)
    return np.clip(unit_range * 255, 0, 255).astype('uint8')


def draw_activation(activation, figure_name):
    """Show the channels of one activation map as a tiled image grid.

    Args:
        activation: 4-D array indexed as [sample, row, col, channel]; only
            sample 0 is drawn, and axis 1 is used as the (square) tile size.
        figure_name: title of the figure.

    Channels are laid out 16 per grid row; channels beyond the last complete
    row of 16 are not drawn.
    """
    images_per_row = 16
    size = activation.shape[1]
    n_cols = activation.shape[-1] // images_per_row
    display_grid = np.zeros((size * n_cols, images_per_row * size))

    # Walk the channels in order and drop each one into its tile.
    for channel in range(n_cols * images_per_row):
        col, row = divmod(channel, images_per_row)
        tile = deprocess_image(activation[0, :, :, channel])
        top, left = col * size, row * size
        display_grid[top:top + size, left:left + size] = tile

    scale = 1./size
    grid_height, grid_width = display_grid.shape
    plt.figure(figsize=(scale*grid_width, scale*grid_height))
    plt.title(figure_name)
    plt.grid(False)
    plt.imshow(display_grid, aspect='auto', cmap='viridis')
    plt.show()

def gradCAM(model, x, last_conv_layer_name='mixed10'):
    """Compute a Grad-CAM heatmap for the top predicted class of `x`.

    Args:
        model: classifier with an ImageNet-style output (the predictions are
            printed through `decode_predictions`).
        x: preprocessed input batch; the class index is taken from a flat
            argmax over the predictions, so a single-image batch is expected.
        last_conv_layer_name: name of the convolutional layer to visualise.
            Defaults to 'mixed10' (InceptionV3); use 'block5_conv3' for VGG16.

    Returns:
        Tuple `(heatmap, conv_layer_output_value, pooled_grads_value)`:
        the heatmap normalised to [0, 1], the gradient-weighted feature maps of
        the first sample, and the per-channel pooled gradients.
    """
    preds = model.predict(x)
    print('Predicted:', decode_predictions(preds, top=5)[0])
    max_output = model.output[:, np.argmax(preds)]

    last_conv_layer = model.get_layer(last_conv_layer_name)

    # Gradient of the winning class score with respect to the feature maps.
    # NOTE(review): for a 299x299 InceptionV3 input, 'mixed10' is expected to be
    # (None, 8, 8, 2048) -- confirm with model.summary().
    grads = K.gradients(max_output, last_conv_layer.output)[0]

    # One importance weight per channel: average over batch and spatial axes.
    pooled_grads = K.mean(grads, axis=(0, 1, 2))

    iterate = K.function([model.input], [pooled_grads, last_conv_layer.output[0]])
    pooled_grads_value, conv_layer_output_value = iterate([x])

    # Weight every channel by its pooled gradient. Broadcasting over the last
    # axis works for any number of filters (no hard-coded 2048).
    conv_layer_output_value = conv_layer_output_value * pooled_grads_value

    heatmap = np.mean(conv_layer_output_value, axis=-1)
    heatmap = np.maximum(heatmap, 0)
    # Normalise only when there is a positive response; dividing an all-zero
    # map by its maximum would turn the heatmap into NaNs.
    peak = np.max(heatmap)
    if peak > 0:
        heatmap = heatmap / peak
    return heatmap, conv_layer_output_value, pooled_grads_value

In [None]:
# image preprocessing
img_path = './datasets/chest_xray/train/PNEUMONIA/person1017_bacteria_2948.jpeg'
img=image.load_img(img_path, target_size=(299,299))
img_tensor = image.img_to_array(img)
img_tensor = np.expand_dims(img_tensor, axis=0)
img_tensor = preprocess_input(img_tensor)

# prediction (ImageNet classifier, so the printed labels are ImageNet classes)
model = InceptionV3(weights='imagenet')
heatmap, conv_output, pooled_grads=gradCAM(model, img_tensor)

# image processing (OpenCV works in BGR channel order)
img=cv2.imread(img_path)
if img is None:
    # cv2.imread does not raise on a missing or unreadable file, it returns None
    raise FileNotFoundError('Could not read image: ' + img_path)
heatmap=cv2.resize(heatmap, (img.shape[1],img.shape[0]))
heatmap=cv2.applyColorMap(np.uint8(255*heatmap), cv2.COLORMAP_JET)
# superimposed_image = cv2.addWeighted(original_image, 0.6, heatmap, 0.4, 0)
superimposed_img=heatmap*0.4+img*0.6

# save image (still BGR, which is what cv2.imwrite expects)
cv2.imwrite(img_path.split('/')[-1].split('.')[0] + '_CAM.jpg', superimposed_img)


# # visualization of conv_output and pooled_grads
# draw_no=range(256,256+32,1)
# conv_activation=np.expand_dims(conv_output[:,:,draw_no], axis=0)
# draw_activation(conv_activation, 'last_conv')
# plt.matshow(pooled_grads[draw_no].reshape(-1,16), cmap='viridis')

# 8-bit processing to print image.
# matplotlib expects RGB, so every OpenCV (BGR) image is converted before
# display; otherwise red and blue are swapped and the JET heatmap shows hot
# regions in blue.
superimposed_img = cv2.cvtColor(np.uint8(superimposed_img), cv2.COLOR_BGR2RGB)
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
heatmap_rgb = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)

fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(12, 4))
ax1.imshow(img_rgb / 255.0)
ax1.set_title('Original Image')
ax1.axis('off')
ax2.imshow(heatmap_rgb)
ax2.set_title('Class Activation Map')
ax2.axis('off')
ax3.imshow(superimposed_img / 255.0)
ax3.set_title('Original Image with CAM')
ax3.axis('off')
plt.tight_layout()
plt.show()

## QE1

In [None]:
def dataGenerator_noSize(batch_size:int):
    """Create the train / validation / test iterators without an explicit target size.

    Reads the dataset location from the notebook-level `data_dir`.

    NOTE(review): `flow_from_directory` is called without `target_size`, so the
    Keras default applies (documented as 256x256) -- images are still resized,
    not fed at their native resolution. Confirm this is the intended setup.

    Args:
        batch_size: number of images per batch.

    Returns:
        Tuple `(train_generator, validation_generator, test_generator)` yielding
        rescaled images with binary labels. Only the training data is augmented.
    """
    # set image generators
    (train_dir, val_dir, test_dir) = getDatasetDir(data_dir)

    train_datagen = ImageDataGenerator(rescale=1./255,
                                    rotation_range=20, shear_range=0.1,
                                    width_shift_range=0.1, height_shift_range=0.1,
                                    zoom_range=0.1, horizontal_flip=True, fill_mode='nearest')
    validation_datagen = ImageDataGenerator(rescale=1./255)
    test_datagen = ImageDataGenerator(rescale=1./255)

    train_generator = train_datagen.flow_from_directory(
            train_dir,
            batch_size=batch_size,
            class_mode='binary')
    # Evaluation data must not be shuffled: code that compares predictions with
    # `generator.classes` relies on the batches following the file order.
    validation_generator = validation_datagen.flow_from_directory(
            val_dir,
            batch_size=batch_size,
            class_mode='binary',
            shuffle=False)
    test_generator = test_datagen.flow_from_directory(
            test_dir,
            batch_size=batch_size,
            class_mode='binary',
            shuffle=False)
    return train_generator, validation_generator, test_generator


# w/ Dropout Layer
def build_model_2_noSize():
    """Build the dropout-regularised classifier on a size-agnostic InceptionV3.

    The backbone is created with input shape (None, None, 3) and without its
    classification top, so global average pooling collapses whatever spatial
    size the feature maps have. The backbone is frozen and the returned model
    is compiled (RMSprop, lr=1e-5, binary cross-entropy, accuracy).
    """
    backbone = InceptionV3(weights='imagenet', input_shape=(None, None, 3), include_top=False)
    backbone.trainable = False     # freezing model

    # Classification head, applied layer by layer in this order.
    head_layers = (
        layers.Dropout(0.25),
        layers.Dense(512),
        layers.BatchNormalization(),
        layers.Activation('relu'),
        layers.Dropout(0.25),
        layers.Dense(128),
        layers.Dropout(0.25),
        layers.Dense(1, activation='sigmoid'),
    )
    features = layers.GlobalAveragePooling2D()(backbone.output)
    for head_layer in head_layers:
        features = head_layer(features)

    model = models.Model(inputs=backbone.input, outputs=features)
    model.compile(optimizer=optimizers.RMSprop(learning_rate=1e-5),
                  loss='binary_crossentropy', metrics=['accuracy'])
    return model

In [None]:
(train_generator, val_generator, test_generator) = dataGenerator_noSize(batch_size=20)

# training before FT
num_epochs = 100
starttime=time.time()
model = build_model_2_noSize() # w/ dropout layer
# Model.fit / Model.evaluate accept generators directly; the *_generator
# variants are deprecated.
history = model.fit(train_generator, epochs=num_epochs, validation_data=val_generator)
print("1st learning elapsed time (in sec): ", time.time()-starttime)

# evaluation before FT
model.summary()
plot_acc_loss(history)
model.save('./qe3_e1_beforeFT_nosize.h5')
train_loss, train_acc = model.evaluate(train_generator)
test_loss, test_acc = model.evaluate(test_generator)
print('train_acc (Before FT):', train_acc)
print('test_acc (Before FT):', test_acc)
print('train_loss (Before FT):', train_loss)
print('test_loss (Before FT):', test_loss)

# Loading and unfreezing pre-trained model
model = models.load_model('./qe3_e1_beforeFT_nosize.h5')
unfreeze_model(model)

# training after FT
num_epochs = 100
starttime=time.time()
history = model.fit(train_generator, epochs=num_epochs, validation_data=val_generator)
print("2nd learning elapsed time (in sec): ", time.time()-starttime)

# evaluation after FT
model.summary()
plot_acc_loss(history)
model.save('./qe3_e1_afterFT_nosize.h5')
train_loss, train_acc = model.evaluate(train_generator)
test_loss, test_acc = model.evaluate(test_generator)
print('train_acc (After FT):', train_acc)
print('test_acc: (After FT)', test_acc)
print('train_loss (After FT):', train_loss)
print('test_loss: (After FT)', test_loss)

## QE2

In [None]:
from sklearn.metrics import roc_auc_score
from sklearn.metrics import confusion_matrix
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt

In [None]:
def getScore(model_dir:str):
    """Evaluate a saved model on the test split and print its metrics.

    The test generator is selected from a token in the file name:
    '128', '256' or '512' (image resolution) or 'nosize'.

    Args:
        model_dir: path of the saved model file (e.g. 'qe3_3_afterFT_256.h5').

    Raises:
        ValueError: if the file name contains none of the known tokens.
    """
    # Checked in this order; the first token found in the name wins.
    generator_factories = (
        ('128', lambda: dataGenerator((128, 128), batch_size=20)),      # q1, q2
        ('256', lambda: dataGenerator((256, 256), batch_size=20)),      # q3_256
        ('512', lambda: dataGenerator((512, 512), batch_size=10)),      # q3_512
        ('nosize', lambda: dataGenerator_noSize(batch_size=20)),        # qe1 (nosize)
    )
    for token, make_generators in generator_factories:
        if token in model_dir:
            test_generator = make_generators()[2]
            break
    else:
        # Fail with a clear message instead of an AttributeError on None below.
        raise ValueError("Cannot infer the test resolution from model file name: " + model_dir)

    model = models.load_model(model_dir)

    # Predictions are compared position by position with `classes`, which is in
    # file order, so the generator must iterate in that order as well.
    test_generator.shuffle = False
    test_generator.reset()

    y_true = test_generator.classes
    probabilities = model.predict(test_generator).flatten()
    y_pred = np.where(probabilities >= 0.5, 1, 0)

    # labels=[0, 1] keeps the matrix 2x2 even if one class is never seen or predicted.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    tn, fp, fn, tp = cm.ravel()

    accuracy = (tn + tp) / (tn + fp + fn + tp)
    precision = tp / (fp + tp)
    recall = tp / (fn + tp)
    specificity = tn / (tn + fp)
    f1 = 2 * precision * recall / (precision + recall)
    # AUC is a ranking metric: it needs the predicted probabilities. Passing the
    # thresholded labels would collapse the ROC curve to a single point.
    auc = roc_auc_score(y_true, probabilities)

    plotCM(cm, y_pred, y_true)
    print('<' + model_dir[:-3] + ' Score>')
    print(f'Accuracy : {accuracy:.4f}')
    print(f'Precision : {precision:.4f}')
    print(f'Recall : {recall:.4f}')
    print(f'Specificity : {specificity:.4f}')
    print(f'F1 Score : {f1:.4f}')
    print(f'AUC : {auc:.4f}')

def plotCM(cm, y_pred, y_true):
    """Draw the confusion matrix `cm` as an annotated heatmap.

    `y_pred` and `y_true` are accepted for the caller's convenience but are
    not used by the plot.
    """
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues")
    # Label the axes and title in one pass.
    for set_text, text in ((plt.xlabel, "Predicted"),
                           (plt.ylabel, "Actual"),
                           (plt.title, "Confusion Matrix")):
        set_text(text)
    plt.show()

# Score every saved model, in the order the experiments were run.
for saved_model_path in (
        'qe3_1_beforeFT_128.h5',
        'qe3_1_afterFT_128.h5',
        'qe3_2_beforeFT_128.h5',
        'qe3_2_afterFT_128.h5',
        'qe3_3_beforeFT_256.h5',
        'qe3_3_afterFT_256.h5',
        'qe3_3_beforeFT_512.h5',
        'qe3_3_afterFT_512.h5',
        'qe3_e1_beforeFT_nosize.h5',
        'qe3_e1_afterFT_nosize.h5',
):
    getScore(saved_model_path)