In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from pathlib import Path
from sklearn.metrics import roc_auc_score
import numpy as np
import os
import cv2
import warnings
warnings.filterwarnings("ignore")
# IMPORT KERAS LIBRARY
from keras.applications.inception_resnet_v2 import InceptionResNetV2
from keras.preprocessing import image
from keras.models import Model
from keras.layers import Dense, Input, GlobalAveragePooling2D, MaxPooling2D, Flatten, BatchNormalization, Dropout
from keras.callbacks import ModelCheckpoint
from keras import backend as K
import tensorflow as tf
%load_ext autoreload
%autoreload
# vit architecture
%pip install vit-keras
# https://github.com/faustomorales/vit-keras
%pip install tensorflow-addons

In [None]:
model_path='.'
path=r'D:\Datasets\CheXpert-v1.0-small'
train_folder=f'{path}train'
test_folder=f'{path}test'
train_lbl=f'{path}train.csv'

In [None]:
chestxrays_root = Path(path)
data_path = chestxrays_root

### Load Data and the targets 

In [None]:
full_train_df = pd.read_csv(data_path/'CheXpert-v1.0-small/train.csv')
full_valid_df = pd.read_csv(data_path/'CheXpert-v1.0-small/valid.csv')

In [None]:
chexpert_targets = ['No Finding',
       'Enlarged Cardiomediastinum', 'Cardiomegaly', 'Lung Opacity',
       'Lung Lesion', 'Edema', 'Consolidation', 'Pneumonia', 'Atelectasis',
       'Pneumothorax', 'Pleural Effusion', 'Pleural Other', 'Fracture',
       'Support Devices']

#chexpert_targets = ['Atelectasis', 'Cardiomegaly', 'Consolidation', 'Edema', 'Pleural Effusion']

In [None]:
full_train_df.head()

### Uncertainty Approaches

The CheXpert paper outlines several different approaches to mapping using the uncertainty labels in the data:

- Ignoring - essentially removing from the calculation in the loss function
- Binary mapping - sending uncertain values to either 0 or 1
- Prevalence mapping - use the rate of prevelance of the feature as it's target value
- Self-training - consider the uncertain values as unlabeled
- 3-Class Classification - retain a separate value for uncertain and try to predict it as a class in its own right


The paper gives the results of different experiments with the above approaches and indicates the most accurate approach for each feature.
    
|Approach/Feature|Atelectasis|Cardiomegaly|Consolidation|Edema|PleuralEffusion|
|-----------|-----------|-----------|-----------|-----------|-----------|
|`U-Ignore`|0.818(0.759,0.877)|0.828(0.769,0.888)|0.938(0.905,0.970)|0.934(0.893,0.975)|0.928(0.894,0.962)|
|`U-Zeros`|0.811(0.751,0.872)|0.840(0.783,0.897)|0.932(0.898,0.966)|0.929(0.888,0.970)|0.931(0.897,0.965)|
|`U-Ones`|**0.858(0.806,0.910)**|0.832(0.773,0.890)|0.899(0.854,0.944)|0.941(0.903,0.980)|0.934(0.901,0.967)|
|`U-Mean`|0.821(0.762,0.879)|0.832(0.771,0.892)|0.937(0.905,0.969)|0.939(0.902,0.975)|0.930(0.896,0.965)|
|`U-SelfTrained`|0.833(0.776,0.890)|0.831(0.770,0.891)|0.939(0.908,0.971)|0.935(0.896,0.974)|0.932(0.899,0.966)|
|`U-MultiClass`|0.821(0.763,0.879)|**0.854(0.800,0.909)**|0.937(0.905,0.969)|0.928(0.887,0.968)|0.936(0.904,0.967)|

The binary mapping approaches (U-Ones and U-Zeros) are easiest to implement and so to begin with we take the best option between U-Ones and U-Zeros for each feature

- Atelectasis `U-Ones`
- Cardiomegaly `U-Zeros`
- Consolidation `U-Zeros`
- Edema `U-Ones`
- Pleural Effusion `U-Zeros`

In [None]:
u_one_features = ['Atelectasis', 'Edema']
u_zero_features = ['Cardiomegaly', 'Consolidation', 'Pleural Effusion']

In [None]:
def feature_string(row):
    feature_list = []
    for feature in u_one_features:
        if row[feature] in [-1,1]:
            feature_list.append(feature)
            
    for feature in u_zero_features:
        if row[feature] == 1:
            feature_list.append(feature)
            
    return ';'.join(feature_list)
            
     

In [None]:
full_train_df['train_valid'] = False
full_valid_df['train_valid'] = True

### Create patient and study columns

In [None]:
full_train_df['patient'] = full_train_df.Path.str.split('/',3,True)[2]
full_train_df  ['study'] = full_train_df.Path.str.split('/',4,True)[3]

full_valid_df['patient'] = full_valid_df.Path.str.split('/',3,True)[2]
full_valid_df  ['study'] = full_valid_df.Path.str.split('/',4,True)[3]

This is to get the number of study and the number of patient for each entry of the dataset

