In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import shutil
import os
import matplotlib.pyplot as plt
import cv2
import random
from PIL import Image
import seaborn as sns

from sklearn.metrics import confusion_matrix, classification_report
from sklearn.utils.class_weight import compute_class_weight


In [None]:
def get_filepaths(directory, extensions=(".jpeg",)):
    """
    Recursively collect the paths of image files stored under ``directory``.

    Parameters
    ----------
    directory : str
        Root of the directory tree to search (the root itself is included).
    extensions : str or tuple of str, optional
        Filename suffix(es) to keep. Matching is case-sensitive and done
        with ``str.endswith``. Defaults to ``(".jpeg",)``.

    Returns
    -------
    list of str
        Matching file paths, each built by joining the walked directory
        with the filename, in sorted order. The list is empty when nothing
        matches or when ``directory`` does not exist (``os.walk`` yields
        nothing in that case).
    """
    file_paths = [
        os.path.join(root, filename)
        for root, _directories, files in os.walk(directory)
        for filename in files
        if filename.endswith(extensions)
    ]
    # os.walk visits entries in an OS-dependent order; sorting keeps every
    # downstream step (frame row order, seeded shuffles) reproducible.
    file_paths.sort()
    return file_paths
# Root folder of the dataset that is scanned for image files.
DATASET_ROOT = '/kaggle/input/chest-xray-pneumonia'
full_file_paths = get_filepaths(DATASET_ROOT)

In [None]:
# One row per discovered image; the single column 'paths' holds its file path.
data = pd.DataFrame({'paths': full_file_paths})

In [None]:
# Derive the labels from the directory layout: the folder that directly
# contains an image names its case, and that folder's parent names its split.
# Whole columns are assigned at once instead of the former element-wise
# `data['case'][i] = ...`, which is chained assignment: it triggers
# SettingWithCopyWarning and writes nothing when pandas Copy-on-Write is on.
parent_dirs = data['paths'].map(os.path.dirname)
data['case'] = parent_dirs.map(os.path.basename)
data['split'] = parent_dirs.map(os.path.dirname).map(os.path.basename)

In [None]:
# Partition the catalogue by its 'split' label; each subset gets a fresh
# 0..n-1 index so rows can be addressed by position afterwards.
split_of_row = data['split']
train_df = data.loc[split_of_row.eq('train')].reset_index(drop=True)
test_df = data.loc[split_of_row.eq('test')].reset_index(drop=True)
val_df = data.loc[split_of_row.eq('val')].reset_index(drop=True)

In [None]:
def image_viewer(dataset, index, ax):
    """
    Draw one image of ``dataset`` on the given matplotlib axes.

    Parameters
    ----------
    dataset : pandas.DataFrame
        Frame with a 'paths' column holding image file paths.
    index : int
        Zero-based row position of the image to show.
    ax : matplotlib.axes.Axes
        Axes that receives the image.
    """
    # Positional lookup: callers pass a position in range(len(dataset)),
    # which only coincides with the index label on a freshly reset index.
    image_path = dataset['paths'].iloc[index]
    image = Image.open(image_path)
    # Single-channel images would otherwise be drawn with matplotlib's
    # default false-colour map; the colormap is ignored for RGB data.
    ax.imshow(image, cmap='gray')
def plot_some_images(dataset, title):
    """
    Show a 4 x 10 grid of randomly chosen images from ``dataset``.

    Rows are drawn with replacement, so the same image may appear more
    than once. Each panel is titled with the row's 'case' value.

    Parameters
    ----------
    dataset : pandas.DataFrame
        Frame with 'paths' and 'case' columns.
    title : str
        Figure-level title.

    Raises
    ------
    ValueError
        If ``dataset`` has no rows.
    """
    n_rows = len(dataset)
    if n_rows == 0:
        # Fail before a figure is created; random.randrange(0) would raise
        # the same exception type, but only after an empty figure existed.
        raise ValueError("cannot plot images from an empty dataset")

    fig, axs = plt.subplots(nrows=4, ncols=10, figsize=(20, 8))
    # The figure title does not depend on the panel, so set it once.
    fig.suptitle(title, fontsize=15)
    for ax in axs.flat:
        index = random.randrange(n_rows)
        image_viewer(dataset, index, ax)
        # Positional lookup, matching how `index` was drawn.
        ax.set_title(dataset['case'].iloc[index], fontsize=8)
        ax.axis('off')
    plt.show()

In [None]:
# Display the training frame (rows of `data` whose 'split' is 'train').
train_df

In [None]:
# Random sample of training images (figure title typos corrected).
plot_some_images(train_df, 'Training Chest X-Ray Images')

In [None]:
# Display the validation frame (rows of `data` whose 'split' is 'val').
val_df

