In [0]:
%tensorflow_version 1.x
import glob
import numpy as np
import pandas as pd
import os
import shutil 
import matplotlib.pyplot as plt
from keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array, array_to_img
%matplotlib inline
from sklearn.preprocessing import LabelEncoder 
from keras.utils import to_categorical
from keras.layers import Conv2D, MaxPooling2D, Activation, Flatten, Dense, Dropout, InputLayer, GlobalAveragePooling2D, BatchNormalization
from keras.models import Sequential
from keras import optimizers
from keras.applications.resnet50 import ResNet50
from keras.models import Model
import keras
from sklearn.metrics import classification_report, confusion_matrix

In [0]:
# Mount Google Drive at /content/drive so the dataset paths under
# '/content/drive/My Drive/data/' used by the following cells can be read.
from google.colab import drive, files
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Useful variables

In [0]:
# Shared configuration for the cells below.

# Number of hemorrhage subtypes; the extra "none" category is accounted for
# separately wherever NUM_CLASS + 1 is used.
NUM_CLASS = 5

# Images sampled per category for the train / validation / test splits.
TRAIN_SIZE, VAL_SIZE, TEST_SIZE = 1440, 160, 400

# Mini-batch size handed to every data generator.
BATCH = 32

# Target size passed to load_img and to the ResNet50 input shape.
IMG_WIDTH = IMG_HEIGHT = 224
IMG_DIM = (IMG_WIDTH, IMG_HEIGHT)

Get all the training files for each hemorrhage type (and for the no-hemorrhage category) from their directories

In [0]:
def _keep_id_files(paths):
    """Return the entries of `paths` that contain the substring 'ID', in order."""
    return [path for path in paths if 'ID' in path]


# Each category directory is listed once; only the entries containing 'ID'
# are kept as candidates for the train/validation splits.

# intraparenchymal
ip_files = glob.glob('/content/drive/My Drive/data/intraparenchymal/*')
ip_train_files = _keep_id_files(ip_files)

# none
none_files = glob.glob('/content/drive/My Drive/data/none/*')
none_train_files = _keep_id_files(none_files)

# epidural
ep_files = glob.glob('/content/drive/My Drive/data/epidural/*')
ep_train_files = _keep_id_files(ep_files)

# intraventricular
iv_files = glob.glob('/content/drive/My Drive/data/intraventricular/*')
iv_train_files = _keep_id_files(iv_files)

# subarachnoid
sa_files = glob.glob('/content/drive/My Drive/data/subarachnoid/*')
sa_train_files = _keep_id_files(sa_files)

# subdural
sd_files = glob.glob('/content/drive/My Drive/data/subdural/*')
sd_train_files = _keep_id_files(sd_files)

Get all the test files for each hemorrhage from their directory

In [0]:
# Collect the held-out test files of every category from its test/ sub-directory.
# Fix: the "none" list used to be copied into a misspelled variable
# (`non_test_files`), leaving a stray name behind; it is now re-bound to
# `none_test_files` like every other category.

# intraparenchymal
ip_test_files = glob.glob('/content/drive/My Drive/data/intraparenchymal/test/*')
ip_test_files = list(ip_test_files)

# none
none_test_files = glob.glob('/content/drive/My Drive/data/none/test/*')
none_test_files = list(none_test_files)

# epidural
ep_test_files = glob.glob('/content/drive/My Drive/data/epidural/test/*')
ep_test_files = list(ep_test_files)

# The remaining categories additionally keep only entries containing 'ID'.
# NOTE(review): the three categories above are not filtered this way —
# confirm whether the difference is intentional.

# intraventricular
iv_test_files = glob.glob('/content/drive/My Drive/data/intraventricular/test/*')
iv_test_files = [fn for fn in iv_test_files if 'ID' in fn]

# subarachnoid
sa_test_files = glob.glob('/content/drive/My Drive/data/subarachnoid/test/*')
sa_test_files = [fn for fn in sa_test_files if 'ID' in fn]

