<a href="https://colab.research.google.com/github/zpovsic/PT-SideDrawer-Header/blob/master/covid.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:

# Import the Colab helper that exposes Google Drive to this runtime.
from google.colab import drive

# Mount Drive at /content/drive. This will prompt for authorization.
drive.mount('/content/drive')  # optional extra argument left disabled: force_remount=True


In [0]:
# After executing the cell above, Drive
# files will be present in "/content/drive/My Drive".
!ls "/content/drive/My Drive/Covid-xray-data"

In [0]:
from __future__ import absolute_import, division, print_function, unicode_literals

import matplotlib.pylab as plt
import pandas as pd
import os
import random
import dill
from imblearn.over_sampling import RandomOverSampler
import numpy as np
import datetime
import pathlib
import shutil
import cv2
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from math import ceil

!pip install -U PyYAML
import yaml

!pip install -U pillow
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

try:
  # Install the TensorFlow nightly build (no %tensorflow_version magic is used in this cell).
  !pip install tf-nightly
except Exception:
  pass

In [0]:
from tensorflow import keras
from tensorflow.keras.metrics import Metric, Precision, Recall
from tensorflow.python.keras.utils import metrics_utils
from tensorflow.python.ops import init_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.keras.utils.generic_utils import to_list
from tensorflow.keras.metrics import BinaryAccuracy, CategoricalAccuracy, Precision, Recall, AUC
from tensorflow.keras.models import save_model
from tensorflow.keras.callbacks import EarlyStopping, TensorBoard, ReduceLROnPlateau
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorboard.plugins.hparams import api as hp
from tensorflow.keras import Sequential, Model
from tensorflow.keras.layers import Dense, Dropout, Input, MaxPool2D, Conv2D, Flatten, LeakyReLU, BatchNormalization, \
    concatenate
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.initializers import Constant
from tensorflow.python.keras import backend as K

In [0]:
#!pip list

In [0]:
# Root folder of the project on the mounted Drive (keeps its trailing slash so
# sub-paths can be appended by plain string concatenation).
PATH = "/content/drive/My Drive/Covid-xray-data/"
# Project configuration file, located directly under the project root.
CONFIG_PATH = PATH + "config.yml"
# Folder holding the dataset files.
DATA_PATH = PATH + "data/"




# **Train**


**Remove text**

In [0]:
def remove_text(img):
    """
    Inpaint very bright regions of an X-ray image, which is intended to erase burned-in text
    such as the white 'R' that marks the right side of the body.
    :param img: Numpy array of the image; it is indexed as [row, column, channel]
    :return: float32 array of the image with the bright regions inpainted
    """
    # Pixels brighter than 230 become 255 in the thresholded image, everything else 0.
    _, thresholded = cv2.threshold(img, 230, 255, cv2.THRESH_BINARY)
    # Only the first channel of the thresholded image is used as the inpainting mask.
    bright_mask = thresholded[:, :, 0].astype(np.uint8)
    # Inpaint with a radius of 10 using the INPAINT_NS method on an 8-bit copy of the image.
    inpainted = cv2.inpaint(img.astype(np.uint8), bright_mask, 10, cv2.INPAINT_NS)
    return inpainted.astype(np.float32)


**Get class weights**

In [0]:
def get_class_weights(histogram, class_multiplier=None):
    """
    Computes weights for each class to be applied in the loss function during training.
    Each weight is (1 / number_of_classes) * total_items / items_in_class, so rarer classes get larger weights.
    :param histogram: A list depicting the number of each item in different class. Every count is used as a
                      divisor, so counts are expected to be non-zero.
    :param class_multiplier: List of values to multiply the calculated class weights by. For further control of class weighting.
    :return: A dictionary mapping each class index to its weight
    """
    n_classes = len(histogram)
    total = sum(histogram)
    weights = [(1.0 / n_classes) * total / histogram[i] for i in range(n_classes)]
    if class_multiplier is not None:
        weights = [weights[i] * class_multiplier[i] for i in range(n_classes)]

    # Build the dict from the list of weights. Enumerating an intermediate dict (as was done
    # previously when no multiplier was given) yields its keys, i.e. {0: 0, 1: 1, ...}.
    class_weight_dict = dict(enumerate(weights))
    print("Class weights: ", weights, "Class_weight_dict: ", class_weight_dict)
    return class_weight_dict


