Implementation from https://github.com/kobiso/CBAM-keras

In [1]:
from keras import backend as K
from keras.activations import sigmoid
import numpy as np 
import random
import pandas as pd
import os
import matplotlib.pyplot as plt
import cv2
from shutil import copyfile
import keras
from keras.layers import Flatten, Dense, Input, GlobalAveragePooling2D, \
    GlobalMaxPooling2D, Activation, Conv2D, MaxPooling2D, BatchNormalization, \
    AveragePooling2D, Reshape, Permute, multiply, ZeroPadding2D, Dropout, LeakyReLU, \
    Concatenate, Add, Lambda
from keras.models import Model, Sequential, load_model, model_from_json
from keras_preprocessing.image import ImageDataGenerator
from keras.optimizers import RMSprop, Adam
from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
import tensorflow as tf
from keras.applications import VGG16 
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from sklearn.preprocessing import OneHotEncoder
from google.colab import drive
drive.mount("/content/drive")

random.seed(34) # For reproductibility
np.random.seed(34)

Mounted at /content/drive


# CBAM BLOCK

In [9]:
def cbam_block(cbam_feature, ratio=8):
	"""Contains the implementation of Convolutional Block Attention Module(CBAM) block.
	As described in https://arxiv.org/abs/1807.06521.
	"""
	
	cbam_feature = channel_attention(cbam_feature, ratio)
	cbam_feature = spatial_attention(cbam_feature)
	return cbam_feature