# subdural
sd_test_files = glob.glob('/content/drive/My Drive/data/subdural/test/*')
sd_test_files = [fn for fn in sd_test_files if 'ID' in fn]

Randomly sample the train, validation, and test files for each category (the image arrays are concatenated later, after loading)

In [0]:
def _sample_splits(train_pool, test_pool):
    """Draw the file samples of one category.

    Returns a tuple ``(train, leftover, val, test)``:
      * train    - TRAIN_SIZE files drawn without replacement from `train_pool`
      * leftover - the files of `train_pool` that were not drawn for training
      * val      - VAL_SIZE files drawn without replacement from `leftover`
      * test     - TEST_SIZE files drawn without replacement from `test_pool`

    np.random.choice raises ValueError when a pool holds fewer files than requested.
    """
    train = np.random.choice(train_pool, size=TRAIN_SIZE, replace=False)
    # Validation is drawn only from files not already used for training.
    leftover = list(set(train_pool) - set(train))
    val = np.random.choice(leftover, size=VAL_SIZE, replace=False)
    test = np.random.choice(test_pool, size=TEST_SIZE, replace=False)
    return train, leftover, val, test


# intraparenchymal
ip_train, ip_files, ip_val, ip_test = _sample_splits(ip_train_files, ip_test_files)

# none
none_train, none_files, none_val, none_test = _sample_splits(none_train_files, none_test_files)

# epidural
ep_train, ep_files, ep_val, ep_test = _sample_splits(ep_train_files, ep_test_files)

# intraventricular
iv_train, iv_files, iv_val, iv_test = _sample_splits(iv_train_files, iv_test_files)

# subarachnoid
sa_train, sa_files, sa_val, sa_test = _sample_splits(sa_train_files, sa_test_files)

# subdural
sd_train, sd_files, sd_val, sd_test = _sample_splits(sd_train_files, sd_test_files)

Load images as arrays

In [0]:
# intraparenchymal: load the sampled files as image arrays resized to IMG_DIM
ip_files = glob.glob('/content/drive/My Drive/data/intraparenchymal/*')

# Training images: files of the directory listing that were drawn into ip_train.
ip_train_imgs = np.array([
    img_to_array(load_img(path, target_size=IMG_DIM))
    for path in ip_files
    if path in ip_train
])
ip_train_labels = ['intraparenchymal'] * len(ip_train_imgs)

# Validation images: files of the directory listing that were drawn into ip_val.
ip_val_imgs = np.array([
    img_to_array(load_img(path, target_size=IMG_DIM))
    for path in ip_files
    if path in ip_val
])
ip_val_labels = ['intraparenchymal'] * len(ip_val_imgs)

# Test images are loaded directly in the sampled order.
ip_test_imgs = np.array([
    img_to_array(load_img(path, target_size=IMG_DIM))
    for path in ip_test
])
ip_test_labels = ['intraparenchymal'] * len(ip_test_imgs)

In [0]:
# none: load the sampled files as image arrays resized to IMG_DIM
none_files = glob.glob('/content/drive/My Drive/data/none/*')

# Training images: files of the directory listing that were drawn into none_train.
none_train_imgs = [img_to_array(load_img(img, target_size=IMG_DIM)) for img in none_files if img in none_train]
none_train_imgs = np.array(none_train_imgs)
none_train_labels = ['none' for img in none_files if img in none_train]

# Validation images: files of the directory listing that were drawn into none_val.
none_val_imgs = [img_to_array(load_img(img, target_size=IMG_DIM)) for img in none_files if img in none_val]
# Fix: this line used to re-wrap none_train_imgs (copy-paste slip), which left
# none_val_imgs as a plain Python list unlike every other category.
none_val_imgs = np.array(none_val_imgs)
none_val_labels = ['none' for img in none_files if img in none_val]

