<a href="https://colab.research.google.com/github/ondraperny/BI-BAP-2020/blob/master/Simple_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Simple model
---
### Code parts(description of colab cells):
**Imports** - contain all used imports with descriptions \
**Colab essentials** - when running code on Colab, mount Google Drive so data can be read from there. \
**Constants declaration** - declaring input parameters \
**Data augmentation** - defining functions for importing and augmenting data in Keras \
**Auxiliary functions** - any functions non-related to ML training, usually for graphical output \
**Model definition** - Defining keras models \
**Training model** - initialization and training model






## Imports


In [1]:
# For graphical outpus, images preview and training results plot representation
import matplotlib.pyplot as plt
import matplotlib.cm as cm

from tensorflow.keras import backend
# Generator used for loading data into Keras model
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Essentials for creating model architecture, model class, layers and optimizers
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten, BatchNormalization
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.initializers import he_uniform 
# Logger for logging results from training model
from tensorflow.keras.callbacks import CSVLogger, EarlyStopping, ModelCheckpoint

# System functions and path processing
import sys
import os

# Support functions for saving and loading model
from tensorflow.keras.models import load_model

## Colab essentials
Running code on Colab and local requires slightly different, prerequisites (as different paths, etc.)
For differentiating where code is run, variable IN_COLAB is used, if True then run is in Colab (so rest of code can reflect that)

In [2]:
# check if code run on colab or local, if in Colab then True
IN_COLAB = 'google.colab' in sys.modules

# if run in Colab, then mount to run Google Drive file system
if IN_COLAB:
  # Mount google drive
  from google.colab import drive
  drive.mount('/content/drive')



Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


##  Constants declaration


##### Constants that are expected to be directly changed (input parameters)

In [3]:
# path to the directory with input
PATH="/content/drive/My Drive/SKOLA/Bachelor_work/XR_HAND_roi_clahe/"

# constants declaration
IMG_SIZE=(224,224)
INPUT_SHAPE = (*IMG_SIZE, 3)
BATCH_SIZE = 32

NUMBER_CLASSES = 2
NUMBER_EPOCHS = 50

##### Deriving other constants from given values

In [None]:
def files_number(PATH):
  """find number of files(recursively) in given directory(path)"""
  total = 0
  for root, dirs, files in os.walk(PATH):
      total += len(files)
  return total

# if run on local machine, change path to correspond my local system paths
if not IN_COLAB:
  PATH = "G:"+PATH[14:].replace('/', '\\\\')

PATH_TRAIN = PATH + 'train'
PATH_VALID = PATH + 'valid'
# calculate number of steps per epoch and validation steps from input values
# and constants
train_img_num = files_number(PATH_TRAIN)
valid_img_num = files_number(PATH_VALID)

NUMBER_STEPS_PER_EPOCH = train_img_num // BATCH_SIZE
NUMBER_VALIDATION_STEPS = valid_img_num // BATCH_SIZE

print("Number of steps per epoch:", NUMBER_STEPS_PER_EPOCH)
print("Number of validation steps:", NUMBER_VALIDATION_STEPS)

## Data augmentation
Currently image augmentation will be done by parameters of ImageDataGenerator.
If in future this solution will be insufficient, I will change it.

In [None]:
def ImageDataGenerator_def():
  datagen = ImageDataGenerator(
    # randomly rotate images in the range (degrees, 0 to 180)
    rotation_range=10,
    # randomly shift images horizontally (fraction of total width)
    width_shift_range=0.15,
    # randomly shift images vertically` (fraction of total height)
    height_shift_range=0.15,
    # set range for random shear
    shear_range=0.01,
    # set range for random zoom
    zoom_range=0.04,
    # set mode for filling points outside the input boundaries
    fill_mode='reflect',
    # randomly flip images over horizontal axis
    horizontal_flip=True, 
    # set rescaling factor (applied before any other transformation)
    rescale=1. / 255,
    # randomly change brightness of picture 
    brightness_range=[0.7,1.2],
  )
  return datagen

def load_from_directory(dir_path, data_generator):
  '''Load images from directory while transforming data (based on parameters),
  contain other parameters for data augmentation'''
  batches = data_generator.flow_from_directory(
    # path to target directory from which data will be loaded
    dir_path,
    # resize all input images to IMG_SIZE
    target_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    class_mode='binary',
    shuffle = True,
  )
  return batches

In [None]:
data_generator = ImageDataGenerator_def()