In [None]:
full_df = pd.concat([full_train_df, full_valid_df])
full_df.head()

Create a feature string containing all the classes and omiting the unknowns

In [None]:
full_df['feature_string'] = full_df.apply(feature_string,axis = 1).fillna('')
full_df['feature_string'] =full_df['feature_string'] .apply(lambda x:x.split(";"))
full_df.head()

# Show some images

In [None]:
#get the first 5 whale images
paths =  full_df.Path[:3]
labels = full_df.feature_string[:3]

fig, m_axs = plt.subplots(1, len(labels), figsize = (20, 10))
#show the images and label them
for ii, c_ax in enumerate(m_axs):
    c_ax.imshow(cv2.imread(os.path.join(data_path,paths[ii])))
    c_ax.set_title(labels[ii])

## Set up the dataframe for training: 
* Separate the validation and the training samples
* Take a unique sample per patient
* Get a samle of 5% of the dataset

In [None]:
sample_perc = 0.00
train_only_df = full_df[~full_df.train_valid]
valid_only_df = full_df[full_df.train_valid]
unique_patients = train_only_df.patient.unique()
mask = np.random.rand(len(unique_patients)) <= sample_perc
sample_patients = unique_patients[mask]

dev_df = train_only_df[full_train_df.patient.isin(sample_patients)]
train_df = train_only_df[~full_train_df.patient.isin(sample_patients)]

print(valid_only_df.Path.size)
print(train_df.Path.size)

# Set up data generation

In [None]:
datagen=image.ImageDataGenerator(rescale=1./255, 
                                 featurewise_center=True,
                                 featurewise_std_normalization=True,
                                 rotation_range=5,
                                 width_shift_range=0.2,
                                 height_shift_range=0.2,
                                 horizontal_flip=True,
                                 validation_split = 0.1)
test_datagen=image.ImageDataGenerator(rescale=1./255)

# use this code to get a subset of dataset otherwise just use the whole dataset, comment this cell only

In [None]:
# to use a subset of your dataset, uncomment if you want to use full dataset, this is for testing only
# Limit the number of images for training
train_df = train_df[:1000]
# If you have a separate DataFrame for testing, limit it as well
valid_only_df = valid_only_df[:100]
print(valid_only_df.Path.size)
print(train_df.Path.size)

# Remaining code

In [None]:
def generate_datasets(image_size = 224):
# change batchsize here according to requirements
    train_generator=datagen.flow_from_dataframe(dataframe=train_df, directory=data_path, 
                                                x_col="Path", y_col="feature_string", has_ext=True, seed = 42, classes = chexpert_targets,
                                                class_mode="categorical", target_size=(image_size,image_size), batch_size=20, subset = "training")

    validation_generator = datagen.flow_from_dataframe(dataframe=train_df, directory=data_path, 
                                                       x_col="Path", y_col="feature_string", has_ext=True, seed = 42, classes = chexpert_targets,
                                                       class_mode="categorical", target_size=(image_size,image_size), batch_size=20, subset = "validation")

    test_generator = test_datagen.flow_from_dataframe(dataframe=valid_only_df, directory=data_path, 
                                                      target_size=(image_size,image_size),class_mode='categorical',
                                                      batch_size=1, shuffle=False, classes = chexpert_targets,
                                                      x_col="Path", y_col="feature_string")
    
    return [train_generator,validation_generator,test_generator]

# Build the model

In [None]:
from keras.callbacks import *

class CyclicLR(Callback):
    
    def __init__(self, base_lr=0.001, max_lr=0.01, step_size=2000., mode='triangular',
                 gamma=1., scale_fn=None, scale_mode='cycle'):
        super(CyclicLR, self).__init__()

        self.base_lr = base_lr
        self.max_lr = max_lr
        self.step_size = step_size
        self.mode = mode
        self.gamma = gamma
        if scale_fn == None:
            if self.mode == 'triangular':
                self.scale_fn = lambda x: 1.
                self.scale_mode = 'cycle'
            elif self.mode == 'triangular2':
                self.scale_fn = lambda x: 1/(2.**(x-1))
                self.scale_mode = 'cycle'
            elif self.mode == 'exp_range':
                self.scale_fn = lambda x: gamma**(x)
                self.scale_mode = 'iterations'
        else:
            self.scale_fn = scale_fn
            self.scale_mode = scale_mode
        self.clr_iterations = 0.
        self.trn_iterations = 0.
        self.history = {}

        self._reset()

    def _reset(self, new_base_lr=None, new_max_lr=None,
               new_step_size=None):
        """Resets cycle iterations.
        Optional boundary/step size adjustment.
        """
        if new_base_lr != None:
            self.base_lr = new_base_lr
        if new_max_lr != None:
            self.max_lr = new_max_lr
        if new_step_size != None:
            self.step_size = new_step_size
        self.clr_iterations = 0.
        
    def clr(self):
        cycle = np.floor(1+self.clr_iterations/(2*self.step_size))
        x = np.abs(self.clr_iterations/self.step_size - 2*cycle + 1)
        if self.scale_mode == 'cycle':
            return self.base_lr + (self.max_lr-self.base_lr)*np.maximum(0, (1-x))*self.scale_fn(cycle)
        else:
            return self.base_lr + (self.max_lr-self.base_lr)*np.maximum(0, (1-x))*self.scale_fn(self.clr_iterations)
        
    def on_train_begin(self, logs={}):
        logs = logs or {}

        if self.clr_iterations == 0:
            K.set_value(self.model.optimizer.lr, self.base_lr)
        else:
            K.set_value(self.model.optimizer.lr, self.clr())        
            
    def on_batch_end(self, epoch, logs=None):
        
        logs = logs or {}
        self.trn_iterations += 1
        self.clr_iterations += 1

        self.history.setdefault('lr', []).append(K.get_value(self.model.optimizer.lr))
        self.history.setdefault('iterations', []).append(self.trn_iterations)

        for k, v in logs.items():
            self.history.setdefault(k, []).append(v)
        
        K.set_value(self.model.optimizer.lr, self.clr())