# Test images are loaded directly in the sampled order.
none_test_imgs = [img_to_array(load_img(img, target_size=IMG_DIM)) for img in none_test]
none_test_imgs = np.array(none_test_imgs)
none_test_labels = ['none' for img in none_test]

In [0]:
# epidural: load the sampled files as image arrays resized to IMG_DIM
ep_files = glob.glob('/content/drive/My Drive/data/epidural/*')

# Training images: files of the directory listing that were drawn into ep_train.
ep_train_imgs = np.array([
    img_to_array(load_img(path, target_size=IMG_DIM))
    for path in ep_files
    if path in ep_train
])
ep_train_labels = ['epidural'] * len(ep_train_imgs)

# Validation images: files of the directory listing that were drawn into ep_val.
ep_val_imgs = np.array([
    img_to_array(load_img(path, target_size=IMG_DIM))
    for path in ep_files
    if path in ep_val
])
ep_val_labels = ['epidural'] * len(ep_val_imgs)

# Test images are loaded directly in the sampled order.
ep_test_imgs = np.array([
    img_to_array(load_img(path, target_size=IMG_DIM))
    for path in ep_test
])
ep_test_labels = ['epidural'] * len(ep_test_imgs)

In [0]:
# intraventricular: load the sampled files as image arrays resized to IMG_DIM
iv_files = glob.glob('/content/drive/My Drive/data/intraventricular/*')

# Training images: files of the directory listing that were drawn into iv_train.
iv_train_imgs = np.array([
    img_to_array(load_img(path, target_size=IMG_DIM))
    for path in iv_files
    if path in iv_train
])
iv_train_labels = ['intraventricular'] * len(iv_train_imgs)

# Validation images: files of the directory listing that were drawn into iv_val.
iv_val_imgs = np.array([
    img_to_array(load_img(path, target_size=IMG_DIM))
    for path in iv_files
    if path in iv_val
])
iv_val_labels = ['intraventricular'] * len(iv_val_imgs)

# Test images are loaded directly in the sampled order.
iv_test_imgs = np.array([
    img_to_array(load_img(path, target_size=IMG_DIM))
    for path in iv_test
])
iv_test_labels = ['intraventricular'] * len(iv_test_imgs)

In [0]:
# subarachnoid: load the sampled files as image arrays resized to IMG_DIM
sa_files = glob.glob('/content/drive/My Drive/data/subarachnoid/*')

# Training images: files of the directory listing that were drawn into sa_train.
sa_train_imgs = np.array([
    img_to_array(load_img(path, target_size=IMG_DIM))
    for path in sa_files
    if path in sa_train
])
sa_train_labels = ['subarachnoid'] * len(sa_train_imgs)

# Validation images: files of the directory listing that were drawn into sa_val.
sa_val_imgs = np.array([
    img_to_array(load_img(path, target_size=IMG_DIM))
    for path in sa_files
    if path in sa_val
])
sa_val_labels = ['subarachnoid'] * len(sa_val_imgs)

# Test images are loaded directly in the sampled order.
sa_test_imgs = np.array([
    img_to_array(load_img(path, target_size=IMG_DIM))
    for path in sa_test
])
sa_test_labels = ['subarachnoid'] * len(sa_test_imgs)

In [0]:
# subdural: load the sampled files as image arrays resized to IMG_DIM
sd_files = glob.glob('/content/drive/My Drive/data/subdural/*')

# Training images: files of the directory listing that were drawn into sd_train.
sd_train_imgs = np.array([
    img_to_array(load_img(path, target_size=IMG_DIM))
    for path in sd_files
    if path in sd_train
])
sd_train_labels = ['subdural'] * len(sd_train_imgs)

# Validation images: files of the directory listing that were drawn into sd_val.
sd_val_imgs = np.array([
    img_to_array(load_img(path, target_size=IMG_DIM))
    for path in sd_files
    if path in sd_val
])
sd_val_labels = ['subdural'] * len(sd_val_imgs)

