
# **CT2CT: Resolution Augmentation of Computed Tomography (CT) Scans** 
*by Sergio Rivera*
Please, follow the execution of the cells listed below to understand and use the model.
---


# **1. LOAD THE MODEL**
In this first step data from a default scan will be fetched, generator and discriminator will be defined and compiled, I/O images will be generated and all the required parameters will be set ready for training.

In [0]:
%tensorflow_version 2.x
import tensorflow as tf
from tensorflow.keras import *
from tensorflow.keras.layers import *

import numpy as np
import random

import os
import shutil

from PIL import Image
from PIL import ImageChops
import matplotlib.pyplot as plt

from io import BytesIO
from zipfile import ZipFile
from urllib.request import urlopen

# GPU Check - only for Colab env
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at {}'.format(device_name))

# CONNECT TO DRIVE
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

# PATHS
PATH = "/content/drive/My Drive/brain_ct"
ATTEMPT_PATH = PATH + "/attempts"
SAVED_MODELS_PATH = PATH + "/savedModels"

# DELETE PREVIOUS CONTENT - only delete attempts folder in the future
shutil.rmtree(ATTEMPT_PATH)
os.makedirs(ATTEMPT_PATH)

# GLOBAL VARIABLES
RANDOM_SEED = 1234
IMG_WIDTH = 512
IMG_HEIGHT = 512

# FETCH DATA
from io import BytesIO
from zipfile import ZipFile
from urllib.request import urlopen
import natsort

print("Downloading scan...")
resp = urlopen("https://raw.github.com/sergiorivera50/CT2CT/master/rawIMG.zip")
zip_file = ZipFile(BytesIO(resp.read()))
file_names = zip_file.namelist()
file_names = natsort.natsorted(file_names,reverse=False) # Sort filenames (1 -> 240)
RAW_IMAGES = []

for i in range(len(file_names)):
  content = zip_file.read(file_names[i]) # Reads content
  img = Image.open(BytesIO(content)) # Converts to image
  RAW_IMAGES.append(img) # Stores image into RAW_IMAGES array

slices = len(RAW_IMAGES)

# GENERATE INPUT/OUTPUT
import cv2

def rgbImg2GreyImg(rgb_img):
  return cv2.cvtColor(rgb_img, cv2.COLOR_BGR2GRAY)

print("Generating I/O...")

INPUT_IMAGES = np.empty((slices-2, IMG_HEIGHT, IMG_WIDTH, 1), dtype="float32")
OUTPUT_IMAGES = np.empty((slices-2, IMG_HEIGHT, IMG_WIDTH, 1), dtype="float32")

i = 0
while i < (slices-2):
    A = RAW_IMAGES[i] # Image 1
    C = RAW_IMAGES[i+2] # Image 3
    B = RAW_IMAGES[i+1] # Image 2
    
    # Save middle image (B)(OUT)
    OUTPUT_IMAGES[i] = np.resize(rgbImg2GreyImg(np.float32(B)), (IMG_HEIGHT, IMG_WIDTH, 1))
    
    # Combine and save AmC
    combined = cv2.addWeighted(np.float32(A), 0.5, np.float32(C), 0.5, 0)
    INPUT_IMAGES[i] = np.resize(rgbImg2GreyImg(combined), (IMG_HEIGHT, IMG_WIDTH, 1))
    i += 1

# Normalize images
def normalize(inimg, tgimg):
    inimg = (inimg / 128) - 1
    tgimg = (tgimg / 128) - 1

    return inimg, tgimg

print("Preprocessing images...")
for i in range(slices-2):
  INPUT_IMAGES[i], OUTPUT_IMAGES[i] = normalize(INPUT_IMAGES[i], OUTPUT_IMAGES[i])

print("Shuffling and splitting train/test...")
# CODE CITED FROM: https://stackoverflow.com/questions/4601373/better-way-to-shuffle-two-numpy-arrays-in-unison
def shuffle_in_unison(a, b):
    assert len(a) == len(b)
    shuffled_a = np.empty(a.shape, dtype=a.dtype)
    shuffled_b = np.empty(b.shape, dtype=b.dtype)
    permutation = np.random.permutation(len(a))
    for old_index, new_index in enumerate(permutation):
        shuffled_a[new_index] = a[old_index]
        shuffled_b[new_index] = b[old_index]
    return shuffled_a, shuffled_b