**F1Score metric class**

In [0]:
class F1Score(Metric):
    """
    Custom tf.keras metric that reports the F1 score computed from accumulated
    true positive, false positive and false negative counts.
    """

    def __init__(self, thresholds=None, top_k=None, class_id=None, name=None, dtype=None):
        """
        Builds the metric and allocates one counter per threshold for each confusion-matrix term.
        :param thresholds: A float value or a python list/tuple of float threshold values in [0, 1].
        :param top_k: An int value specifying the top-k predictions to consider when calculating precision
        :param class_id: Integer class ID for which we want binary metrics. This must be in the half-open interval
                `[0, num_classes)`, where `num_classes` is the last dimension of predictions
        :param name: string name of the metric instance
        :param dtype: data type of the metric result
        """
        super(F1Score, self).__init__(name=name, dtype=dtype)
        # Keep the raw constructor arguments so get_config() can report them unchanged.
        self.init_thresholds = thresholds
        self.top_k = top_k
        self.class_id = class_id

        # Without top_k the default threshold is 0.5; with top_k it is NEG_INF.
        if top_k is None:
            default_threshold = 0.5
        else:
            default_threshold = metrics_utils.NEG_INF
        self.thresholds = metrics_utils.parse_init_thresholds(thresholds, default_threshold=default_threshold)

        counter_shape = (len(self.thresholds),)
        self.true_positives = self.add_weight('true_positives', shape=counter_shape,
                                              initializer=init_ops.zeros_initializer)
        self.false_positives = self.add_weight('false_positives', shape=counter_shape,
                                               initializer=init_ops.zeros_initializer)
        self.false_negatives = self.add_weight('false_negatives', shape=counter_shape,
                                               initializer=init_ops.zeros_initializer)

    def update_state(self, y_true, y_pred, sample_weight=None):
        """
        Accumulates true positive, false positive and false negative statistics.
        :param y_true: The ground truth values, with the same dimensions as `y_pred`. Will be cast to `bool`
        :param y_pred: The predicted values. Each element must be in the range `[0, 1]`
        :param sample_weight: Weighting of each example. Defaults to 1. Can be a `Tensor` whose rank is either 0,
               or the same rank as `y_true`, and must be broadcastable to `y_true`
        :return: None; the counters are updated through the confusion-matrix helper
        """
        counters = {
            metrics_utils.ConfusionMatrix.TRUE_POSITIVES: self.true_positives,
            metrics_utils.ConfusionMatrix.FALSE_POSITIVES: self.false_positives,
            metrics_utils.ConfusionMatrix.FALSE_NEGATIVES: self.false_negatives
        }
        metrics_utils.update_confusion_matrix_variables(
            counters, y_true, y_pred, thresholds=self.thresholds, top_k=self.top_k, class_id=self.class_id,
            sample_weight=sample_weight)

    def result(self):
        """
        Derives precision and recall from the accumulated counts and combines them:
        F1 = 2 * precision * recall / (precision + recall)
        Divisions use div_no_nan, so a zero denominator yields 0 instead of NaN.
        :return: F1 score; a single value when there is one threshold, otherwise one value per threshold
        """
        tp = self.true_positives
        fp = self.false_positives
        fn = self.false_negatives
        precision = math_ops.div_no_nan(tp, tp + fp)
        recall = math_ops.div_no_nan(tp, tp + fn)
        f1 = math_ops.div_no_nan(2 * precision * recall, precision + recall)
        if len(self.thresholds) == 1:
            return f1[0]
        return f1

    def reset_states(self):
        """
        Resets every state variable of the metric to a zero vector with one entry per threshold.
        """
        zeros_shape = (len(to_list(self.thresholds)),)
        assignments = []
        for variable in self.variables:
            assignments.append((variable, np.zeros(zeros_shape)))
        K.batch_set_value(assignments)

    def get_config(self):
        """
        Returns the serializable config of the metric: the base Metric config extended with the
        constructor arguments of this class.
        :return: serializable config of the metric
        """
        merged = dict(super(F1Score, self).get_config())
        merged.update({
            'thresholds': self.init_thresholds,
            'top_k': self.top_k,
            'class_id': self.class_id
        })
        return merged

**DCNN ResNet model**