# Test images are loaded directly in the sampled order.
sd_test_imgs = np.array([
    img_to_array(load_img(path, target_size=IMG_DIM))
    for path in sd_test
])
sd_test_labels = ['subdural'] * len(sd_test_imgs)

Concatenate train, validation, and test data to create complete sets for binary classification (hemorrhage vs. no hemorrhage)

In [0]:
# Binary targets, one per image: five blocks of 'yes' (one block per
# hemorrhage category) followed by a single block of 'no' for the "none"
# category. Each block has as many entries as images sampled per category.
hem_train = ['yes'] * (5 * TRAIN_SIZE) + ['no'] * TRAIN_SIZE
hem_val = ['yes'] * (5 * VAL_SIZE) + ['no'] * VAL_SIZE
hem_test = ['yes'] * (5 * TEST_SIZE) + ['no'] * TEST_SIZE

In [0]:
# Stack the per-category image arrays along the first axis. The order is fixed:
# the five hemorrhage categories first and "none" last, matching the order of
# the 'yes'/'no' entries in hem_train / hem_val / hem_test.
train_imgs = np.concatenate(
    (ip_train_imgs, ep_train_imgs, iv_train_imgs, sa_train_imgs, sd_train_imgs, none_train_imgs),
    axis=0,
)
train_labels = hem_train

val_imgs = np.concatenate(
    (ip_val_imgs, ep_val_imgs, iv_val_imgs, sa_val_imgs, sd_val_imgs, none_val_imgs),
    axis=0,
)
val_labels = hem_val

test_imgs = np.concatenate(
    (ip_test_imgs, ep_test_imgs, iv_test_imgs, sa_test_imgs, sd_test_imgs, none_test_imgs),
    axis=0,
)
test_labels = hem_test

Encode text category labels

In [0]:
# Fit the encoder on the training labels only, then reuse it for every split
# so all three share the same text-to-integer mapping.
le = LabelEncoder().fit(train_labels)

# encoding
train_labels_enc, val_labels_enc, test_labels_enc = (
    le.transform(split_labels)
    for split_labels in (train_labels, val_labels, test_labels)
)

Create image data generators

In [0]:
# Training images get random augmentation; all three splits are multiplied by 1/255.
train_datagen = ImageDataGenerator(
    rescale=1. / 255,
    rotation_range=20,
    width_shift_range=0.05,
    height_shift_range=0.05,
    shear_range=0.05,
    # NOTE(review): a two-element zoom_range is taken as [lower, upper] directly,
    # so [0, 1] allows very strong zooms — confirm this range is intended.
    zoom_range=[0, 1],
    horizontal_flip=True,
    fill_mode='nearest',
)
val_datagen = ImageDataGenerator(rescale=1. / 255)
test_datagen = ImageDataGenerator(rescale=1. / 255)

train_generator = train_datagen.flow(x=train_imgs, y=train_labels_enc, batch_size=BATCH)
val_generator = val_datagen.flow(x=val_imgs, y=val_labels_enc, batch_size=BATCH)
# shuffle=False keeps test batches in the same order as test_labels_enc.
test_generator = test_datagen.flow(x=test_imgs, y=test_labels_enc, batch_size=BATCH, shuffle=False)

Create the binary (hemorrhage vs. no hemorrhage) model using transfer learning with ResNet50

In [0]:
# ImageNet-pretrained ResNet50 without its classification head.
resnet_model = ResNet50(include_top=False, weights='imagenet', input_shape=(IMG_WIDTH, IMG_HEIGHT, 3))

# making only batch normalization layers trainable
for base_layer in resnet_model.layers:
    base_layer.trainable = isinstance(base_layer, BatchNormalization)