train_batches = load_from_directory(PATH_TRAIN, data_generator)
valid_batches = load_from_directory(PATH_VALID, ImageDataGenerator(rescale=1. / 255))

print("Found indices: ", end='')
print(train_batches.class_indices)

## Auxiliary functions

In [None]:
def label_to_string(label):
  '''Map label value to descriptive string'''
  if(label == 0):
    return "Negative"
  else:
    return "Positive"


def show_sample_images(batches):
  '''Show one batch of training images (max 25 images)'''
  image_batch, label_batch = batches.next()
  plt.figure(figsize=(10,10))
  for n in range(min(len(image_batch), 25)):
    ax = plt.subplot(5,5,n+1)
    plt.imshow(image_batch[n][:,:,0], cmap=cm.gray)
    plt.title(label_to_string(label_batch[n]))
    plt.axis('off')
  plt.show()


def load_keras_model(PATH_TO_MODEL):
  '''Return loaded model saved model, based on value of PATH'''
  return load_model(PATH_TO_MODEL)


def plot_model_graph(model):
  '''Print to output and save to file the graph of model layers
  and its parameters, based on value of PATH'''
  plot_model(model, to_file=PATH+"model_arch.png", show_shapes=True)


def plot_training(history):
  '''Plot diagnostic learning curves, print them to output and save them
  to file'''
  # plot loss
  plt.figure(figsize=(10,10))
  plt.subplot(211)
  plt.plot(history.history['accuracy'])
  plt.plot(history.history['val_accuracy'])
  plt.title('Model accuracy')
  plt.ylabel('Accuracy')
  plt.xlabel('Epoch')
  plt.legend(['Train', 'Test'], loc='upper left')
  plt.ylim(bottom=0.4, top=0.8)
  plt.xlim(left=0, right = NUMBER_EPOCHS - 1)
  
  # plot accuracy
  plt.subplot(212)
  plt.plot(history.history['loss'])
  plt.plot(history.history['val_loss'])
  plt.title('Model loss')
  plt.ylabel('Loss')
  plt.xlabel('Epoch')
  plt.legend(['Train', 'Test'], loc='upper left')
  plt.ylim(bottom=0.5, top=1)
  plt.xlim(left=0, right = NUMBER_EPOCHS - 1)
  plt.tight_layout(pad=1.0)

  # save plot to file
  plt.savefig(PATH + 'acc_loss_plot_simple.png')
  plt.show()
  plt.close()


def cohen_kappa_metric(model_output, expected_output):
  '''calculate cohen kappa metric between model's output and expected output 
  (golden standard defined in MURA dataset paper) 
  
  Parameters
  ----------
  model_output : List
    a list of zeroes and ones as predicted results from model (zero for negative)
  expected_output : List
    a list of zeroes and ones as reported results for testing data

  Returns
  -------
  Float
    Cohen kappa metric score value
  '''
  from sklearn.metrics import cohen_kappa_score

  if len(model_output) != len(expected_output):
    print("Input strings for Kappa metrics don't have the same size.\
    Can't be compared")
    return 0.0
  else:
    return cohen_kappa_score(model_output, expected_output)


def percentage(all_imgs, correct_imgs):
  '''Calculate percentage of correct images'''
  return 100 * float(correct_imgs)/float(all_imgs)


def predict(model):
  '''Use model to predict whether imgs are positive or negative

  Parameters
  ----------
  model : tensorflow.python.keras.engine.sequential.Sequential
    model used for prediction

  Returns
  -------
  List
    A list list of zeroes and ones as ground_truth values (one for positive and
    zero for negative result)
  List
    A list list of zeroes and ones as prediction values (one for positive and
    zero for negative result)
  '''
  ground_truth = []
  predictions = []

  test_batches = ImageDataGenerator(rescale=1. / 255).flow_from_directory(
    # path to target directory from which data will be loaded
    PATH_VALID,
    target_size=IMG_SIZE,
    batch_size=1,
    class_mode='binary',
    shuffle = False,
  )

  for i in range(len(test_batches.filenames)):
    image, label = test_batches.next()

    prediction = model.predict(image)
    if prediction > 0.5:
      predictions.append(1)
    else:
      predictions.append(0)

    ground_truth.append(label.flat[0])
  
  return ground_truth, predictions