# END CITE

# TRAIN/TEST

train_n = round(slices * 0.80) # Train/Test 0.8/0.2

np.random.seed(RANDOM_SEED)
RAND_INPUT_IMAGES, RAND_OUTPUT_IMAGES = shuffle_in_unison(INPUT_IMAGES, OUTPUT_IMAGES)

tr_input_imgs = RAND_INPUT_IMAGES[:train_n]
tr_output_imgs = RAND_OUTPUT_IMAGES[:train_n]

ts_input_imgs = RAND_INPUT_IMAGES[train_n:slices]
ts_output_imgs = RAND_OUTPUT_IMAGES[train_n:slices]

print("slices= {} | train= {} | test= {}".format(slices, len(tr_input_imgs), len(ts_input_imgs)))

# CREATE DATASETS
train_dataset = tf.data.Dataset.from_tensor_slices((tr_input_imgs, tr_output_imgs))
train_dataset = train_dataset.batch(1)

test_dataset = tf.data.Dataset.from_tensor_slices((ts_input_imgs, ts_output_imgs))
test_dataset = test_dataset.batch(1)

# ENCODER
# C64-C128-C256-C512-C512-C512-C512-C512
def downsample(filters, apply_batchnorm=True):
  
    result = Sequential()

    initializer = tf.random_normal_initializer(0, 0.02) # mean 0 deviation 0.02

    # Convolutional layer
    result.add(Conv2D(filters,
                    kernel_size=4,
                    strides=2, # Each layer info shrinks by 1 / strides
                    padding="same",
                    kernel_initializer=initializer,
                    use_bias=not apply_batchnorm)) # Only apply if not using BatchNorm

    if apply_batchnorm:
        # BatchNorm layer
        result.add(BatchNormalization())

    # Activation layer
    result.add(LeakyReLU())

    return result

# DECODER
# CD512-CD512-CD512-C512-C256-C128-C64
def upsample(filters, apply_dropout=False):
  
    result = Sequential()

    initializer = tf.random_normal_initializer(0, 0.02) # mean 0 deviation 0.02

    # Convolutional layer
    result.add(Conv2DTranspose(filters, #Conv2D inverse
                             kernel_size=4,
                             strides=2,
                             padding="same",
                             kernel_initializer=initializer,
                             use_bias=False))

    # BatchNorm layer
    result.add(BatchNormalization())

    if apply_dropout:
        # Dropout layer (disconnects random connections, regularized)
        result.add(Dropout(0.5))

    # Activation layer
    result.add(ReLU())

    return result

# GENERATOR
def Generator():

    inputs = tf.keras.layers.Input(shape=[None, None, 1]) # Not specified dimensions, 1 channel

    # C64-C128-C256-C512-C512-C512-C512-C512
    down_stack = [                        # (batch_size, width, height, filters)
      downsample(64, apply_batchnorm=False), # (bs, 128, 128, 64)
      downsample(128),                       # (bs, 64, 64, 128)
      downsample(256),                       # (bs, 32, 32, 256)
      downsample(512),                       # (bs, 16, 16, 512)
      downsample(512),                       # (bs, 8, 8, 512)
      downsample(512),                       # (bs, 4, 4, 512)
      downsample(512),                       # (bs, 2, 2, 512)
      downsample(512),                       # (bs, 1, 1, 512)
    ]

    # CD512-CD512-CD512-C512-C256-C128-C64
    up_stack = [
      upsample(512, apply_dropout=True),     # (bs, 2, 2, 1024)
      upsample(512, apply_dropout=True),     # (bs, 4, 4, 1024)
      upsample(512, apply_dropout=True),     # (bs, 8, 8, 1024)
      upsample(512),                         # (bs, 16, 16, 1024)
      upsample(256),                         # (bs, 32, 32, 512)
      upsample(128),                         # (bs, 64, 64, 256)
      upsample(64),                          # (bs, 128, 128, 128)
    ]

    initializer = tf.random_normal_initializer(0, 0.02)

    # Last layer = generated image
    last = Conv2DTranspose(filters=1, # filters = 1 made discriminator layers to be [None, None, None, 2], fixing all 1 channel bugs
                         kernel_size=4,
                         strides=2,
                         padding="same",
                         kernel_initializer=initializer,
                         activation="tanh" # de -1 a 1
                         )

    x = inputs

    s = [] # Skip Connections list

    concat = Concatenate()

    # Connect layers
    for down in down_stack:
        x = down(x)
        s.append(x)

    s = reversed(s[:-1])

    for up, skip in zip(up_stack, s):

        x = up(x)
        x = concat([x, skip])

    last = last(x)

    return Model(inputs=inputs, outputs=last) # Builds model