# Classification head on top of the ResNet50 features.
# binary classification: a single sigmoid unit as the output layer
bin_model = Sequential([
    resnet_model,
    GlobalAveragePooling2D(),
    Dense(256, activation='relu'),
    Dropout(.4),
    BatchNormalization(),
    Dense(1, activation='sigmoid'),
])
bin_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [0]:
# Six categories take part in the binary task: NUM_CLASS hemorrhage types plus "none".
TOTAL_TRAIN = (NUM_CLASS + 1) * TRAIN_SIZE
TOTAL_VAL = (NUM_CLASS + 1) * VAL_SIZE

# One epoch covers the whole training / validation set in batches of BATCH.
history = bin_model.fit_generator(
    train_generator,
    epochs=15,
    steps_per_epoch=TOTAL_TRAIN // BATCH,
    validation_data=val_generator,
    validation_steps=TOTAL_VAL // BATCH,
    verbose=1,
)

In [0]:
# Save the trained binary model to 'binary_class_model.h5' (relative to the working directory).
bin_model.save('binary_class_model.h5')

In [0]:
# Evaluate the binary model on the test generator; the returned values are shown as the cell output.
bin_model.evaluate(test_generator, verbose=1)

In [0]:
# confusion matrix
# Round every predicted value to the nearest integer so it can be compared
# with the integer-encoded test labels.
y_pred = np.round(bin_model.predict(test_generator))

print('Confusion Matrix')
binary_cm = confusion_matrix(test_labels_enc, y_pred)
print(pd.DataFrame(binary_cm))

In [0]:
# Print the layer-by-layer summary of the binary model.
bin_model.summary()

In [0]:
# Training vs. validation accuracy per epoch for the binary model.
# The accuracy metric is recorded under 'acc' or 'accuracy' depending on the
# Keras version, so use whichever key is present.
acc_key = 'acc' if 'acc' in history.history else 'accuracy'
plt.plot(history.history[acc_key])
plt.plot(history.history['val_' + acc_key])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Number of epochs')
# Fix: the second curve is the validation metric, not the test metric.
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

In [0]:
# Training vs. validation loss per epoch for the binary model.
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Number of epochs')
# Fix: the second curve is the validation loss, not the test loss.
plt.legend(['train', 'validation'], loc='upper right')
plt.show()

Concatenate train, validation, and test data to create complete sets for multi-class classification (the five hemorrhage types only)

In [0]:
# Multi-class sets: only the five hemorrhage categories are used ("none" is
# left out). Images and labels are combined in the same category order so
# they stay aligned.
train_imgs = np.concatenate(
    (ip_train_imgs, ep_train_imgs, iv_train_imgs, sa_train_imgs, sd_train_imgs),
    axis=0,
)
train_labels = [*ip_train_labels, *ep_train_labels, *iv_train_labels, *sa_train_labels, *sd_train_labels]

val_imgs = np.concatenate(
    (ip_val_imgs, ep_val_imgs, iv_val_imgs, sa_val_imgs, sd_val_imgs),
    axis=0,
)
val_labels = [*ip_val_labels, *ep_val_labels, *iv_val_labels, *sa_val_labels, *sd_val_labels]

test_imgs = np.concatenate(
    (ip_test_imgs, ep_test_imgs, iv_test_imgs, sa_test_imgs, sd_test_imgs),
    axis=0,
)
test_labels = [*ip_test_labels, *ep_test_labels, *iv_test_labels, *sa_test_labels, *sd_test_labels]

Encode text category labels

In [0]:
# Fit the encoder on the training labels only, then reuse it for every split.
le = LabelEncoder().fit(train_labels)

# Integer codes of the test labels are kept separately for the confusion matrix.
test_labels_cm = le.transform(test_labels)

# encoding followed by one hot encoding
train_labels_enc = to_categorical(le.transform(train_labels))
val_labels_enc = to_categorical(le.transform(val_labels))
test_labels_enc = to_categorical(test_labels_cm)

Create image data generators

