In [None]:
#@title Install and Import Dependencies
%pip install einops
%pip install tensorflowjs
import numpy as np
from PIL import Image
import tensorflowjs as tfjs

In [None]:
#@title ViT 
import tensorflow as tf
from keras import Model
from keras.layers import Layer
from keras import Sequential
import keras.layers as nn

from tensorflow import einsum
from einops import rearrange, repeat
from einops.layers.tensorflow import Rearrange

def pair(t):
    """Return ``t`` unchanged when it is a tuple, otherwise duplicate it as ``(t, t)``."""
    if isinstance(t, tuple):
        return t
    return (t, t)

class PreNorm(Layer):
    """Apply layer normalization to the input before handing it to a wrapped layer."""

    def __init__(self, fn):
        """
            fn: callable layer.
            -> Invoked as fn(normalized_x, training=...) after normalization.
        """
        super().__init__()

        # Normalization is created before the wrapped layer is attached.
        self.norm = nn.LayerNormalization()
        self.fn = fn

    def call(self, x, training=True):
        """Normalize ``x`` and forward it, with the training flag, to the wrapped layer."""
        normalized = self.norm(x)
        return self.fn(normalized, training=training)

class MLP(Layer):
    """Feed-forward block: Dense -> GELU -> Dropout -> Dense -> Dropout."""

    def __init__(self, dim, hidden_dim, dropout=0.0):
        """
            dim: int.
            -> Number of units of the final Dense layer.
            hidden_dim: int.
            -> Number of units of the first Dense layer.
            dropout: float, default 0.
            -> Rate used by both Dropout layers.
        """
        super().__init__()

        def gelu(x, approximate=False):
            """GELU activation; the erf form is used unless ``approximate`` is set."""
            if not approximate:
                return 0.5 * x * (1.0 + tf.math.erf(x / tf.cast(1.4142135623730951, x.dtype)))
            # tanh-based approximation
            coeff = tf.cast(0.044715, x.dtype)
            return 0.5 * x * (1.0 + tf.tanh(0.7978845608028654 * (x + coeff * tf.pow(x, 3))))

        # Layers are instantiated in the order in which they are applied.
        stack = [
            nn.Dense(units=hidden_dim),
            nn.Activation(gelu),
            nn.Dropout(rate=dropout),
            nn.Dense(units=dim),
            nn.Dropout(rate=dropout),
        ]
        self.net = Sequential(stack)

    def call(self, x, training=True):
        """Run ``x`` through the feed-forward stack, forwarding the training flag."""
        out = self.net(x, training=training)
        return out

class Attention(Layer):
    """Multi-head scaled dot-product self-attention."""

    def __init__(self, dim, heads=8, dim_head=64, dropout=0.0):
        """
            dim: int.
            -> Number of units of the output projection.
            heads: int, default 8.
            -> Number of attention heads.
            dim_head: int, default 64.
            -> Width of each head; the q/k/v projection has 3 * heads * dim_head units.
            dropout: float, default 0.
            -> Dropout rate applied after the output projection.
        """
        super().__init__()
        inner_dim = heads * dim_head
        # The output projection is skipped only for a single head whose width already equals dim.
        needs_projection = heads != 1 or dim_head != dim

        self.heads = heads
        self.scale = dim_head ** -0.5

        self.attend = nn.Softmax()
        self.to_qkv = nn.Dense(units=inner_dim * 3, use_bias=False)

        out_layers = [nn.Dense(units=dim), nn.Dropout(rate=dropout)] if needs_projection else []
        self.to_out = Sequential(out_layers)

    def call(self, x, training=True):
        """Attend over axis 1 of ``x`` and return the (optionally projected) merged heads."""
        packed = self.to_qkv(x)
        # One joint projection, cut into q, k and v, each reshaped to (b, h, n, d).
        q, k, v = [
            rearrange(part, 'b n (h d) -> b h n d', h=self.heads)
            for part in tf.split(packed, num_or_size_splits=3, axis=-1)
        ]

        # Pairwise query/key similarities, scaled by dim_head ** -0.5.
        scores = einsum('b h i d, b h j d -> b h i j', q, k) * self.scale
        weights = self.attend(scores)

        # Weighted sum of the values, then merge the heads back into the last axis.
        context = einsum('b h i j, b h j d -> b h i d', weights, v)
        merged = rearrange(context, 'b h n d -> b n (h d)')

        return self.to_out(merged, training=training)