In [0]:
def dcnn_resnet(model_config, input_shape, metrics, n_classes=2, output_bias=None):
    """
    Defines a deep convolutional neural network model for multiclass X-ray classification.
    The network stacks CONV_BLOCKS blocks of two convolutions whose output is concatenated with the block input,
    followed by dropout, one dense layer and a softmax output layer.
    :param model_config: A dictionary of parameters associated with the model architecture. Keys read here:
                         NODES_DENSE0, LR, DROPOUT, L2_LAMBDA, OPTIMIZER, INIT_FILTERS, FILTER_EXP_BASE, CONV_BLOCKS,
                         KERNEL_SIZE, MAXPOOL_SIZE, STRIDES (the last three are strings that get evaluated)
    :param input_shape: The shape of the model input
    :param metrics: Metrics to track model's performance
    :param n_classes: Number of classes
    :param output_bias: Initial bias values for the output layer. When None, the output layer bias starts at zero.
    :return: a compiled Keras functional Model object with the architecture defined in this method
    """

    # Set hyperparameters
    nodes_dense0 = model_config['NODES_DENSE0']
    lr = model_config['LR']
    dropout = model_config['DROPOUT']
    l2_lambda = model_config['L2_LAMBDA']
    # Adam is used for 'adam' and for any unrecognised optimizer name; only 'sgd' selects SGD.
    if model_config['OPTIMIZER'] == 'sgd':
        optimizer = SGD(learning_rate=lr)
    else:
        optimizer = Adam(learning_rate=lr)
    init_filters = model_config['INIT_FILTERS']
    filter_exp_base = model_config['FILTER_EXP_BASE']
    conv_blocks = model_config['CONV_BLOCKS']
    # NOTE(security): eval() executes whatever expression the config contains. This is acceptable only while
    # config.yml is trusted; consider ast.literal_eval if the config can come from an untrusted source.
    kernel_size = eval(model_config['KERNEL_SIZE'])
    max_pool_size = eval(model_config['MAXPOOL_SIZE'])
    strides = eval(model_config['STRIDES'])

    # Set output bias. Without an explicit bias, use the 'zeros' initializer explicitly instead of passing None,
    # which would override the Dense layer's own default.
    if output_bias is not None:
        bias_initializer = Constant(output_bias)
    else:
        bias_initializer = 'zeros'
    print("MODEL CONFIG: ", model_config)

    # Input layer
    X_input = Input(input_shape)
    X = X_input

    # Add convolutional (residual) blocks. The number of filters grows by FILTER_EXP_BASE with each block.
    # NOTE(review): the block input is concatenated with the output of two strided convolutions, so their spatial
    # dimensions must match; this presumably requires STRIDES of (1, 1) -- confirm against config.yml.
    for i in range(conv_blocks):
        n_filters = init_filters * (filter_exp_base ** i)
        X_res = X
        X = Conv2D(n_filters, kernel_size, strides=strides, padding='same',
                   kernel_initializer='he_uniform', activity_regularizer=l2(l2_lambda))(X)
        X = BatchNormalization()(X)
        X = LeakyReLU()(X)
        X = Conv2D(n_filters, kernel_size, strides=strides, padding='same',
                   kernel_initializer='he_uniform', activity_regularizer=l2(l2_lambda))(X)
        X = concatenate([X, X_res])
        X = BatchNormalization()(X)
        X = LeakyReLU()(X)
        X = MaxPool2D(max_pool_size, padding='same')(X)

    # Add fully connected layers
    X = Flatten()(X)
    X = Dropout(dropout)(X)
    X = Dense(nodes_dense0, kernel_initializer='he_uniform', activity_regularizer=l2(l2_lambda))(X)
    X = LeakyReLU()(X)
    Y = Dense(n_classes, activation='softmax', bias_initializer=bias_initializer, name='output')(X)

    # Set model loss function, optimizer, metrics.
    model = Model(inputs=X_input, outputs=Y)
    model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=metrics)
    model.summary()
    return model


**Train model**

