In [1]:
import numpy as np
import pandas as pd
from PIL import Image, ImageOps
import matplotlib.pyplot as plt
import os
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.applications.inceptionresnetv2 import InceptionResnetV2, preprocess_input
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten, Dense, Dropout, GlobalAveragePooling2D
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.initializers import Constant
import tensorflow as tf
from tensorflow.keras import layers
from sklearn.utils.class_weight import compute_class_weight
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score, accuracy_score, classification_report

ModuleNotFoundError: No module named 'tensorflow.keras.applications.inceptionresnetv2'

In [None]:
def prepare_data(ds, shuffle=False, augment=False):
    ds = ds.map(lambda x, y: (preprocess_input(x), y))
    if shuffle:
        ds = ds.shuffle(1000)
    ds = ds.batch(batch_size)
    if augment:
        ds = ds.map(
            lambda x, y: (data_augmentation(x, training=True), y),
            num_parallel_calls=AUTOTUNE
        )
    return ds.prefetch(buffer_size=AUTOTUNE)

In [None]:
metadata = pd.read_csv('data/Chest_xray_Corona_Metadata.csv').drop('Unnamed: 0', axis=1)
metadata.head()

In [None]:
metadata['Label'].value_counts()

In [None]:
image_root_path = 'data/'

In [None]:
batch_size = 50
input_shape = (224, 224, 3)
AUTOTUNE = tf.data.AUTOTUNE

In [None]:
train_data = image_dataset_from_directory(
    'data/train/',
    labels='inferred',
    batch_size=50,
    image_size=input_shape[:2],
    seed=1,
    validation_split=0.2,
    subset='training'
)

In [None]:
validation_data = image_dataset_from_directory(
    'data/train/',
    labels='inferred',
    batch_size=50,
    image_size=input_shape[:2],
    seed=1,
    validation_split=0.2,
    subset='validation'
)

In [None]:
test_data = image_dataset_from_directory(
    'data/test/',
    labels='inferred',
    image_size=input_shape[:2]
)

In [None]:
iterator = iter(train_data)
y_train = np.array([])
for i in iterator:
    y_train = np.concatenate([y_train, i[1].numpy()])

y_train.mean()

In [None]:
iterator = iter(validation_data)
y_val = np.array([])
for i in iterator:
    y_val = np.concatenate([y_val, i[1].numpy()])
    
y_val.mean()

In [None]:
iterator = iter(test_data)
y_test = np.array([])
for i in iterator:
    y_test = np.concatenate([y_test, i[1].numpy()])

y_test.mean()

In [None]:
data_augmentation = tf.keras.Sequential([
layers.experimental.preprocessing.RandomFlip("horizontal_and_vertical"),
layers.experimental.preprocessing.RandomRotation(0.2),
layers.experimental.preprocessing.RandomHeight(0.1),
layers.experimental.preprocessing.RandomWidth(0.1),
layers.experimental.preprocessing.RandomZoom(0.1),
])

In [None]:
# train_data = prepare_data(train_data, shuffle=True, augment=True)
# validation_data = prepare_data(validation_data)
# test_data = prepare_data(test_data)
train_data = train_data.map(lambda x, y: (preprocess_input(x), y))
validation_data = validation_data.map(lambda x, y: (preprocess_input(x), y))
test_data = test_data.map(lambda x, y: (preprocess_input(x), y))
train_data_pf = train_data.prefetch(buffer_size=AUTOTUNE)
validation_data_pf = validation_data.prefetch(buffer_size=AUTOTUNE)
test_data_pf = test_data.prefetch(buffer_size=AUTOTUNE)

In [None]:
# Calculate class weights to balance data
unique_classes = metadata.Label.unique()
all_rows = metadata.Label.to_numpy()
weights = compute_class_weight('balanced', classes=unique_classes, y=all_rows)

In [None]:
# set initial output bias for model to speed up convergence
initial_bias = Constant(weights[1] / weights[0])

In [None]:
densenet_model = DenseNet201(
    include_top=False,
    weights='imagenet',
    input_shape=input_shape
)

densenet_model.trainable = False
densenet_model.summary()

In [None]:
def create_model():
    model = Sequential()
    model.add(data_augmentation)
    model.add(densenet_model)
    model.add(GlobalAveragePooling2D())
    model.add(Flatten())
    model.add(Dropout(0.2))
    model.add(Dense(1, activation='sigmoid', bias_initializer=initial_bias))
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    
    return model

In [None]:
model = create_model()

In [None]:
checkpoint = ModelCheckpoint(
    'model checkpoints/',
    monitor='val_loss',
    save_best_only=True,
    save_weights_only=True
)

early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=1
)

In [None]:
history = model.fit(
    train_data,
    batch_size=50,
    callbacks=[checkpoint, early_stopping],
    epochs=10,
    validation_data=validation_data,
    verbose=1
)

In [None]:
plt.figure(figsize=(8,5))
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.legend(['training loss', 'validation loss'])
plt.xlabel('epoch')
plt.ylabel('loss')
plt.title('Training and validation loss')
plt.show()

In [None]:
model.load_weights('model checkpoints/')

In [None]:
y_proba = model.predict(test_data)

In [None]:
y_pred = np.where(y_proba >= 0.5, 1, 0)

In [None]:
print(classification_report(y_test, y_pred))

In [None]:
y_pred.mean()