class Transformer(Layer):
    """Stack of pre-norm attention + feed-forward blocks with residual connections."""

    def __init__(self, dim, depth, heads, dim_head, mlp_dim, dropout=0.0):
        """
            dim: int.
            -> Feature width passed to every Attention and MLP block.
            depth: int.
            -> Number of (attention, feed-forward) pairs.
            heads, dim_head: int.
            -> Forwarded to Attention.
            mlp_dim: int.
            -> Hidden width forwarded to MLP.
            dropout: float, default 0.
            -> Dropout rate forwarded to both sub-blocks.
        """
        super().__init__()

        self.layers = []

        for _block_index in range(depth):
            attention_block = PreNorm(Attention(dim, heads=heads, dim_head=dim_head, dropout=dropout))
            feed_forward_block = PreNorm(MLP(dim, mlp_dim, dropout=dropout))
            self.layers.append([attention_block, feed_forward_block])

    def call(self, x, training=True):
        """Apply every block in order; each sub-block output is added to its own input."""
        for attention_block, feed_forward_block in self.layers:
            attended = attention_block(x, training=training)
            x = attended + x
            transformed = feed_forward_block(x, training=training)
            x = transformed + x

        return x

class ViT(Model):
    """Vision Transformer classifier.

    The image is cut into patches, each patch is linearly embedded, a class
    token and position embeddings are added, the sequence goes through a
    Transformer, and either the class token or the mean over tokens is fed to
    a LayerNormalization + Dense head that outputs ``num_classes`` values.
    """

    def __init__(self, image_size, patch_size, num_classes, dim, depth, heads, mlp_dim,
                 pool='cls', dim_head=64, dropout=0.0, emb_dropout=0.0):
        """
            image_size: int or (height, width) tuple.
            -> Image size. An int is used for both height and width.
            patch_size: int or (height, width) tuple.
            -> Size of one patch. The image dimensions must be divisible by the patch size.
            -> The number of patches is: n = (image_height // patch_height) * (image_width // patch_width).
            num_classes: int.
            -> Number of classes to classify (units of the final Dense layer).
            dim: int.
            -> Last dimension of the patch embedding, i.e. nn.Dense(units=dim).
            depth: int.
            -> Number of Transformer blocks.
            heads: int.
            -> Number of heads in Multi-head Attention layer.
            mlp_dim: int.
            -> Dimension of the MLP (FeedForward) layer.
            pool: string, either 'cls' (cls token pooling) or 'mean' (mean pooling).
            dim_head: int, default 64.
            -> Width of each attention head.
            dropout: float between [0, 1], default 0.
            -> Dropout rate.
            emb_dropout: float between [0, 1], default 0.
            -> Embedding dropout rate.
        """
        super(ViT, self).__init__()

        image_height, image_width = pair(image_size)
        patch_height, patch_width = pair(patch_size)

        assert image_height % patch_height == 0 and image_width % patch_width == 0, 'Image dimensions must be divisible by the patch size.'

        num_patches = (image_height // patch_height) * (image_width // patch_width)
        assert pool in {'cls', 'mean'}, 'pool type must be either cls (cls token) or mean (mean pooling)'

        # Channels-last images -> flattened patches -> linear projection to `dim`.
        self.patch_embedding = Sequential([
            Rearrange('b (h p1) (w p2) c -> b (h w) (p1 p2 c)', p1=patch_height, p2=patch_width),
            nn.Dense(units=dim)
        ], name='patch_embedding')

        # One extra position for the class token that is prepended in call().
        self.pos_embedding = tf.Variable(initial_value=tf.random.normal([1, num_patches + 1, dim]))
        self.cls_token = tf.Variable(initial_value=tf.random.normal([1, 1, dim]))
        self.dropout = nn.Dropout(rate=emb_dropout)

        self.transformer = Transformer(dim, depth, heads, dim_head, mlp_dim, dropout)

        self.pool = pool

        self.mlp_head = Sequential([
            nn.LayerNormalization(),
            nn.Dense(units=num_classes)
        ], name='mlp_head')

    def call(self, img, training=True, **kwargs):
        """Return class logits for a batch of channels-last images.

        ``training`` defaults to True, so callers that want dropout disabled
        (evaluation, inference) must pass ``training=False`` explicitly.
        """
        x = self.patch_embedding(img)
        # The number of patches is static; the batch size may be unknown (None)
        # while the model is being traced for saving/export, so read it dynamically.
        n = x.shape[1]
        batch_size = tf.shape(x)[0]

        # Replicate the single learned class token once per example.
        cls_tokens = tf.tile(self.cls_token, [batch_size, 1, 1])
        x = tf.concat([cls_tokens, x], axis=1)
        x += self.pos_embedding[:, :(n + 1)]
        x = self.dropout(x, training=training)

        x = self.transformer(x, training=training)

        if self.pool == 'mean':
            x = tf.reduce_mean(x, axis=1)
        else:
            # 'cls' pooling: keep only the class token at position 0.
            x = x[:, 0]

        x = self.mlp_head(x)

        return x

""" Usage

v = ViT(
    image_size = 256,
    patch_size = 32,
    num_classes = 1000,
    dim = 1024,
    depth = 6,
    heads = 16,
    mlp_dim = 2048,
    dropout = 0.1,
    emb_dropout = 0.1
)

img = tf.random.normal(shape=[1, 256, 256, 3])
preds = v(img) # (1, 1000)

"""

In [3]:
#@title Training Loop
import tensorflow as tf
import tensorflow_datasets as tfds
import keras
from keras import layers
from matplotlib import pyplot as plt

from datetime import datetime
import os

def _save_axes(fig, ax, path):
    """Write only the region of ``fig`` occupied by ``ax`` to ``path``."""
    # The axes extent is in display pixels; savefig's bbox_inches expects inches.
    extent = ax.get_window_extent().transformed(fig.dpi_scale_trans.inverted())
    # Pad the crop so tick labels around the axes are not cut off.
    fig.savefig(path, bbox_inches=extent.expanded(1.2, 1.3))


def plot_data(loss_history, accuracy_history, validation_accuracy_history=None, save_figs=False):
    """Plot per-epoch accuracy (top) and loss (bottom) curves.

    loss_history: sequence of per-epoch loss values.
    accuracy_history: sequence of per-epoch training accuracies.
    validation_accuracy_history: optional sequence of per-epoch validation
        accuracies; the curve is omitted when it is None or empty.
    save_figs: when True, each subplot is also written as a PNG to the
        hard-coded Drive paths below (the target folder must already exist).
    """
    # Tolerate the positional call plot_data(loss, acc, save_figs), where the
    # flag lands in the validation-history slot.
    if isinstance(validation_accuracy_history, bool):
        save_figs = validation_accuracy_history
        validation_accuracy_history = None

    fig, axs = plt.subplots(2)
    fig.suptitle('Training Data')

    axs[0].plot(accuracy_history, label='training accuracy')
    if validation_accuracy_history is not None and len(validation_accuracy_history) > 0:
        axs[0].plot(validation_accuracy_history, label='validation accuracy')
    axs[0].legend()

    axs[1].plot(loss_history, label='loss')
    axs[1].legend()

    if save_figs:
        # Axes objects have no save method; saving goes through the Figure.
        _save_axes(fig, axs[0], '/content/drive/MyDrive/savedModels/CNNCovidClassifier/graphs/training_val_accuracy.png')
        _save_axes(fig, axs[1], '/content/drive/MyDrive/savedModels/CNNCovidClassifier/graphs/training_loss.png')


# The model emits raw logits, so the loss applies the softmax itself.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

def calc_loss(model, x, y):
    """Run ``model`` on ``x`` and return the sparse categorical cross-entropy against ``y``."""
    predictions = model(x)
    return loss_fn(y, predictions)

def calc_grad(model, x, y):
    """Return ``(gradients, loss)`` for one batch.

    The gradients are taken with respect to ``model.trainable_variables`` and
    are listed in the same order as those variables.
    """
    with tf.GradientTape() as recorder:
        batch_loss = calc_loss(model, x, y)
    gradients = recorder.gradient(batch_loss, model.trainable_variables)
    return gradients, batch_loss

def train_model(model, n_epochs, train_ds, batch_size, learning_rate=3e-5, with_plot=True, validation_ds=None,
                update_increment=10, model_name=None, checkpoint_dir='savedModels', save_figs=False,
                decay_steps=1000):
    """Train ``model`` with Adam and a cosine-decayed learning rate.

    model: callable Keras model returning logits.
    n_epochs: number of passes over ``train_ds``.
    train_ds: unbatched tf.data.Dataset yielding (x, y) pairs.
    batch_size: batch size used for training and for validation.
    learning_rate: initial learning rate of the cosine schedule.
    with_plot: when True, plot_data() is called after training.
    validation_ds: optional unbatched dataset evaluated after every epoch.
    update_increment: progress is printed every ``update_increment`` epochs.
    model_name: when not None, the model is saved to
        os.path.join(checkpoint_dir, model_name) after training.
    checkpoint_dir: folder used together with ``model_name``.
    save_figs: forwarded to plot_data().
    decay_steps: number of optimizer steps over which the learning rate decays
        (counted in batches, not epochs).

    Returns None.
    """
    ds_train_batch = train_ds.batch(batch_size)

    # Decide explicitly instead of relying on an exception from a None dataset,
    # which would also hide genuine dataset errors.
    with_validation = validation_ds is not None
    if with_validation:
        # Accuracy is accumulated across batches, so the validation batch size
        # does not change the result; reusing batch_size bounds memory use.
        ds_validation_batch = validation_ds.batch(batch_size)
        print('Beggining Training with Validation')
    else:
        print('Beggining Training without Validation')

    lr_decayed_fn = tf.keras.optimizers.schedules.CosineDecay(
        learning_rate, decay_steps)
    optimizer = tf.keras.optimizers.Adam(lr_decayed_fn)

    loss_history, accuracy_history, validation_accuracy_history = [], [], []

    for epoch in range(n_epochs):
        epoch_loss = tf.keras.metrics.Mean()
        epoch_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()

        for x, y in ds_train_batch:
            gradient, loss = calc_grad(model, x, y)
            optimizer.apply_gradients(zip(gradient, model.trainable_variables))

            epoch_loss.update_state(loss)
            # Measure accuracy in inference mode so dropout does not distort it.
            epoch_accuracy.update_state(y, model(x, training=False))

        loss_history.append(epoch_loss.result())
        accuracy_history.append(epoch_accuracy.result())

        val_acc = None
        if with_validation:
            validation_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
            for x, y in ds_validation_batch:
                validation_accuracy.update_state(y, model(x, training=False))

            val_acc = validation_accuracy.result()
            validation_accuracy_history.append(val_acc)

        if epoch % update_increment == 0:
            report = ['Epoch: %i' % epoch]
            # Without a validation set there is no validation accuracy to report.
            if val_acc is not None:
                report.append('Validation Accuracy: %.4f' % val_acc)
            report.append('Training Accuracy: %.4f' % epoch_accuracy.result())
            report.append('Loss: %.6f' % epoch_loss.result())
            report.append('Time: ' + datetime.now().strftime("%H:%M:%S"))
            print(*report)

    if model_name is not None:
        save_path = os.path.join(checkpoint_dir, model_name)
        try:
            model.save(save_path)
            print('Model Weights Saved')
        except Exception as save_error:
            # Saving stays best-effort, but the cause is reported instead of swallowed.
            if os.path.exists(save_path):
                print('Error: model not saved')
            else:
                print('Model not saved. Ensure checkpoint_path is valid.')
            print(repr(save_error))

    if with_plot:
        # The validation history is simply empty when no validation set was given.
        plot_data(loss_history, accuracy_history, validation_accuracy_history, save_figs)



In [4]:
#@title Create Model
# model = tf.keras.models.Sequential()
# model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=(224, 224, 3)))
# model.add(layers.MaxPooling2D((2, 2)))
# model.add(layers.Conv2D(64, (3, 3), activation='relu'))
# model.add(layers.MaxPooling2D((2, 2)))
# model.add(layers.Conv2D(64, (3, 3), activation='relu'))
# model.add(layers.MaxPooling2D((2, 2)))
# model.add(layers.Conv2D(64, (3, 3), activation='relu'))
# model.add(layers.Flatten())
# model.add(layers.Dense(64, activation='relu'))
# model.add(layers.Dense(2))



In [5]:
#@title Create ViT
# 224x224 inputs cut into 32x32 patches (7 x 7 = 49 patches), two output classes.
model = ViT(
    image_size=224,
    patch_size=32,
    num_classes=2,
    dim=1024,
    depth=6,
    heads=16,
    mlp_dim=2048,
    dropout=0.1,
    emb_dropout=0.1,
)


In [6]:
#@title create_file_list()
def create_file_list(non_covid_file_name_path, covid_file_name_path, training_file, labels_file, from_drive=False, drive_start='/content/drive/MyDrive'):
    """Write interleaved image-path and label list files for the two classes.

    non_covid_file_name_path / covid_file_name_path: text files with one image
        file name per line.
    training_file: output file; receives one image path per line, alternating
        non-COVID and COVID.
    labels_file: output file; receives the matching labels ('0' for non-COVID,
        '1' for COVID), one per line.
    from_drive: when True, every path (inputs, outputs and image folders) is
        prefixed with ``drive_start``.

    The two name lists are paired with zip(), so entries beyond the length of
    the shorter list are not written. Returns None.
    """
    non_covid_image_folder_path = 'COVID-CT/Images-processed/CT_NonCOVID'
    covid_image_folder_path = 'COVID-CT/Images-processed/CT_COVID'

    if from_drive:
        non_covid_file_name_path = os.path.join(drive_start, non_covid_file_name_path)
        covid_file_name_path = os.path.join(drive_start, covid_file_name_path)
        training_file = os.path.join(drive_start, training_file)
        labels_file = os.path.join(drive_start, labels_file)
        non_covid_image_folder_path = os.path.join(drive_start, non_covid_image_folder_path)
        covid_image_folder_path = os.path.join(drive_start, covid_image_folder_path)

    # Blank lines (e.g. a trailing newline) would otherwise become bogus
    # directory-only paths in the output.
    with open(non_covid_file_name_path) as names_in:
        non_covid_file_name_list = [line.strip() for line in names_in if line.strip()]
    with open(covid_file_name_path) as names_in:
        covid_file_name_list = [line.strip() for line in names_in if line.strip()]

    # Context managers guarantee both outputs are flushed and closed even if a
    # write fails part-way through.
    with open(training_file, 'w') as data_out, open(labels_file, 'w') as labels_out:
        for non_covid_file_name, covid_file_name in zip(non_covid_file_name_list, covid_file_name_list):
            data_out.write(os.path.join(non_covid_image_folder_path, non_covid_file_name) + '\n')
            labels_out.write('0\n')
            data_out.write(os.path.join(covid_image_folder_path, covid_file_name) + '\n')
            labels_out.write('1\n')

In [7]:
#@title create_data_from_list()
def create_data_from_list(data_folder, from_drive=False, drive_start='/content/drive/MyDrive'):
    """Build a tf.data.Dataset of (image, label) pairs from list files.

    data_folder: folder containing 'data.txt' (one image path per line) and
        'labels.txt' (one integer label per line).
    from_drive: when True, ``data_folder`` is prefixed with ``drive_start``.

    Every image is loaded with process_path(), resized to 224x224 and rescaled
    by 1/255 before all images are packed into one tensor.

    Returns an unbatched tf.data.Dataset.
    Raises ValueError when the two list files have different lengths.
    """
    if from_drive:
        data_folder = os.path.join(drive_start, data_folder)

    data_list = os.path.join(data_folder, 'data.txt')
    label_list = os.path.join(data_folder, 'labels.txt')

    # Read both lists with context managers so the files are closed; blank
    # lines are ignored.
    with open(data_list) as data_file:
        image_paths = [line.strip() for line in data_file if line.strip()]
    with open(label_list) as label_file:
        labels = [int(line.strip()) for line in label_file if line.strip()]

    if len(image_paths) != len(labels):
        raise ValueError(
            'data.txt lists %d images but labels.txt lists %d labels'
            % (len(image_paths), len(labels)))

    IMG_SIZE = 224
    # NOTE(review): this pipeline runs once per image while the dataset is
    # built, not once per epoch, and whether RandomFlip is active when called
    # like this depends on the installed Keras version -- confirm intent.
    data_augmentation = tf.keras.Sequential([
        layers.Resizing(IMG_SIZE, IMG_SIZE, 'bilinear'),
        layers.Rescaling(1./255),
        layers.RandomFlip("horizontal_and_vertical"),
    ])

    training_tensors_data = []
    kept_labels = []
    for image_path, label in zip(image_paths, labels):
        image = process_path(image_path)
        # process_path() signals an unusable image with a ('', False) tuple;
        # drop the image together with its label so the two stay aligned.
        if isinstance(image, tuple):
            continue
        training_tensors_data.append(data_augmentation(image))
        kept_labels.append(label)

    tensor_labels = tf.convert_to_tensor(kept_labels)
    training_tensors_data = tf.convert_to_tensor(training_tensors_data)
    dataset = tf.data.Dataset.from_tensor_slices((training_tensors_data, tensor_labels))

    return dataset

def process_path(file_path):
    """Load an image file as an RGB tensor.

    file_path: path of an image readable by PIL.

    Returns a tensor built from the image's pixel array. Images whose mode is
    not 'RGB' are converted first; if that conversion fails, a notice is
    printed and the tuple ``('', False)`` is returned instead of a tensor.
    """
    # The context manager closes the underlying file once the pixels are read.
    with Image.open(file_path) as img:
        # Checking the mode avoids decoding the whole image just to count
        # channels, and catches non-RGB modes that also have three channels.
        if img.mode != 'RGB':
            try:
                img = img.convert('RGB')
            except (OSError, ValueError):
                print('Invalid Data Removed: ')
                print(file_path, img.mode, img.size)
                return '', False

        pixels = np.asarray(img)
    return tf.convert_to_tensor(pixels)

In [8]:
#@title Create data.txt and labels.txt
# Training split: input name lists and output data/label list files.
non_covid_training_file_name_path = 'COVID-CT/Data-split/NonCOVID/trainCT_NonCOVID.txt'
covid_training_file_name_path = 'COVID-CT/Data-split/COVID/trainCT_COVID.txt'
training_data_path = 'COVID-CT/Data-split/Training/data.txt'
training_label_path = 'COVID-CT/Data-split/Training/labels.txt'

# Validation split: input name lists and output data/label list files.
non_covid_validation_file_name_path = 'COVID-CT/Data-split/NonCOVID/valCT_NonCOVID.txt'
covid_validation_file_name_path = 'COVID-CT/Data-split/COVID/valCT_COVID.txt'
validation_data_path = 'COVID-CT/Data-split/Validation/data.txt'
validation_label_path = 'COVID-CT/Data-split/Validation/labels.txt'

# create_file_list() only writes files and returns None, so its result is not
# bound to train_ds / validation_ds; binding it would overwrite the real
# datasets with None whenever this cell is re-run after they were built.
create_file_list(non_covid_training_file_name_path, covid_training_file_name_path, training_data_path, training_label_path, from_drive=True)
create_file_list(non_covid_validation_file_name_path, covid_validation_file_name_path, validation_data_path, validation_label_path, from_drive=True)


In [9]:
#@title Create dataset from data.txt and labels.txt
# Folders holding the data.txt / labels.txt pair of each split.
training_info_folder = 'COVID-CT/Data-split/Training'
validation_info_folder = 'COVID-CT/Data-split/Validation'

# Load every listed image and wrap each split in a tf.data.Dataset.
train_ds = create_data_from_list(
    data_folder=training_info_folder,
    from_drive=True,
)
validation_ds = create_data_from_list(
    data_folder=validation_info_folder,
    from_drive=True,
)

In [None]:
# Train for 50 epochs, reporting progress every 10 epochs and saving the plots.
train_model(
    model,
    n_epochs=50,
    train_ds=train_ds,
    validation_ds=validation_ds,
    learning_rate=0.0001,
    batch_size=16,
    update_increment=10,
    save_figs=True,
)

In [None]:
# Persist the trained model in Keras' own format ...
model.save(
    '/content/drive/MyDrive/savedModels/CNNCovidClassifier/tfModel'
)
# ... and export a TensorFlow.js copy next to it.
tfjs.converters.save_keras_model(
    model,
    '/content/drive/MyDrive/savedModels/CNNCovidClassifier/tfjsModel',
)