In [0]:
def train_model(config, data, callbacks, verbose=1):
    """
    Train a model on the given data and evaluate it on the test set.
    :param config: Project config (from config.yml)
    :param data: dict of partitioned dataset with 'TRAIN', 'VAL' and 'TEST' DataFrames. data['TRAIN'] is replaced
                 in this dict when the random oversampling strategy is enabled.
    :param callbacks: list of callbacks for Keras model
    :param verbose: Verbosity mode to pass to model.fit()
    :return: Tuple of (trained model, dict mapping each metric name to its value on the test set,
             test set generator)
    """

    # If set in config file, oversample the minority class
    if config['TRAIN']['IMB_STRATEGY'] == 'random_oversample':
        data['TRAIN'] = random_minority_oversample(data['TRAIN'])

    # Create ImageDataGenerators. All splits share the same preprocessing; only training adds random rotation.
    preprocessing_kwargs = dict(preprocessing_function=remove_text,
                                samplewise_std_normalization=True, samplewise_center=True)
    train_img_gen = ImageDataGenerator(rotation_range=10, **preprocessing_kwargs)
    val_img_gen = ImageDataGenerator(**preprocessing_kwargs)
    test_img_gen = ImageDataGenerator(**preprocessing_kwargs)

    # Create DataFrameIterators. The deprecated `has_ext` keyword is no longer passed.
    img_shape = tuple(config['DATA']['IMG_DIM'])
    flow_kwargs = dict(x_col='filename', y_col='label_str', target_size=img_shape,
                       batch_size=config['TRAIN']['BATCH_SIZE'], class_mode='categorical')
    train_generator = train_img_gen.flow_from_dataframe(dataframe=data['TRAIN'], directory=PATH + "data/train/",
                                                        **flow_kwargs)
    val_generator = val_img_gen.flow_from_dataframe(dataframe=data['VAL'], directory=PATH + "data/val/",
                                                    **flow_kwargs)
    # The test set is not shuffled so that predictions stay aligned with the rows of data['TEST'].
    test_generator = test_img_gen.flow_from_dataframe(dataframe=data['TEST'], directory=PATH + "data/test/",
                                                      shuffle=False, **flow_kwargs)

    # Save model's ordering of class indices (the file is closed as soon as the dump completes)
    with open(PATH + 'data/interpretability/OUTPUT_CLASS_INDICES', 'wb') as class_indices_file:
        dill.dump(test_generator.class_indices, class_indices_file)

    # Apply class imbalance strategy. We have many more X-rays negative for COVID-19 than positive.
    histogram = np.bincount(np.array(train_generator.labels).astype(int))  # Get class distribution
    class_weight = None
    if config['TRAIN']['IMB_STRATEGY'] == 'class_weight':
        class_multiplier = config['TRAIN']['CLASS_MULTIPLIER']
        # Reorder the multipliers from the config's class order to the generator's class order.
        class_multiplier = [class_multiplier[config['DATA']['CLASSES'].index(c)] for c in test_generator.class_indices]
        class_weight = get_class_weights(histogram, class_multiplier)

    # Define metrics.
    covid_class_idx = test_generator.class_indices['COVID-19']  # Get index of COVID-19 class
    thresholds = 1.0 / len(config['DATA']['CLASSES'])  # Binary classification threshold for a class
    # TODO: an earlier note reported an error related to the F1Score metric; re-verify that it works.
    metrics = ['accuracy', CategoricalAccuracy(name='categorical_accuracy'),
               Precision(name='precision', thresholds=thresholds, class_id=covid_class_idx),
               Recall(name='recall', thresholds=thresholds, class_id=covid_class_idx),
               AUC(name='auc'), F1Score(name='f1score', thresholds=thresholds, class_id=covid_class_idx)]

    # Define the model.
    class_names = list(test_generator.class_indices.keys())
    print('Training distribution: ',
          ['Class ' + class_names[i] + ': ' + str(histogram[i]) + '. '
           for i in range(len(histogram))])
    input_shape = config['DATA']['IMG_DIM'] + [3]
    if config['TRAIN']['CLASS_MODE'] == 'binary':
        model_config = config['NN']['DCNN_BINARY']
        n_classes = 2
    else:
        model_config = config['NN']['DCNN_MULTICLASS']
        n_classes = len(config['DATA']['CLASSES'])
    # Initial output bias: log odds of each class in the training set.
    # NOTE(review): this histogram is indexed by the integer 'label' column, whereas the generator derives its
    # class indices from 'label_str' -- confirm that both orderings agree.
    label_histogram = np.bincount(data['TRAIN']['label'].astype(int))
    output_bias = np.log([label_histogram[i] / (np.sum(label_histogram) - label_histogram[i])
                          for i in range(label_histogram.shape[0])])
    model = dcnn_resnet(model_config, input_shape, metrics, n_classes, output_bias=output_bias)

    # Train the model. Steps are rounded up so that the last partial batch is included.
    steps_per_epoch = ceil(train_generator.n / train_generator.batch_size)
    val_steps = ceil(val_generator.n / val_generator.batch_size)

    print('\n*** steps = ', steps_per_epoch,
          '\n*** epochs = ', config['TRAIN']['EPOCHS'],
          '\n*** valdata = ', val_generator,
          '\n*** valsteps = ', val_steps,
          "\n*** callbacks = ", callbacks,
          "\n*** class_weight = ", class_weight)

    history = model.fit(train_generator,
                        steps_per_epoch=steps_per_epoch,
                        epochs=config['TRAIN']['EPOCHS'],
                        validation_data=val_generator,
                        validation_steps=val_steps,
                        callbacks=callbacks,
                        verbose=verbose,
                        class_weight=class_weight)

    # Run the model on the test set and print the resulting performance metrics.
    test_results = model.evaluate(test_generator, verbose=1)
    test_metrics = {}
    for metric, value in zip(model.metrics_names, test_results):
        test_metrics[metric] = value
        print(metric, ' = ', value)
    return model, test_metrics, test_generator