Define the metrics

In [None]:
def auc(y_true, y_pred):
    auc = tf.metrics.auc(y_true, y_pred)[1]
    K.get_session().run(tf.local_variables_initializer())
    return auc
def f1_score(y_true, y_pred):
    precision = tf.metrics.precision(y_true, y_pred)[1]
    recall = tf.metrics.recall(y_true, y_pred)[1]
    f1 = 2 * ((precision * recall) / (precision + recall + K.epsilon()))
    
    K.get_session().run(tf.local_variables_initializer())
    return f1

In [None]:
from vit_keras import vit
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization, GlobalAveragePooling2D, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model
import tensorflow_addons as tfa

def build_model(image_size = 224):
    vit_model = vit.vit_b16(
        image_size = image_size,
        activation = 'sigmoid',
        pretrained = True,
        include_top = False,
        pretrained_top = False,
    )
    
    x = vit_model.output
    x = Flatten()(x)
    x = BatchNormalization()(x)
    x = Dense(128, activation=tfa.activations.gelu)(x)
    x = BatchNormalization()(x)
    x = Dense(64, activation=tfa.activations.gelu)(x)
    x = Dense(32, activation=tfa.activations.gelu)(x)
    predictions = Dense(14, activation='softmax')(x)  # number of classes are defined (14, in this case.)
    
    model = Model(inputs=vit_model.input, outputs=predictions)
    
    adamOptimizer = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=10**-8, amsgrad=False)
    model.compile(optimizer=adamOptimizer, loss='categorical_crossentropy', metrics=['accuracy','AUC'])
    return model

In [None]:
def train_model(clr_triangular, model , datasets, epochs=1, image_size = 224):
    
    checkpointer = ModelCheckpoint(filepath='weights.hdf5', 
                                   verbose=1, save_best_only=True)
    
    train_generator,validation_generator,test_generator = datasets
    
    STEP_SIZE_TRAIN=train_generator.n//train_generator.batch_size
    STEP_SIZE_VALID=validation_generator.n//validation_generator.batch_size
    print(STEP_SIZE_TRAIN)
    print(STEP_SIZE_VALID)
    #return model
    return model.fit_generator(generator=train_generator,
                        steps_per_epoch=STEP_SIZE_TRAIN,
                        validation_data=validation_generator,
                        validation_steps=STEP_SIZE_VALID,
                        epochs=epochs, callbacks = [clr_triangular, checkpointer])

In [None]:
image_size_input = 224
model = build_model(image_size = image_size_input)

In [None]:
datasets = generate_datasets(image_size = image_size_input)
train_generator,validation_generator,test_generator = datasets

In [None]:
# change number of epochs here if you want to train for more epochs
clr_triangular = CyclicLR(mode='exp_range')
history = train_model(clr_triangular, model , datasets, epochs=1, image_size = image_size_input) # epochs=50 or more for better results

In [None]:
clr_triangular.history

# Learning Curves

In [None]:
history.history

In [None]:
# list all data in history
print(history.history.keys())
# summarize history for accuracy
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()
# summarize history for loss
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
from sklearn.preprocessing import MultiLabelBinarizer
test = pd.Series(test_generator.labels)
mlb = MultiLabelBinarizer()
y_labels = mlb.fit_transform(test)

In [None]:
test_generator.reset()
y_pred_keras = model.predict_generator(test_generator,verbose = 1,steps=test_generator.n)

In [None]:
from sklearn.metrics import roc_curve
from sklearn.metrics import auc

plt.figure(1,figsize=(10,10))
plt.plot([0, 1], [0, 1], 'k--')

#for ii in range(1,y_pred_keras.shape[1]):
for ii in range(1,5):
    fpr_keras, tpr_keras, thresholds_keras = roc_curve(y_labels[:,ii], y_pred_keras[:,ii])
    auc_keras = auc(fpr_keras, tpr_keras)
    plt.plot(fpr_keras, tpr_keras, label=chexpert_targets[ii-1] + '(area = {:.3f})'.format(auc_keras))
    
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.title('ROC curve')
plt.legend(loc='best')
plt.show()
   