In [None]:
# Random sample of validation images (figure title typo corrected).
plot_some_images(val_df, 'Validation Chest X-Ray Images')

In [None]:
# Display the test frame (rows of `data` whose 'split' is 'test').
test_df

In [None]:
# Random sample of test images (figure title typo corrected).
plot_some_images(test_df, 'Testing Chest X-Ray Images')

In [None]:
# Distinct case labels of the training split, in sorted order (np.unique).
classes = np.unique(train_df['case'])

# "balanced" weights, one per entry of `classes` and in the same order.
class_weights = compute_class_weight(
    class_weight="balanced",
    classes=classes,
    y=train_df['case'],
)

# Same weights keyed by class name.
class_weights_forplot = dict(zip(classes, class_weights))

In [None]:
# Re-key the weights by integer class id, the form `Model.fit(class_weight=...)`
# takes. The mapping is built from `classes` / `class_weights_forplot`, which
# this cell does not overwrite, so re-running it gives the same result. The
# former `dict(zip(range(43), class_weights))` hard-coded an unrelated class
# count and, on a second run, zipped over the dict's keys instead of the weights.
# NOTE(review): assumes the data iterators number classes in sorted label
# order (the order np.unique returns) - confirm against `class_indices`.
class_weights = {
    class_id: class_weights_forplot[class_name]
    for class_id, class_name in enumerate(classes)
}

In [None]:
# Both generators apply only the EfficientNet preprocessing function;
# no augmentation options are configured.
ImageDataGenerator = tf.keras.preprocessing.image.ImageDataGenerator
efficientnet_preprocess = tf.keras.applications.efficientnet.preprocess_input

train_generator = ImageDataGenerator(preprocessing_function=efficientnet_preprocess)
test_generator = ImageDataGenerator(preprocessing_function=efficientnet_preprocess)

In [None]:
# Options shared by the three iterators. Images are loaded as 3-channel RGB
# because the EfficientNetB0 backbone is built with input_shape=(256, 256, 3);
# the former color_mode='grayscale' produced single-channel batches that do
# not match that input.
flow_options = dict(
    x_col='paths',
    y_col='case',
    target_size=(256, 256),
    color_mode='rgb',
    class_mode="binary",
    batch_size=64,
)

# Training batches: reshuffled with a fixed seed.
train_images = train_generator.flow_from_dataframe(
    dataframe=train_df,
    shuffle=True,
    seed=210,
    **flow_options,
)
# Validation batches: same generator and settings as training.
val_images = train_generator.flow_from_dataframe(
    dataframe=val_df,
    shuffle=True,
    seed=210,
    **flow_options,
)
# Test batches: kept in frame order so predictions line up with
# `test_images.labels` during evaluation.
test_images = test_generator.flow_from_dataframe(
    dataframe=test_df,
    shuffle=False,
    **flow_options,
)

In [None]:
keras_layers = tf.keras.layers

# ImageNet-pretrained EfficientNetB0 without its classification head;
# pooling='max' is requested for the backbone output.
trans_arc = tf.keras.applications.EfficientNetB0(
    input_shape=(256, 256, 3),
    include_top=False,
    weights="imagenet",
    pooling='max',
)

# Freeze every backbone layer so that only the new head is trained.
for backbone_layer in trans_arc.layers:
    backbone_layer.trainable = False

inputs = trans_arc.input
flatten = trans_arc.output

# Head, block 1: Dense(256) -> BatchNorm -> Dropout(0.3)
x = keras_layers.Dense(256, activation='relu')(flatten)
x = keras_layers.BatchNormalization()(x)
x = keras_layers.Dropout(0.3)(x)

# Head, block 2: Dense(128) -> BatchNorm
x = keras_layers.Dense(128, activation='relu')(x)
x = keras_layers.BatchNormalization()(x)

# Single sigmoid unit for the binary target.
outputs = keras_layers.Dense(1, activation='sigmoid')(x)

model = tf.keras.Model(inputs=inputs, outputs=outputs)


In [None]:
model.summary()

In [None]:
# Per-epoch metrics are written to this CSV file during training.
csv_log_path = '/content/checkpoints/training_log2.csv'

# Create the log folder first so the logger can open its file.
# makedirs(..., exist_ok=True) is safe to re-run, unlike a bare os.mkdir.
os.makedirs(os.path.dirname(csv_log_path), exist_ok=True)

cb_csvlogger = tf.keras.callbacks.CSVLogger(
    filename=csv_log_path,
    separator=',',
    append=False,  # start a fresh log instead of appending to an old one
)

In [None]:
# Training objective and the metric reported alongside it.
loss = [tf.keras.losses.binary_crossentropy]
metrics = ['accuracy']