def channel_attention(input_feature, ratio=8):
	
	channel_axis = 1 if K.image_data_format() == "channels_first" else -1
	channel = input_feature.shape[channel_axis]
	
	shared_layer_one = Dense(channel//ratio,
							 activation='relu',
							 kernel_initializer='he_normal',
							 use_bias=True,
							 bias_initializer='zeros')
	shared_layer_two = Dense(channel,
							 kernel_initializer='he_normal',
							 use_bias=True,
							 bias_initializer='zeros')
	
	avg_pool = GlobalAveragePooling2D()(input_feature)    
	avg_pool = Reshape((1,1,channel))(avg_pool)
	assert avg_pool.shape[1:] == (1,1,channel)
	avg_pool = shared_layer_one(avg_pool)
	assert avg_pool.shape[1:] == (1,1,channel//ratio)
	avg_pool = shared_layer_two(avg_pool)
	assert avg_pool.shape[1:] == (1,1,channel)
	
	max_pool = GlobalMaxPooling2D()(input_feature)
	max_pool = Reshape((1,1,channel))(max_pool)
	assert max_pool.shape[1:] == (1,1,channel)
	max_pool = shared_layer_one(max_pool)
	assert max_pool.shape[1:] == (1,1,channel//ratio)
	max_pool = shared_layer_two(max_pool)
	assert max_pool.shape[1:] == (1,1,channel)
	
	cbam_feature = Add()([avg_pool,max_pool])
	cbam_feature = Activation('sigmoid')(cbam_feature)
	
	if K.image_data_format() == "channels_first":
		cbam_feature = Permute((3, 1, 2))(cbam_feature)
	
	return multiply([input_feature, cbam_feature])

def spatial_attention(input_feature):
	kernel_size = 7
	
	if K.image_data_format() == "channels_first":
		channel = input_feature.shape[1]
		cbam_feature = Permute((2,3,1))(input_feature)
	else:
		channel = input_feature.shape[-1]
		cbam_feature = input_feature
	
	avg_pool = Lambda(lambda x: K.mean(x, axis=3, keepdims=True))(cbam_feature)
	assert avg_pool.shape[-1] == 1
	max_pool = Lambda(lambda x: K.max(x, axis=3, keepdims=True))(cbam_feature)
	assert max_pool.shape[-1] == 1
	concat = Concatenate(axis=3)([avg_pool, max_pool])
	assert concat.shape[-1] == 2
	cbam_feature = Conv2D(filters = 1,
					kernel_size=kernel_size,
					strides=1,
					padding='same',
					activation='sigmoid',
					kernel_initializer='he_normal',
					use_bias=False)(concat)	
	assert cbam_feature.shape[-1] == 1
	
	if K.image_data_format() == "channels_first":
		cbam_feature = Permute((3, 1, 2))(cbam_feature)
		
	return multiply([input_feature, cbam_feature])

# Generate Data

In [3]:
X_true = []
y_true = []
for image_path in os.listdir("/content/drive/My Drive/tuberculose/data/train/True/"):
  img = cv2.imread("/content/drive/My Drive/tuberculose/data/train/True/" + image_path)
  X_true.append(img)
  y_true.append(1)
X_false = []
y_false = []
for image_path in os.listdir("/content/drive/My Drive/tuberculose/data/train/False/"):
  img = cv2.imread("/content/drive/My Drive/tuberculose/data/train/False/" + image_path)
  X_false.append(img)
  y_false.append(0)
X_test = []
for image_path in os.listdir("/content/drive/My Drive/tuberculose/data/test/"):
  img = cv2.imread("/content/drive/My Drive/tuberculose/data/test/" + image_path)
  X_test.append(img)

X = X_true + X_false
y = y_true + y_false
c = list(zip(X,y))
random.shuffle(c)
X, y = zip(*c)
print(f'{len(X)} images in the total training set')
print(f'{len(X_test)} images in the test set')

# Hot Encode Label
lb = {1 : [0,1], 0 : [1,0]}
y_lb = [lb[t] for t in y]

X_train, X_val, y_train, y_val = train_test_split(X, y_lb, test_size=0.1, random_state=42)
print(f'{len(X_train)} images in the train set')
print(f'{len(X_val)} images in the val set')

y_train, y_val = np.einsum('kli->lik', np.array([y_train])), np.einsum('kli->lik',np.array([y_val]))

def generate_predictions(predictions):
  probs = []
  for x in predictions:
    probs.append(x[1])
  return probs

ID_test = [x.split('.')[0] for x in os.listdir("/content/drive/My Drive/tuberculose/data/test/")]

718 images in the total training set
82 images in the test set
646 images in the train set
72 images in the val set


In [4]:
img_height, img_width = 256, 256
batch_size = 16 

## Going to do also some data augmentation since we don't have many images
train_datagen = ImageDataGenerator(rescale=1./255,
                                       rotation_range=50,
                                       width_shift_range=0.2,
                                       height_shift_range=0.2,
                                       shear_range=0.25,
                                       zoom_range=0.1,
                                       channel_shift_range = 20,
                                       horizontal_flip = True ,
                                       vertical_flip = True )
val_datagen = ImageDataGenerator(rescale=1./255)


def resize(img):
  img_ = cv2.resize(img, (img_height, img_width), interpolation=cv2.INTER_AREA)
  return img_

X_train_resized = np.array([resize(x) for x in X_train])
X_val_resized = np.array([resize(x) for x in X_val])

train_datagen.fit(X_train_resized, augment=True)
val_datagen.fit(X_val_resized)

# Model

In [14]:
from keras.regularizers import l2
def resnet_layer(inputs,
                 num_filters=16,
                 kernel_size=3,
                 strides=1,
                 activation='relu',
                 batch_normalization=True,
                 conv_first=True):
    """2D Convolution-Batch Normalization-Activation stack builder
    # Arguments
        inputs (tensor): input tensor from input image or previous layer
        num_filters (int): Conv2D number of filters
        kernel_size (int): Conv2D square kernel dimensions
        strides (int): Conv2D square stride dimensions
        activation (string): activation name
        batch_normalization (bool): whether to include batch normalization
        conv_first (bool): conv-bn-activation (True) or
            bn-activation-conv (False)
    # Returns
        x (tensor): tensor as input to the next layer
    """
    conv = Conv2D(num_filters,
                  kernel_size=kernel_size,
                  strides=strides,
                  padding='same',
                  kernel_initializer='he_normal',
                  kernel_regularizer=l2(1e-4))

    x = inputs
    if conv_first:
        x = conv(x)
        if batch_normalization:
            x = BatchNormalization()(x)
        if activation is not None:
            x = Activation(activation)(x)
    else:
        if batch_normalization:
            x = BatchNormalization()(x)
        if activation is not None:
            x = Activation(activation)(x)
        x = conv(x)
    return x

depth = 32
num_filters = 16
num_res_blocks = int((depth - 2) / 6)

inputs = Input(shape=(256, 256, 3))
x = resnet_layer(inputs=inputs)
# Instantiate the stack of residual units
for stack in range(3):
      for res_block in range(num_res_blocks):
          strides = 1
          if stack > 0 and res_block == 0:  # first layer but not first stack
              strides = 2  # downsample
          y = resnet_layer(inputs=x,
                             num_filters=num_filters,
                             strides=strides)
          y = resnet_layer(inputs=y,
                             num_filters=num_filters,
                             activation=None)
          if stack > 0 and res_block == 0:  # first layer but not first stack
              # linear projection residual shortcut connection to match
              # changed dims
              x = resnet_layer(inputs=x,
                                 num_filters=num_filters,
                                 kernel_size=1,
                                 strides=strides,
                                 activation=None,
                                 batch_normalization=False)
          # attention_module
          y = cbam_block(y)
          x = keras.layers.add([x, y])
          x = Activation('relu')(x)
      num_filters *= 2

# Add classifier on top.
# v1 does not use BN after last shortcut connection-ReLU
x = AveragePooling2D(pool_size=8)(x)
x = Dense(512, activation='relu')(x)
x = Dropout(0.3)(x)
y = Flatten()(x)
outputs = Dense(2, activation='softmax',
                kernel_initializer='he_normal')(y)

# Instantiate model.
model = Model(inputs=inputs, outputs=outputs)

In [None]:
model.summary()

In [12]:
model.compile(loss='categorical_crossentropy',
              optimizer=Adam(lr=0.001),
              metrics=['acc'])

In [13]:
checkpoint = ModelCheckpoint("/content/drive/My Drive/tuberculose/xray_tuberculo_attention.h5",
                             monitor="val_acc",
                             mode="max",
                             save_best_only = True,
                             verbose=1)

#earlystop = EarlyStopping(monitor = 'val_loss', 
                          #min_delta = 0, 
                          #patience = 6,
                          #verbose = 1,
                          #restore_best_weights = True)
reduceLROnPlat = ReduceLROnPlateau(monitor='val_loss', factor=0.8, patience=10, verbose=1, mode='auto', min_delta=0.0001, cooldown=5, min_lr=0.0001)
# We put our call backs into a callback list
callbacks = [reduceLROnPlat, checkpoint]

epochs = 100
nb_train = 646
nb_val = 72
history = model.fit_generator(train_datagen.flow(X_train_resized, y_train, batch_size=batch_size),
                                 steps_per_epoch=nb_train // batch_size,
                                 epochs=epochs,
                                 callbacks=callbacks,
                                 validation_data=val_datagen.flow(X_val_resized, y_val, batch_size=batch_size),
                                 validation_steps=nb_val // batch_size)

Instructions for updating:
Please use Model.fit, which supports generators.
Epoch 1/100
Epoch 00001: val_acc improved from -inf to 0.59375, saving model to /content/drive/My Drive/tuberculose/xray_tuberculo_attention.h5
Epoch 2/100
Epoch 00002: val_acc did not improve from 0.59375
Epoch 3/100
Epoch 00003: val_acc did not improve from 0.59375
Epoch 4/100
Epoch 00004: val_acc improved from 0.59375 to 0.75000, saving model to /content/drive/My Drive/tuberculose/xray_tuberculo_attention.h5
Epoch 5/100
Epoch 00005: val_acc did not improve from 0.75000
Epoch 6/100
Epoch 00006: val_acc did not improve from 0.75000
Epoch 7/100
Epoch 00007: val_acc did not improve from 0.75000
Epoch 8/100
Epoch 00008: val_acc did not improve from 0.75000
Epoch 9/100
Epoch 00009: val_acc did not improve from 0.75000
Epoch 10/100
Epoch 00010: val_acc did not improve from 0.75000
Epoch 11/100
Epoch 00011: val_acc did not improve from 0.75000
Epoch 12/100
Epoch 00012: val_acc did not improve from 0.75000
Epoch 13/1

KeyboardInterrupt: ignored

In [15]:
model.load_weights("/content/drive/My Drive/tuberculose/xray_tuberculo_attention.h5")
x_test_ = np.array([resize(x) for x in X_test])
gen = ImageDataGenerator(rescale=1./255)
gen.fit(x_test_)
predict = model.predict(gen.flow(x_test_, batch_size=1, shuffle=False))
proba = generate_predictions(predict)
sub = pd.DataFrame({'ID' : ID_test, 'LABEL' : proba})
sub.to_csv('/content/drive/My Drive/tuberculose/sub_high_attention.csv', index=False)

Score of 0.774 ... which is the lowest score I got.

@TODO : Try to implement attention with VGG16

In [None]:
base_model = VGG16(input_shape =  (256, 256, 3), 
                  include_top = False, weights = 'imagenet')