generator = Generator()

# DISCRIMINATOR 70x70
# C64-C128-C256-C512
def Discriminator():
  
    # (dim1, dim2, channels) by default from Keras
    ini = Input(shape=[None, None, 1], name="input_img")
    gen = Input(shape=[None, None, 1], name="gener_img")
    
    con = concatenate([ini, gen])

    initializer = tf.random_normal_initializer(0, 0.2)

    down1 = downsample(64, apply_batchnorm=False)(con)
    down2 = downsample(128)(down1)
    down3 = downsample(256)(down2)
    down4 = downsample(512)(down3)
  
    last = tf.keras.layers.Conv2D(filters=1, # Pixel by pixel
                                kernel_size=4,
                                strides=1,
                                kernel_initializer=initializer,
                                padding="same")(down4)

    return tf.keras.Model(inputs=[ini, gen], outputs=last)

discriminator = Discriminator()

# OPTIMIZERS
generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

# LOSS CALCULATIONS
current_gen_loss = 1
current_discr_loss = 1
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True) # Apply logit function

def discriminator_loss(disc_real_output, disc_generated_output):
    real_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)
    generated_loss = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)
    total_disc_loss = real_loss + generated_loss

    return total_disc_loss

LAMBDA = 100
def generator_loss(disc_generated_output, gen_output, target):
  
    #gan_loss = adversary error
    gan_loss = loss_object(tf.ones_like(disc_generated_output), disc_generated_output)

    #mean absolute error, between generated and target
    l1_loss = tf.reduce_mean(tf.abs(target - gen_output))

    total_gen_loss = gan_loss + (LAMBDA * l1_loss) #mas peso a l1_loss que al adversario

    return total_gen_loss

# IMAGE GENERATION
def generate_images_test(model, test_input, tar, save_filename=False):
  
    prediction = model(test_input, training=False)

    if save_filename:
        tf.keras.preprocessing.image.save_img(ATTEMPT_PATH + "/" + save_filename + ".jpg", prediction[0,...])

# TRAIN STEP
@tf.function()
def train_step(input_image, target):
  # Gradients & Backpropagation
  with tf.GradientTape() as gen_tape, tf.GradientTape() as discr_tape:

      output_image = generator(input_image, training=True)

      output_gen_discr = discriminator([output_image, input_image], training=True)

      output_trg_discr = discriminator([target, input_image], training=True)

      discr_loss = discriminator_loss(output_trg_discr, output_gen_discr)
      current_discr_loss = discr_loss

      gen_loss = generator_loss(output_gen_discr, output_image, target)
      current_gen_loss = gen_loss

      # Apply gradients

      generator_grads = gen_tape.gradient(gen_loss, generator.trainable_variables)

      discriminator_grads = discr_tape.gradient(discr_loss, discriminator.trainable_variables)

      generator_optimizer.apply_gradients(zip(generator_grads, generator.trainable_variables))

      discriminator_optimizer.apply_gradients(zip(discriminator_grads, discriminator.trainable_variables))