def get_acc_kappa(PATH_TO_MODEL):
  '''Call required functions to calculate kappa and accuracy

  Returns
  -------
  Float
    accuracy value
  Float
    kappa value
  '''
  model = load_keras_model(PATH_TO_MODEL)
  ground_truth, predictions = predict(model)

  new_gt = []
  for i in ground_truth:
    new_gt.append(int(i))

  correct_cnt = 0
  for i in range(len(new_gt)):
    if predictions[i] == new_gt[i]:
      correct_cnt += 1

  kappa = cohen_kappa_metric(new_gt, predictions)
  accuracy = percentage(len(new_gt), correct_cnt)

  print("Test images:", len(new_gt))
  print("Correctly predicted:", correct_cnt)
  print("Accuracy:", accuracy)
  print("Kappa   :", kappa)

  return accuracy, kappa

In [None]:
# display batch if training examples and validation examples
show_sample_images(train_batches)
show_sample_images(valid_batches)

## Model definition

In [None]:
'''
Activation function is method that define the output of neuron, different
functions use distinctive methods of how to figure out output.
I am using ReLU - Rectified linear unit. It is simple but effective function,
x = max(0,x)'''
ACTIVATION_FUNCTION='relu'

'''Kernel initializer define method used in setting the initial random weights
in layers. 
Currently using he_uniform, draws samples from a uniform distribution within
[-limit, limit] where limit is sqrt(6 / fan_in) where fan_in
is the number of input units in the weight ten'''
KERNER_INITIALIZER=he_uniform(0)

'''
Padding is a method to extend input image, so filter kernel can work as intended
even around borders.
Same - is method that guarantee output have SAME spatial dimensions as input'''
PADDING='same'

'''Optimizer is algorithm that change the attributes of neural network
 (weights and learning rate) in order to reduce loss value
 Stochastic gradient descent, which is improved by momentum method,
 lr is learning rate - it determines how big changes can be made in each step
 during training model, bigger lr means faster learning, but step too big can
 miss the best solution, thats why i can't be way to big'''
OPTIMIZER=SGD(lr=0.01, momentum=0.9, decay=0.01/NUMBER_EPOCHS)

'''Loss function evaluate loss/cost of specific event, in this case 
loss function describe how well does model perform by comparing ground truth(
label of what class is the training input image) with output from model'''
LOSS_FUNCTION='binary_crossentropy'

def three_block_VGG():
  """Three Block VGG Model"""
  model = Sequential()

  model.add(Conv2D(32, (3, 3), activation=ACTIVATION_FUNCTION, kernel_initializer=KERNER_INITIALIZER, padding=PADDING, input_shape=INPUT_SHAPE))
  model.add(MaxPooling2D((2, 2)))
  model.add(Dropout(0.1))

  model.add(Conv2D(64, (3, 3), activation=ACTIVATION_FUNCTION, kernel_initializer=KERNER_INITIALIZER, padding=PADDING))
  model.add(MaxPooling2D((2, 2)))
  model.add(Dropout(0.1))

  model.add(Conv2D(128, (3, 3), activation=ACTIVATION_FUNCTION, kernel_initializer=KERNER_INITIALIZER, padding=PADDING))
  model.add(MaxPooling2D((2, 2)))
  model.add(Dropout(0.1))

  model.add(Conv2D(256, (3, 3), activation=ACTIVATION_FUNCTION, kernel_initializer=KERNER_INITIALIZER, padding=PADDING))
  model.add(MaxPooling2D((2, 2)))
  model.add(Flatten())
  
  model.add(Dense(256, activation=ACTIVATION_FUNCTION, kernel_initializer=KERNER_INITIALIZER))
  model.add(Dense(1, activation='sigmoid'))
  # compile model
  model.compile(optimizer=OPTIMIZER, loss=LOSS_FUNCTION, metrics=['accuracy'])
  return model

## Training model

In [None]:
# initialize model
model = three_block_VGG()

# print summary info about model and its layers
model.summary()

In [None]:
# saves the best model
model_checkpoint = ModelCheckpoint(filepath=PATH+"best_model_simple.h5", monitor='val_accuracy', mode='max', save_best_only=True, verbose=1)

# training model
history = model.fit(train_batches,
                    steps_per_epoch= NUMBER_STEPS_PER_EPOCH,
                    validation_data = valid_batches,
                    validation_steps = NUMBER_VALIDATION_STEPS,
                    epochs = NUMBER_EPOCHS,
                    callbacks=[model_checkpoint]
                    )


In [None]:
# show plot of accuracy and loss (both training and validation) during training
plot_training(history) 

In [None]:
# calculate and display kappa and accuracy of the model
get_acc_kappa(PATH+"best_model_simple.h5")