Initialise the environment for Google Colab

Mount google drive

In [None]:
# Mount Google Drive under /content/drive so the archive and the project folder
# stored on Drive are reachable; force_remount=True asks for a remount even if
# the drive is already mounted.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)


In [None]:
# Print the TensorFlow and TensorFlow Addons versions that are importable right now.
# NOTE(review): this cell comes before the pinned `pip install` cell, so on a
# fresh runtime it reports whatever versions are preinstalled — confirm this
# ordering is intended.
import tensorflow as tf
print(tf.__version__)
import tensorflow_addons as tfa
print(tfa.__version__)

In [None]:
!pip install tensorflow==2.11
!pip install tensorflow-addons==0.19

Extract the database files into the Google Colab environment

In [None]:
# Unpack the data base archive stored on Drive into /content (-q: quiet output).
!unzip -q '/content/drive/MyDrive/data_equalize.zip' -d '/content/'

For Luc: my archive was made on a Mac, which creates a `__MACOSX` folder that has to be removed

In [None]:
# Delete the __MACOSX folder that comes with archives created on a Mac.
%rm -rf /content/__MACOSX

Set the working directory to our jupyter directory:
* because the paths to the other directories (./data, ./output, ...) are defined relative to the jupyter one
* this lets us import `_mypath`, which adds ./lib to the Python path so that our own libraries can be imported


In [None]:
# Work from the project's jupyter folder on Drive: the relative paths used
# below (e.g. os.path.join('..', 'output')) are resolved from here.
%cd /content/drive/MyDrive/covid-19-xRay/jupyter


In [None]:
# for google colab use
# The archive is unzipped into /content, so the data base root lives there.
db_work_dir = '/content'

In [None]:
# for local use
# Use the repository-relative location only when NOT running on Google Colab,
# so that "Run all" on Colab keeps '/content' as the data base root instead of
# having it silently overwritten by this cell.
import sys
db_work_dir = '/content' if 'google.colab' in sys.modules else '..'

In [None]:
import _mypath
import os
import shutil
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

%load_ext autoreload
%autoreload 1



In [None]:
from database.path_origin_data import build_data_paths 
from database.path_origin_data import lung_name, infection_name
from database.path_origin_data import train_name, test_name, valid_name
from database.path_origin_data import normal_name, covid_name, no_covid_name
from database.path_origin_data import images_name, lung_mask_name, infection_mask_name

from database.dataset import build_dataset

Build the paths and variables used to read the database hierarchy

In [None]:
# Root folder of the image data base (db_work_dir is chosen in the cells above).
db_name = 'data_equalize'
db_path = os.path.join(db_work_dir, db_name)

# Folder receiving experiment outputs (confusion matrices, checkpoints).
# exist_ok=True already makes the call a no-op when the directory exists, so no
# separate (and racy) os.path.exists() check is needed.
output_path = os.path.join('..', 'output')
os.makedirs(output_path, exist_ok=True)

Structure used to manage the paths inside the database

In [None]:
# Path structure of the data base; its 'path' entry is what build_dataset is given below.
data_paths = build_data_paths()
# Shorthand for pandas multi-index slicing.
# NOTE(review): idx is not used in the cells visible here — confirm it is still needed.
idx = pd.IndexSlice

Create tf Dataset

In [None]:

# Path table produced by build_data_paths().
paths = data_paths['path']

# One dataset per split (train / test / valid), each restricted to the lung data base.
ds_train, ds_test, ds_valid = [
    build_dataset(db_path, paths, db=[lung_name], ds=[split_name])
    for split_name in (train_name, test_name, valid_name)
]

# Show the training dataset as the cell output.
ds_train


Model parameters

In [None]:
# --- input / batching ---
batch_size = 64
image_size = 256
input_shape = (image_size, image_size, 1)  # square, single-channel images

# --- optimisation ---
learning_rate = 1e-3
weight_decay = 1e-4
num_epochs = 100

# --- loss ---
label_smoothing = 0.01
lam_recon = 10.0  # forwarded to the autoencoder experiment; presumably the reconstruction-loss weight