# TRAIN FUNCTION
def train(dataset, epochs):
    print("Initiating training...")
    generator_loss_array = []
    discriminator_loss_array = [] 
    epoch = 0
    for epoch in range(epochs):
        imgi = 0
        for input_image, target in dataset:
          imgi += 1
          train_step(input_image, target)
        for input_image, target in dataset.take(1):
          output_image = generator(input_image, training=True)
          output_gen_discr = discriminator([output_image, input_image], training=True)
          output_trg_discr = discriminator([target, input_image], training=True)
          discr_loss = discriminator_loss(output_trg_discr, output_gen_discr)
          gen_loss = generator_loss(output_gen_discr, output_image, target)
          generator_loss_array.append(tf.keras.backend.get_value(gen_loss))
          discriminator_loss_array.append(tf.keras.backend.get_value(discr_loss))
        print("epoch " + str(epoch+1) + " - generator loss: " + str(tf.keras.backend.get_value(gen_loss)) + " | discriminator loss: " 
              + str(tf.keras.backend.get_value(discr_loss)))
        imgi = 0
        for inp, tar in test_dataset.take(5):
            generate_images_test(generator, inp, tar, str(imgi) + "_e" + str(epoch))
            imgi += 1
        epoch += 1
    print("Model successfully trained!")
    title = ["Generator Loss", "Discriminator Loss"]
    display_list = [generator_loss_array, discriminator_loss_array]
    for i in range(2):
      plt.subplot(2, 1, i+1)
      plt.title(title[i])
      plt.plot(range(epochs), display_list[i])
      plt.show()
print("Model successfully loaded!")

# **2. UNDERSTAND THE PROCESS**
This cell is here only to explain better the core concept of this model. In order to train the model to predict inner slices, we need to give the algorithm real data examples of this task. Images A and C are merged together into a single image named AmC which is fed to the model, the objective is to generate a new image which resembles an image B (or the inner slice). Take a look at some real examples in this step.

In [0]:
index = 223
plt.figure(figsize=(20,20))
display_list = [RAW_IMAGES[index], RAW_IMAGES[index+1], RAW_IMAGES[index+2], INPUT_IMAGES[index], OUTPUT_IMAGES[index]]
title = ["Raw Image A", "Raw Image B", "Raw Image C", "Input Image (AmC)", "Target Image (B)"]

for i in range(5):
    plt.subplot(1, 5, i+1)
    plt.title(title[i])
    plt.imshow(np.squeeze(display_list[i]), cmap="Greys_r")
    plt.axis("off")

# **3. TRAIN THE MODEL**
During this step, the train function will be called and training will begin. The number of epochs will define how many times the model is trained and how long will training take. It has been tested that 10-50 epochs generate good results, although the best generated images were produced in a 175 epoch training, increasing that number will result in a case of overfitting as can be seen below.
![Results image](https://raw.github.com/sergiorivera50/CT2CT/master/assets/results.PNG)


In [0]:
# Best generation result -> 175 epochs
epochs = 10
train(train_dataset, epochs)

# **4. VIEW THE RESULTS**
Take a look at some generated images using the model trained in the previous step. If the model you are using has just been trained and compiled in cells 1 and 3, these 5 images will also be stored in the *brain_ct/attempts* folder with a prediction made on each epoch so that you can appreciate how the generator improves its predictions over time.

In [0]:
i = 0
for inp, tar in test_dataset.take(5):
  i += 1
  prediction = generator(inp, training=False)
  plt.figure(figsize=(10,10))
  plt.title("Image " + str(i) + ". Prediction from current generator at " + str(epochs) + " epochs.")
  plt.axis("off")
  plt.imshow(np.squeeze(prediction[0]), cmap="Greys_r")

# **4. SAVE THE TRAINED MODEL**
This step will save the CT2CT trained model into an h5 formatted file. It will be stored in the folder *brain_ct/savedModels*.

In [0]:
MODEL_NAME = "ct2ct_trained_" + str(epochs) + "e.h5"
generator.save(SAVED_MODELS_PATH + "/" + MODEL_NAME)
print("Saved generator at /savedModels/" + MODEL_NAME)

# **5. LOAD A TRAINED MODEL**
Executing this cell will load an already trained model. Then you can come back and execute again cell 4 to see the new results of this generator.

In [0]:
from tensorflow.keras.models import load_model

MODEL_NAME = "ct2ct_trained_50e.h5" # Change this to your trained model
generator = load_model(SAVED_MODELS_PATH + "/" + MODEL_NAME)
print("Loaded generator from /savedModels/" + MODEL_NAME)