# Staircase exponential decay: the rate is multiplied by 0.9 after every
# 82 optimizer steps, starting from `initial_learning_rate`.
initial_learning_rate = 0.005
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=initial_learning_rate,
    decay_steps=82,
    decay_rate=0.9,
    staircase=True,
)

# Adam driven by the schedule above.
optimizer = tf.keras.optimizers.Adam(
    learning_rate=lr_schedule,
    beta_1=0.9,
    beta_2=0.999,
    epsilon=1e-07,
)

model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

In [None]:
# Train the head for at most 100 epochs, weighting classes by
# `class_weights`.
# No LearningRateScheduler callback is used: the optimizer was built with
# `lr_schedule` and applies the decay itself. That callback expects an
# `epoch -> float` function and tries to overwrite the optimizer's learning
# rate, which conflicts with a schedule-driven optimizer.
history = model.fit(
    train_images,
    validation_data=val_images,
    verbose=1,
    epochs=100,
    class_weight=class_weights,
    callbacks=[
        # Stop once val_loss has not improved for 10 epochs and roll the
        # model back to its best weights.
        tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True,
        ),
        cb_csvlogger,
    ],
)

In [None]:
def plot_history_metric(history, metric):
    """
    Plot one recorded training metric per epoch.

    The matching ``val_<metric>`` curve is added only when it was recorded,
    so a run without validation metrics does not fail with ``KeyError``.

    Parameters
    ----------
    history : object
        Object whose ``history`` attribute maps metric names to per-epoch
        value lists (as returned by ``model.fit``).
    metric : str
        Key of the training metric, e.g. ``'accuracy'`` or ``'loss'``.
    """
    recorded = history.history
    plt.plot(recorded[metric])
    legend_entries = ['train']

    validation_key = 'val_' + metric
    if validation_key in recorded:
        plt.plot(recorded[validation_key])
        legend_entries.append('validation')

    plt.title('model ' + metric)
    plt.ylabel(metric)
    plt.xlabel('epoch')
    plt.legend(legend_entries, loc='upper left')
    plt.show()


# list all data in history
print(history.history.keys())
# summarize history for accuracy
plot_history_metric(history, 'accuracy')
# summarize history for loss
plot_history_metric(history, 'loss')

In [None]:
def plot_model_evaluation(model, test_data, n_classes, target_labels):
    """
    Evaluate a binary classifier and report its test performance.

    Prints the test loss and accuracy, shows a confusion-matrix heatmap and
    prints a classification report.

    Parameters
    ----------
    model : object
        Model exposing ``evaluate`` (returning ``[loss, accuracy, ...]``)
        and ``predict`` (returning one probability per sample).
    test_data : object
        Data iterator with ``labels`` (true integer class ids) and
        ``class_indices`` (class name -> id); its class names label the
        heatmap ticks.
    n_classes : int
        Number of classes; fixes the size of the matrix and of the report.
    target_labels : sequence of str
        Class names for the report, ordered by class id.
    """
    results = model.evaluate(test_data, verbose=0)
    loss = results[0]
    acc = results[1]

    print("    Test Loss: {:.5f}".format(loss))
    print("Test Accuracy: {:.2f}%".format(acc * 100))

    # Threshold the probabilities at 0.5. reshape(-1) always yields a 1-D
    # vector, whereas squeeze collapses a single-sample result to 0-d.
    probabilities = np.asarray(model.predict(test_data))
    y_pred = (probabilities >= 0.5).astype(int).reshape(-1)

    # Explicit label ids keep the matrix n_classes x n_classes and the
    # report aligned with target_labels even if a class never occurs.
    label_ids = np.arange(n_classes)
    cm = confusion_matrix(test_data.labels, y_pred, labels=label_ids)
    clr = classification_report(
        test_data.labels, y_pred, labels=label_ids, target_names=target_labels
    )

    tick_names = list(test_data.class_indices.keys())
    tick_positions = np.arange(n_classes) + 0.5  # centre of each heatmap cell

    plt.figure(figsize=(15, 15))
    sns.heatmap(cm, annot=True, fmt='g', vmin=0, cmap='Blues', cbar=False)
    plt.xticks(ticks=tick_positions, labels=tick_names, rotation=90)
    plt.yticks(ticks=tick_positions, labels=tick_names, rotation=0)
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title("Confusion Matrix")
    plt.show()

    print("Classification Report:\n----------------------\n", clr)

In [None]:
# Class names ordered by the integer id the test iterator assigned to them.
# `test_df['case'].unique()` lists names in order of first appearance, which
# need not match those ids and could attach report rows to the wrong class.
class_names = sorted(test_images.class_indices, key=test_images.class_indices.get)
plot_model_evaluation(model, test_images, len(class_names), class_names)