# --- data augmentation ---
scale = 1.0 / 255.0  # factor used by the Rescaling layer
flip = "horizontal"
rotation_factor = 0.1
zoom_height_factor = 0.2
zoom_width_factor = 0.2

# --- vision transformer (all forwarded to add_vit) ---
patch_size = 1
transformer_layers = 2
num_heads = 4
projection_dim = 64
transformer_units_rate = [2, 1]
mlp_head_units = [512, 256]  # Size of the dense layers of the final classifier


Data augmentation

In [None]:
# Input pipeline shared by both models: rescale the pixel values, then apply
# random flip / rotation / zoom using the factors from the parameters cell.
augmentation = keras.Sequential(name='augmentation')
augmentation.add(layers.Rescaling(scale=scale))
augmentation.add(layers.RandomFlip(flip))
augmentation.add(layers.RandomRotation(rotation_factor))
augmentation.add(
    layers.RandomZoom(height_factor=zoom_height_factor, width_factor=zoom_width_factor)
)

cnn encoder

In [None]:
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.layers import Dropout

# CNN encoder.
#
# An older layer stack used to sit here with only its first line commented
# out, which left the rest of it as invalid syntax and prevented this cell
# from running; that dead block has been removed.
#
# Architecture: six 3x3 'same' convolutions (128, 128, 128, 64, 64, 64 filters),
# a 2x2 max-pooling after each of the first five, then a Flatten.  The trailing
# Flatten is the layer dropped later by `encoder.layers[:-1]` when the
# convolutional trunk is reused.
encoder_filters = [128, 128, 128, 64, 64, 64]

encoder_layers = []
for position, n_filters in enumerate(encoder_filters):
    encoder_layers.append(
        layers.Conv2D(n_filters, (3, 3), activation='relu', padding='same',
                      kernel_initializer='random_normal')
    )
    # No pooling after the last convolution.
    if position < len(encoder_filters) - 1:
        encoder_layers.append(layers.MaxPooling2D(pool_size=(2, 2)))
encoder_layers.append(layers.Flatten())

encoder = keras.Sequential(encoder_layers, name='encoder')

decoder

In [None]:
# Dense decoder: maps the flattened encoder features back to an image of shape
# `input_shape`, one sigmoid unit per pixel.
decoder_output_units = np.prod(input_shape)

decoder = keras.Sequential(name='decoder')
decoder.add(layers.Dense(512, activation='relu'))
decoder.add(layers.Dense(decoder_output_units, activation='sigmoid'))
decoder.add(layers.Reshape(target_shape=input_shape))


In [None]:
# Dense classification head: two hidden ReLU layers followed by a 3-way softmax.
classifier = keras.Sequential(name='classifier')
for hidden_units in (512, 256):
    classifier.add(layers.Dense(hidden_units, activation='relu'))
classifier.add(layers.Dense(3, activation='softmax'))


In [None]:
# Symbolic input tensor for images of shape `input_shape`.
inputs = layers.Input(shape=input_shape)


class MyModel(tf.keras.Model):
  """Classifier whose encoder is shared with a reconstruction decoder.

  The sub-models are the module-level `augmentation`, `encoder`, `decoder`
  and `classifier` objects, stored as attributes so they can be retrieved
  by name with `get_layer`.
  """

  def __init__(self, *args, **kwargs):
    """Forward all arguments to `tf.keras.Model` and attach the sub-models."""
    super().__init__(*args, **kwargs)
    self.augmentation = augmentation
    self.encoder = encoder
    self.decoder = decoder
    self.classifier = classifier

  def call(self, inputs):
    """Return the class probabilities for `inputs`.

    The decoder is run on the encoded features as well, but its output is
    discarded here; only the classifier output is returned.
    """
    encoded = self.encoder(self.augmentation(inputs))
    # Keep this call: the 'decoder' layer is fetched by name later and handed
    # to the autoencoder experiment, so it has to be part of the forward pass.
    self.decoder(encoded)
    return self.classifier(encoded)