**Train experiment**

In [0]:
def train_experiment(experiment='single_train', save_weights=True, write_logs=True):
    """
    Defines and trains model. Prints and logs relevant metrics.
    :param experiment: The type of training experiment. 'hparam_search' runs a random hyperparameter search,
                       'multi_train' runs multi_train, and any other value (e.g. 'single_train') trains one model.
    :param save_weights: A flag indicating whether to save the trained model (not used for 'hparam_search')
    :param write_logs: A flag indicating whether to write TensorBoard logs
    :return: None
    """
    # Load project config data (the file is closed as soon as it has been parsed)
    with open(os.path.join(PATH, 'config.yml'), 'r') as config_file:
        config = yaml.full_load(config_file)

    # Set logs directory. Paths are built with os.path.join instead of hard-coded backslashes, which are not
    # directory separators on the POSIX file system that PATH points to.
    cur_date = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
    logs_root = os.path.join(PATH, 'results', 'logs')
    log_dir = os.path.join(logs_root, 'training', cur_date) if write_logs else None

    # Load dataset file paths and labels
    data = {'TRAIN': pd.read_csv(PATH + "data/train_set.csv"), 'VAL': pd.read_csv(PATH + "data/val_set.csv"),
            'TEST': pd.read_csv(PATH + "data/test_set.csv")}

    # Set callbacks.
    early_stopping = EarlyStopping(monitor='val_loss', verbose=1, patience=config['TRAIN']['PATIENCE'], mode='min',
                                   restore_best_weights=True)
    callbacks = [early_stopping]

    # Conduct the desired train experiment
    if experiment == 'hparam_search':
        log_dir = os.path.join(logs_root, 'hparam_search', cur_date)
        random_hparam_search(config, data, callbacks, log_dir)
    else:
        if experiment == 'multi_train':
            # The trailing '' keeps a trailing separator, as the original base directory string had one.
            base_log_dir = os.path.join(logs_root, 'training', '') if write_logs else None
            # multi_train also returns the date string to use when naming the saved model.
            model, test_metrics, test_generator, cur_date = multi_train(config, data, callbacks, base_log_dir)
        else:
            if write_logs:
                tensorboard = TensorBoard(log_dir=log_dir, histogram_freq=1)
                callbacks.append(tensorboard)
            model, test_metrics, test_generator = train_model(config, data, callbacks)
            if write_logs:
                log_test_results(config, model, test_generator, test_metrics, log_dir)
        if save_weights:
            models_dir = os.path.join(logs_root, 'models')
            os.makedirs(models_dir, exist_ok=True)  # saving fails if the target directory is missing
            model_path = os.path.join(models_dir, 'model' + cur_date + '.h5')
            print(model_path)
            save_model(model, model_path)  # Save the trained model to an .h5 file
    return


In [0]:
# Read the project configuration; the context manager closes the file once it has been parsed.
with open(CONFIG_PATH, 'r') as config_file:
    config = yaml.full_load(config_file)

# Run the experiment type selected in the configuration, saving the model and writing TensorBoard logs.
train_experiment(experiment=config['TRAIN']['EXPERIMENT_TYPE'], save_weights=True, write_logs=True)