In [0]:
# Training images get random augmentation; all three splits are multiplied by 1/255.
train_datagen = ImageDataGenerator(
    rescale=1. / 255,
    rotation_range=20,
    width_shift_range=0.05,
    height_shift_range=0.05,
    shear_range=0.05,
    # NOTE(review): a two-element zoom_range is taken as [lower, upper] directly,
    # so [0, 1] allows very strong zooms — confirm this range is intended.
    zoom_range=[0, 1],
    horizontal_flip=True,
    fill_mode='nearest',
)
val_datagen = ImageDataGenerator(rescale=1. / 255)
test_datagen = ImageDataGenerator(rescale=1. / 255)

train_generator = train_datagen.flow(x=train_imgs, y=train_labels_enc, batch_size=BATCH)
val_generator = val_datagen.flow(x=val_imgs, y=val_labels_enc, batch_size=BATCH)
# shuffle=False keeps test batches in the same order as the test labels.
test_generator = test_datagen.flow(x=test_imgs, y=test_labels_enc, batch_size=BATCH, shuffle=False)

Create the multi-class (hemorrhage type) model using transfer learning with ResNet50

In [0]:
# ImageNet-pretrained ResNet50 without its classification head.
resnet_model = ResNet50(include_top=False, weights='imagenet', input_shape=(IMG_WIDTH, IMG_HEIGHT, 3))

# making only batch normalization layers trainable
for base_layer in resnet_model.layers:
    base_layer.trainable = isinstance(base_layer, BatchNormalization)

# Classification head on top of the ResNet50 features.
# multi-class classification: one softmax unit per hemorrhage type
multi_model = Sequential([
    resnet_model,
    GlobalAveragePooling2D(),
    Dense(256, activation='relu'),
    Dropout(.3),
    BatchNormalization(),
    Dense(NUM_CLASS, activation='softmax'),
])
multi_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [0]:
# Only the NUM_CLASS hemorrhage categories take part in the multi-class task.
TOTAL_TRAIN = NUM_CLASS * TRAIN_SIZE
TOTAL_VAL = NUM_CLASS * VAL_SIZE

# One epoch covers the whole training / validation set in batches of BATCH.
history = multi_model.fit_generator(
    train_generator,
    epochs=90,
    steps_per_epoch=TOTAL_TRAIN // BATCH,
    validation_data=val_generator,
    validation_steps=TOTAL_VAL // BATCH,
    verbose=1,
)

In [0]:
# Save the trained multi-class model to 'multi_class_model.h5' (relative to the working directory).
multi_model.save('multi_class_model.h5')

In [0]:
# Evaluate the multi-class model on the test generator; the returned values are shown as the cell output.
multi_model.evaluate(test_generator, verbose=1)

In [0]:
# confusion matrix
# Take the index of the highest softmax output as the predicted class and
# compare it with the integer-encoded test labels.
class_scores = multi_model.predict(test_generator, verbose=2)
y_pred = class_scores.argmax(axis=1)

print('Confusion Matrix')
multi_cm = confusion_matrix(test_labels_cm, y_pred)
print(pd.DataFrame(multi_cm))

In [0]:
# Print the layer-by-layer summary of the multi-class model.
multi_model.summary()

In [0]:
# Training vs. validation accuracy per epoch for the multi-class model.
# The accuracy metric is recorded under 'acc' or 'accuracy' depending on the
# Keras version, so use whichever key is present.
acc_key = 'acc' if 'acc' in history.history else 'accuracy'
plt.plot(history.history[acc_key])
plt.plot(history.history['val_' + acc_key])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Number of epochs')
# Fix: the second curve is the validation metric, not the test metric.
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

In [0]:
# Training vs. validation loss per epoch for the multi-class model.
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Number of epochs')
# Fix: the second curve is the validation loss, not the test loss.
plt.legend(['train', 'validation'], loc='upper right')
plt.show()