classif_decoder = MyModel(name='classif_decoder')
classified = classif_decoder(inputs)

# Functional wrapper exposing the sub-classed model as the layer 'classif_decoder'.
model = keras.Model(inputs=inputs, outputs=classified)



In [None]:
# Summarise the inner sub-classed model (augmentation / encoder / decoder / classifier).
model.get_layer('classif_decoder').summary()


In [None]:
%aimport run_exp.classif_autoencoder
from run_exp.classif_autoencoder import run_experiment as run_experiment_autoencoder

classif_decoder = model.get_layer('classif_decoder')
encoder_input = classif_decoder.get_layer('augmentation')
decoder_output = classif_decoder.get_layer('decoder')

cnn_history, cnn_conf_mat = run_experiment_autoencoder(model, encoder_input, decoder_output,
              ds_train, ds_test, ds_valid,
              batch_size=batch_size, num_epochs=num_epochs,
              learning_rate=learning_rate, weight_decay=weight_decay,
              lam_recon=lam_recon,
              output_path=output_path, prefix='cnn',
              from_logits=False, label_smoothing=label_smoothing)

In [None]:
import joblib

# Persist the second value returned by the CNN experiment (presumably a
# confusion matrix, judging by its name) in the output folder.
file_name = os.path.join(output_path, 'cnn_conf_mat.joblib')
joblib.dump(cnn_conf_mat, file_name)

# Display it as the cell output.
cnn_conf_mat

In [None]:
# Reload weights into `model` from <output_path>/ckpt/cnn_weights.hdf5.
# presumably this checkpoint is written by the autoencoder experiment run with
# prefix='cnn' — TODO confirm against run_exp.classif_autoencoder
ckpt_path = os.path.join(output_path, 'ckpt')
checkpoint_filename = os.path.join(ckpt_path, 'cnn_weights.hdf5')
model.load_weights(checkpoint_filename)

# model.get_layer('classif_decoder').summary()


In [None]:
# encoder = model.get_layer('classif_decoder').get_layer('encoder')
# encoder.summary()


In [None]:
# layer_names = [layer.name for layer in encoder.layers]
# layer_idx = layer_names.index('max_pooling2d_8')

# Sub Model
# Convolutional trunk of the CNN encoder: every encoder layer except the last
# one (the Flatten).  The layer objects themselves are reused, not copied.
shared_encoder = tf.keras.Sequential(encoder.layers[:-1], name='shared_encoder')


In [None]:

%aimport myLayers.vision_transformer
%aimport myLayers.mlp
from myLayers.vision_transformer import add_vit
from myLayers.mlp import mlp

inputs = layers.Input(shape=input_shape)
augmented_2 = augmentation(inputs)
shared_model = shared_encoder(augmented_2)
features = add_vit(shared_model,
            patch_size=patch_size,
            input_image_size=shared_model.shape[1],
            transformer_layers=transformer_layers,
            num_heads=num_heads,
            projection_dim=projection_dim,
            transformer_units_rate=transformer_units_rate,
            mlp_head_units=mlp_head_units)
# Classify outputs.
softmax = layers.Dense(3, activation='softmax', kernel_initializer='random_normal')(features)

# Create the Keras model.
model_transformer = keras.Model(inputs=inputs, outputs=softmax)


In [None]:
%aimport run_exp.standard
from run_exp.standard import run_experiment as run_experiment_transformer

transformer_history, transformer_conf_mat = run_experiment_transformer(model,
              ds_train, ds_test, ds_valid,
              batch_size=batch_size, num_epochs=num_epochs,
              learning_rate=learning_rate, weight_decay=weight_decay,
              output_path=output_path, prefix='transformer',
              from_logits=False, label_smoothing=label_smoothing)

In [None]:
import joblib

# Persist the second value returned by the transformer experiment (presumably
# a confusion matrix, judging by its name) in the output folder.
file_name = os.path.join(output_path, 'transformer_conf_mat.joblib')
joblib.dump(transformer_conf_mat, file_name)

# Display it as the cell output.
transformer_conf_mat