In [None]:
'''
Image-to-image classification network 
CycleGAN Resources used: 
https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix
https://keras.io/examples/generative/cyclegan/
https://machinelearningmastery.com/cyclegan-tutorial-with-keras/

SVM Resources used:  
https://scikit-learn.org/stable/modules/svm.html
'''

In [None]:
'''
## import tensorflow and alocate GPU memory growth
'''
import tensorflow as tf
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))

gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)

In [None]:
'''
## CycleGAN libraries needed
'''
import numpy as np
from random import random
from numpy import load
from numpy import zeros
from numpy import ones
from numpy import asarray
from numpy.random import randint
from keras.initializers import RandomNormal
from keras.models import Model
from keras.models import Input
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import Activation
from keras.layers import Concatenate
from keras_contrib.layers.normalization.instancenormalization import InstanceNormalization

from os import listdir
from numpy import asarray
from numpy import vstack
from keras.preprocessing.image import img_to_array
from keras.preprocessing.image import load_img
from numpy import savez_compressed
autotune = tf.data.experimental.AUTOTUNE
from tensorflow.keras import layers
import matplotlib.pyplot as plt
import pandas as pd

In [None]:
"""
## Libraries for the SVM 
"""
from sklearn import svm
from sklearn.datasets import make_blobs
from sklearn.inspection import DecisionBoundaryDisplay

import sklearn
from sklearn import metrics
from numpy import argmax
from sklearn.metrics import roc_auc_score

In [None]:
## Read in the training and test set from a tradionally trained CNN 
train_csv = pd.read_csv('/CNN_trainset.csv')
test_csv = pd.read_csv('/CNN_valset.csv')

In [None]:
##Split the training and test set based on malignant and benign images. 
train_A = train_csv[train_csv.target==1]
test_A = test_csv[test_csv.target==1]
train_B = train_csv[train_csv.target==0]
test_B = test_csv[test_csv.target==0]

In [None]:
##Sample 453 benign images so that there are equal classes of benign and malignant images
#train_B = train_B.sample(n=453, random_state = 316)

In [None]:
'''
## load all images in a directory into memory
def load_images(path, image_list = None, filetype = None, size=(256,256)):
    data_list = list()
    # enumerate filenames in directory, assume all are images
    if image_list = None: 
        image_list = listdir(path)
    for filename in image_list:
        # load and resize the image
        if filetype == None:
            pixels = load_img(path + filename , target_size=size)
        else: 
            pixels = load_img(path + filename + filetype, target_size=size)
        # convert to numpy array
        pixels = img_to_array(pixels)
        # store
        data_list.append(pixels)
    return asarray(data_list)


## dataset path
path = ''

# load dataset A 
dataA1 = load_images(path, train_A['image_dir'].to_numpy())
dataA2 = load_images(path, test_A['image_dir'].to_numpy())
# load dataset B
dataB1 = load_images(path, train_B['image_dir'].to_numpy())
dataB2 = load_images(path, test_B['image_dir'].to_numpy())
'''

In [None]:
""" Load saved CNN/CycleGAN Dataset data"""
dataset = np.load('mal2ben_CNN.npz')
dataA1, dataA2, dataB1, dataB2,  = dataset['arr_0'], dataset['arr_1'], dataset['arr_2'], dataset['arr_3']
print('Loaded: ', dataA1.shape, dataA2.shape, dataB1.shape, dataB2.shape)

In [None]:
#Store the ground truth value as numpy array. 
testA_ground_truth = test_A['target'].to_numpy()
testB_ground_truth = test_B['target'].to_numpy()

In [None]:
#Make a copy of the CSV dataframe 
csv_testA = test_A
csv_testB = test_B

In [None]:
from tensorflow import keras

# Define the standard image size.
orig_img_size = (256, 256)
# Size of the random crops to be used during training.
input_img_size = (256, 256, 3)
# Weights initializer for the layers.
kernel_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)
# Gamma initializer for instance normalization.
gamma_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

buffer_size = 256
batch_size = 1
IMG_WIDTH = 256
IMG_HEIGHT = 256


def normalize_img(img):
    img = tf.cast(img, dtype=tf.float32)
    # Map values in the range [-1, 1]
    return (img / 127.5) - 1.0

def preprocess_train_image(img):
    # Random flip
    img = tf.image.random_flip_left_right(img)
    # Resize to the original size first
    img = tf.image.resize(img, [*orig_img_size])
    # Random crop to 256X256
    img = tf.image.random_crop(img, size=[*input_img_size])
    # Normalize the pixel values in the range [-1, 1]
    img = normalize_img(img)
    return img


def preprocess_test_image(img):
    # Only resizing and normalization for the test images.
    img = tf.image.resize(img, [input_img_size[0], input_img_size[1]])
    img = normalize_img(img)
#     img = tf.image.rot90(img, k=3)
    return img

def random_jitter(image):
  # resizing to 286 x 286 x 3
    image = tf.image.resize(image, [286, 286],
    method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

    # randomly cropping to 256 x 256 x 3
    image = random_crop(image)

    # random mirroring
    image = tf.image.random_flip_left_right(image)

    return image

def random_crop(image):
    cropped_image = tf.image.random_crop(
    image, size=[IMG_HEIGHT, IMG_WIDTH, 3])

    return cropped_image



In [None]:
def dataset_creation(dataset, preprocess_image, shuffle = True):
    tensor_dataset = tf.data.Dataset.from_tensor_slices(dataset)
    if shuffle == True:
        mapped_dataset = (
            tensor_dataset.map(preprocess_test_image, num_parallel_calls=autotune)
                .cache()
                .shuffle(buffer_size)
                .batch(batch_size)
            )
    else: 
        mapped_dataset = (
            tensor_dataset.map(preprocess_test_image, num_parallel_calls=autotune)
                .cache()
                #.shuffle(buffer_size)
                .batch(batch_size)
            )
    return mapped_dataset

In [None]:
"""
## Create `Dataset` objects
"""

trainA_data = dataset_creation(dataA1, preprocess_train_image)
trainB_data = dataset_creation(dataB1, preprocess_train_image)

trainA_data2 = dataset_creation(dataA1, preprocess_test_image, False)
trainB_data2 = dataset_creation(dataB1, preprocess_test_image, False)

testA_data = dataset_creation(dataA2, preprocess_test_image, False)
testB_data = dataset_creation(dataB2, preprocess_test_image, False)

## Additional Datasets
#large_benign_test_data = dataset_creation(benign_test, preprocess_test_image)
#mal_2016_test_data = dataset_creation(malignant_2016_images, preprocess_test_image)
#mal_2017_test_data = dataset_creation(malignant_2017_images, preprocess_test_image)

#hist_ben_test_data = dataset_creation(hist_ben_test, preprocess_test_image)
#single_ben_test_data = dataset_creation(single_ben_test, preprocess_test_image)



In [None]:
## Create an empty array and then calculate the distance between every cycled image and its original image. 
from skimage.metrics import structural_similarity as ssim
def distance_array(cycled, original, Ltype):
    dist_array = np.zeros(cycled.shape[0])
    if Ltype == 'L2': #Calculate the L2 distance 
        for i, val in enumerate(cycled):
            dist = np.linalg.norm((cycled[i]/.255).ravel() - (original[i]/.255).ravel(), ord =2)
            dist_array[i] = dist
    elif Ltype == 'L1': #Calculate the L1 distance 
        for i, val in enumerate(cycled):
            dist = np.linalg.norm((cycled[i]/.255).ravel() - (original[i]/.255).ravel(), ord =1)
            dist_array[i] = dist
    elif Ltype == 'SSIM':#Calculate SSIM index 
        for i, val in enumerate(cycled):
            dist = ssim(original[i]/255., cycled[i]/255., multichannel = True)
            dist_array[i] = dist
    return dist_array

In [None]:
"""
## Building blocks used in the CycleGAN generators and discriminators
"""


class ReflectionPadding2D(layers.Layer):
    """Implements Reflection Padding as a layer.
    Args:
        padding(tuple): Amount of padding for the
        spatial dimensions.
    Returns:
        A padded tensor with the same type as the input tensor.
    """

    def __init__(self, padding=(1, 1), **kwargs):
        self.padding = tuple(padding)
        super(ReflectionPadding2D, self).__init__(**kwargs)

    def call(self, input_tensor, mask=None):
        padding_width, padding_height = self.padding
        padding_tensor = [
            [0, 0],
            [padding_height, padding_height],
            [padding_width, padding_width],
            [0, 0],
        ]
        return tf.pad(input_tensor, padding_tensor, mode="REFLECT")


def residual_block(
    x,
    activation,
    kernel_initializer=kernel_init,
    kernel_size=(3, 3),
    strides=(1, 1),
    padding="valid",
    gamma_initializer=gamma_init,
    use_bias=False,
):
    dim = x.shape[-1]
    input_tensor = x

    x = ReflectionPadding2D()(input_tensor)
    x = layers.Conv2D(
        dim,
        kernel_size,
        strides=strides,
        kernel_initializer=kernel_initializer,
        padding=padding,
        use_bias=use_bias,
    )(x)
    x = InstanceNormalization(axis=-1)(x)
    x = activation(x)
    #Dropout
    x = layers.Dropout(.5, seed = 316)(x)

    x = ReflectionPadding2D()(x)
    x = layers.Conv2D(
        dim,
        kernel_size,
        strides=strides,
        kernel_initializer=kernel_initializer,
        padding=padding,
        use_bias=use_bias,
    )(x)
    x = InstanceNormalization(axis=-1)(x)    
    x = layers.add([input_tensor, x])
    return x


def downsample(
    x,
    filters,
    activation,
    kernel_initializer=kernel_init,
    kernel_size=(3, 3),
    strides=(2, 2),
    padding="same",
    gamma_initializer=gamma_init,
    use_bias=False,
):
    x = layers.Conv2D(
        filters,
        kernel_size,
        strides=strides,
        kernel_initializer=kernel_initializer,
        padding=padding,
        use_bias=use_bias,
    )(x)
    x =  InstanceNormalization(axis=-1)(x)
    if activation:
        x = activation(x)
    return x


def upsample(
    x,
    filters,
    activation,
    kernel_size=(3, 3),
    strides=(2, 2),
    padding="same",
    kernel_initializer=kernel_init,
    gamma_initializer=gamma_init,
    use_bias=False,
):
    x = layers.Conv2DTranspose(
        filters,
        kernel_size,
        strides=strides,
        padding=padding,
        kernel_initializer=kernel_initializer,
        use_bias=use_bias,
    )(x)
    x =  InstanceNormalization(axis=-1)(x)
    if activation:
        x = activation(x)
    return x


"""
## Build the generators
The generator consists of downsampling blocks: nine residual blocks
and upsampling blocks. The structure of the generator is the following:
```
c7s1-64 ==> Conv block with `relu` activation, filter size of 7
d128 ====|
         |-> 2 downsampling blocks
d256 ====|
R256 ====|
R256     |
R256     |
R256     |
R256     |-> 9 residual blocks
R256     |
R256     |
R256     |
R256 ====|
u128 ====|
         |-> 2 upsampling blocks
u64  ====|
c7s1-3 => Last conv block with `tanh` activation, filter size of 7.
```
"""


def get_resnet_generator(
    filters=64,
    num_downsampling_blocks=2,
    num_residual_blocks=9,
    num_upsample_blocks=2,
    gamma_initializer=gamma_init,
    name=None,
):
    img_input = layers.Input(shape=input_img_size, name=name + "_img_input")
    x = ReflectionPadding2D(padding=(3, 3))(img_input)
    x = layers.Conv2D(filters, (7, 7), kernel_initializer=kernel_init, use_bias=False)(
        x
    )
    
    x =  InstanceNormalization(axis=-1)(x)
    x = layers.Activation("relu")(x)

    # Downsampling
    for _ in range(num_downsampling_blocks):
        filters *= 2
        x = downsample(x, filters=filters, activation=layers.Activation("relu"))

    # Residual blocks
    for _ in range(num_residual_blocks):
        x = residual_block(x, activation=layers.Activation("relu"))

    # Upsampling
    for _ in range(num_upsample_blocks):
        filters //= 2
        x = upsample(x, filters, activation=layers.Activation("relu"))

    # Final block
    x = ReflectionPadding2D(padding=(3, 3))(x)
    x = layers.Conv2D(3, (7, 7), padding="valid")(x)
    x = layers.Activation("tanh")(x)

    model = keras.models.Model(img_input, x, name=name)
    return model


"""
## Build the discriminators
The discriminators implement the following architecture:
`C64->C128->C256->C512`
"""


def get_discriminator(
    filters=64, kernel_initializer=kernel_init, num_downsampling=3, name=None
):
    img_input = layers.Input(shape=input_img_size, name=name + "_img_input")
    x = layers.Conv2D(
        filters,
        (4, 4),
        strides=(2, 2),
        padding="same",
        kernel_initializer=kernel_initializer,
    )(img_input)
    x = layers.LeakyReLU(0.2)(x)

    num_filters = filters
    for num_downsample_block in range(3):
        num_filters *= 2
        if num_downsample_block < 2:
            x = downsample(
                x,
                filters=num_filters,
                activation=layers.LeakyReLU(0.2),
                kernel_size=(4, 4),
                strides=(2, 2),
            )
        else:
            x = downsample(
                x,
                filters=num_filters,
                activation=layers.LeakyReLU(0.2),
                kernel_size=(4, 4),
                strides=(1, 1),
            )

    x = layers.Conv2D(
        1, (4, 4), strides=(1, 1), padding="same", kernel_initializer=kernel_initializer
    )(x)

    model = keras.models.Model(inputs=img_input, outputs=x, name=name)
    return model

In [None]:
"""
## Save the CycleGAN model's generators and discriminators
"""
def save_model(cycleGan, path):
    cycleGan.disc_Y.save(path + '/disc_Y')
    cycleGan.disc_X.save(path + '/disc_X')
    cycleGan.gen_F.save(path + '/gen_F')
    cycleGan.gen_G.save(path + '/gen_G')

In [None]:
"""
## Create the CycleGAN model
"""

class CycleGan(keras.Model):
    def __init__(
        self,
        generator_G,
        generator_F,
        discriminator_X,
        discriminator_Y,
        lambda_cycle=10.0,
        lambda_identity= .5,
    ):
        super(CycleGan, self).__init__()
        self.gen_G = generator_G
        self.gen_F = generator_F
        self.disc_X = discriminator_X
        self.disc_Y = discriminator_Y
        self.lambda_cycle = lambda_cycle
        self.lambda_identity = lambda_identity
    def call (self, inputs):
        return 

    def compile(
        self,
        gen_G_optimizer,
        gen_F_optimizer,
        disc_X_optimizer,
        disc_Y_optimizer,
        gen_loss_fn,
        disc_loss_fn,
    ):
        super(CycleGan, self).compile()
        self.gen_G_optimizer = gen_G_optimizer
        self.gen_F_optimizer = gen_F_optimizer
        self.disc_X_optimizer = disc_X_optimizer
        self.disc_Y_optimizer = disc_Y_optimizer
        self.generator_loss_fn = gen_loss_fn
        self.discriminator_loss_fn = disc_loss_fn
        self.cycle_loss_fn = keras.losses.MeanAbsoluteError()
        self.identity_loss_fn = keras.losses.MeanAbsoluteError()
        
    def train_step(self, batch_data):
        # x is dataA and y is dataB
        real_x, real_y = batch_data

        with tf.GradientTape(persistent=True) as tape:
            # B to fake A
            fake_y = self.gen_G(real_x, training=True)
            # A to fake B -> y2x
            fake_x = self.gen_F(real_y, training=True)

            # Cycle (A to fake B to fake A): x -> y -> x
            cycled_x = self.gen_F(fake_y, training=True)
            # Cycle (B to fake A to fake B) y -> x -> y
            cycled_y = self.gen_G(fake_x, training=True)

            # Identity mapping
            same_x = self.gen_F(real_x, training=True)
            same_y = self.gen_G(real_y, training=True)

            # Discriminator output
            disc_real_x = self.disc_X(real_x, training=True)
            disc_fake_x = self.disc_X(fake_x, training=True)

            disc_real_y = self.disc_Y(real_y, training=True)
            disc_fake_y = self.disc_Y(fake_y, training=True)

            # Generator adverserial loss
            gen_G_loss = self.generator_loss_fn(disc_fake_y)
            gen_F_loss = self.generator_loss_fn(disc_fake_x)

            # Generator cycle loss
            cycle_loss_G = self.cycle_loss_fn(real_y, cycled_y) * (self.lambda_cycle/2)
            cycle_loss_F = self.cycle_loss_fn(real_x, cycled_x) * (self.lambda_cycle/2)

            # Generator identity loss
            id_loss_G = (
                self.identity_loss_fn(real_y, same_y)
                * self.lambda_cycle
                * self.lambda_identity
            )
            id_loss_F = (
                self.identity_loss_fn(real_x, same_x)
                * self.lambda_cycle
                * self.lambda_identity
            )

            # Total generator loss
            total_loss_G = gen_G_loss + id_loss_G #+ cycle_loss_G
            total_loss_F = gen_F_loss + id_loss_F #+ cycle_loss_F 

            # Discriminator loss
            disc_X_loss = self.discriminator_loss_fn(disc_real_x, disc_fake_x)
            disc_Y_loss = self.discriminator_loss_fn(disc_real_y, disc_fake_y)

        # Get the gradients for the generators
        grads_G = tape.gradient(total_loss_G, self.gen_G.trainable_variables)
        grads_F = tape.gradient(total_loss_F, self.gen_F.trainable_variables)

        # Get the gradients for the discriminators
        disc_X_grads = tape.gradient(disc_X_loss, self.disc_X.trainable_variables)
        disc_Y_grads = tape.gradient(disc_Y_loss, self.disc_Y.trainable_variables)

        # Update the weights of the generators
        self.gen_G_optimizer.apply_gradients(
            zip(grads_G, self.gen_G.trainable_variables)
        )
        self.gen_F_optimizer.apply_gradients(
            zip(grads_F, self.gen_F.trainable_variables)
        )

        # Update the weights of the discriminators
        self.disc_X_optimizer.apply_gradients(
            zip(disc_X_grads, self.disc_X.trainable_variables)
        )
        self.disc_Y_optimizer.apply_gradients(
            zip(disc_Y_grads, self.disc_Y.trainable_variables)
        )

        return {
            "G_loss": total_loss_G,
            "F_loss": total_loss_F,
            "D_X_loss": disc_X_loss,
            "D_Y_loss": disc_Y_loss,
        }


"""
## Callback to save the model periodically
"""


class GANMonitor(keras.callbacks.Callback):
    """A callback to generate and save images after each epoch"""

    def __init__(self, num_img=4):
        self.num_img = num_img
        
    def on_epoch_end(self, epoch, logs= None):
        if (epoch % 10 == 0):
            save_model(self.model, 'cyclegan_model/CycleGAN')

In [None]:
# Loss functions for the generators and discriminators 

# Loss function for evaluating adversarial loss
adv_loss_fn = keras.losses.MeanSquaredError()

# Define the loss function for the generators
def generator_loss_fn(fake):
    fake_loss = adv_loss_fn(tf.ones_like(fake), fake)
    return fake_loss


# Define the loss function for the discriminators
def discriminator_loss_fn(real, fake):
    real_loss = adv_loss_fn(tf.ones_like(real), real)
    fake_loss = adv_loss_fn(tf.zeros_like(fake), fake)
    return (real_loss + fake_loss) * 0.5


In [None]:
# Callbacks
plotter = GANMonitor()

In [None]:
# To load or create a new CycleGAN model. Leave path blank if creating a new model. 
def load_model(path = None):
    
    if path == None:
        # Create the generators
        gen_G = get_resnet_generator(name="generator_G")
        gen_F = get_resnet_generator(name="generator_F")

        # Create the discriminators
        disc_X = get_discriminator(name="discriminator_X")
        disc_Y = get_discriminator(name="discriminator_Y")
    else: 
        # Load the generators
        genF = keras.models.load_model(path + '/gen_F')
        genG = keras.models.load_model(path + '/gen_G')
        
        #Load the discriminators
        discY = keras.models.load_model(path + '/disc_Y')
        discX = keras.models.load_model(path + '/disc_X')
        
    
    cycleGan_model = CycleGan(
        generator_G=genG, generator_F=genF, discriminator_X=discX, discriminator_Y=discY
    )

    # Compile the model
    cycleGan_model.compile(
        gen_G_optimizer=keras.optimizers.Adam(learning_rate=2e-4, beta_1=0.5),
        gen_F_optimizer=keras.optimizers.Adam(learning_rate=2e-4, beta_1=0.5),
        disc_X_optimizer=keras.optimizers.Adam(learning_rate=2e-4, beta_1=0.5),
        disc_Y_optimizer=keras.optimizers.Adam(learning_rate=2e-4, beta_1=0.5),
        gen_loss_fn=generator_loss_fn,
        disc_loss_fn=discriminator_loss_fn,
    )
    return cycleGan_model

In [None]:
#cycle_gan_model = load_model('cyclegan_model/CycleGAN')

In [None]:
'''
history = cycle_gan_model.fit(
    tf.data.Dataset.zip((trainB_data.repeat(1000), trainA_data.repeat(1000))),
    epochs=401,
    steps_per_epoch = 300,
    callbacks=[plotter]#, model_checkpoint_callback]
)
'''

In [None]:
#history.history['G_loss']
plt.plot(history.history['G_loss'])
plt.plot(history.history['F_loss'])
plt.plot(history.history['D_X_loss'])
plt.plot(history.history['D_Y_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['G_loss', 'F_loss', 'D_X_loss', 'D_Y_loss'], loc='upper left')
plt.show()

In [None]:
#save_model(cycle_gan_model, 'cyclegan_model/CycleGAN')

In [None]:
''' 
## Generate images using one of the two generators
generator: generator to be used to produce images. 
test_set: a dataset object to be used by the generators to produce fake images
image_list: an empty array of the images shape to store the predictions. 

'''
def gen_function(generator, test_set, image_list):
    if (generator == 'gen_G'): #To Fake A
            for i, img in enumerate(test_set):
                prediction = cycle_gan_model.gen_G(img, training=False)[0].numpy()
                prediction = (prediction * 127.5 + 127.5).astype(np.uint8)
                image_list[i, :, :, :] = prediction
                
            
    elif (generator == 'gen_F'): #To Fake B
            for i, img in enumerate(test_set):
                prediction = cycle_gan_model.gen_F(img, training=False)[0].numpy()
                prediction = (prediction * 127.5 + 127.5).astype(np.uint8)
                image_list[i, :, :, :] = prediction
                
    return image_list

In [None]:
''' 
## Generate images and compute the L1 distance
dataset: a dataset object to be used by the generators to produce fake images
images: numpy array of images to be used for the L1 distances 
'''
def generate_image(dataset, images):
    A_generated = gen_function('gen_G', dataset, np.zeros(images.shape))
    B_generated = gen_function('gen_F', dataset, np.zeros(images.shape))
    
    A_distance = distance_array(A_generated, images, 'L1')
    B_distance = distance_array(B_generated, images, 'L1')
    
    return A_generated, B_generated, A_distance, B_distance

In [None]:
# Generate fake images and compute the L1 distances for the test data
BA_test, BB_test, BA_test_dist, BB_test_dist = generate_image(testB_data, dataB2)
AA_test, AB_test, AA_test_dist, AB_test_dist = generate_image(testA_data, dataA2)

In [None]:
# Generate fake images and compute the L1 distances for the train data
BA_train, BB_train, BA_train_dist, BB_train_dist = generate_image(trainB_data2, dataB1)
AA_train, AB_train, AA_train_dist, AB_train_dist = generate_image(trainA_data2, dataA1)

In [None]:
# Plot the L1 distances using the distance to 
plt.scatter(BB_test_dist, BA_test_dist, c= "blue", label = "Test B Images", alpha = .100)
plt.scatter(AB_test_dist, AA_test_dist, c= "red", label = "Test A Images", alpha = .10)


plt.xlabel('L1 Distance To Negative Class', fontname='Times', fontsize = 10)
plt.ylabel('L1 Distance To Positive Class', fontname='Times', fontsize = 10)
plt.title("L1 Distance to Class", fontname = 'Times', fontsize = 10)


plt.xlim(0, .8*100000000) 
plt.ylim(0, .8*100000000)
bbox_props = dict(boxstyle="round", fc="w", ec="0.5", alpha=0.8)
plt.text(1e7, 1e7, "3", ha="center", va="center", size=10,
        bbox=bbox_props)

bbox_props = dict(boxstyle="round", fc="w", ec="0.5", alpha=0.8)
plt.text(4e7, 1e7, "2", ha="center", va="center", size=10,
        bbox=bbox_props)

bbox_props = dict(boxstyle="round", fc="w", ec="0.5", alpha=0.9)
plt.text(1e7, 4e7, "1", ha="center", va="center", size=10,
        bbox=bbox_props)
plt.legend()
#plt.savefig('test_l1.pdf', bbox_inches = 'tight')



In [None]:
''' 
B_distance: L1 distance in the B direction
A_distance: L1 distance in A direction, computed from difference of the produced images from the original images. 
'''
def translation_ratio(B_distance, A_distance):
    TR = np.zeros(B_distance.shape[0])
    for i in range(0, TR.shape[0]):
        TR[i] = B_distance[i]/(B_distance[i]+A_distance[i])
    return TR

''' 
i: image index
original: the original dataset
cycled: the cycled dataset in either class direction (A or B)
'''
def metrics_scores(i, original, cycled):
    l2dist = np.linalg.norm((cycled[i]/.255).ravel() - (original[i]/.255).ravel(), ord =2)
    l1dist = np.linalg.norm((cycled[i]/.255).ravel() - (original[i]/.255).ravel(), ord =1)
    ssim_val = ssim(original[i]/255., cycled[i]/255., multichannel = True)
    return ssim_val, l1dist, l2dist

In [None]:
''' 
## Compute the translation ratio for the images. 
The negative class (B), should be inputed first and the positive class distance (A) should be used second. 
'''
TR_A = translation_ratio(AB_test_dist, AA_test_dist)
TR_B = translation_ratio(BB_test_dist, BA_test_dist)

In [None]:
# Plot the translation ratio
plt.figure(figsize=(3,3))
plt.hist(TR_B, bins = 30, alpha = .7, label = "Test Benign Images", color = 'blue')
plt.hist(TR_A, bins = 30, alpha = .7, label = "Test Malignant Images", color = 'red')

plt.ylabel("Count", fontname='Times', fontsize = 10)
plt.xlabel("Translation Ratio", fontname='Times', fontsize = 10)
plt.title("L1 Translation Ratios", fontname='Times', fontsize = 10)
plt.legend()
plt.show
#plt.savefig('test_tr.pdf', bbox_inches = 'tight')

In [None]:
''' 
## Show a panel of images
d1: original dataset (A or B)
d2: distance to the negative class, B
d3: distance to the positive class, A
TR: translation ratio array for the original dataset (A or B)
csv: csv of image names to be used to print the image's name if needed
title: class of the input image
prediction: an array of a CNN's predictions
'''
def show_panel(d1, d2, d3, TR, index, csv, title,  prediction = None):
    
    B_ssim, B_l1, B_l2 = metrics_scores(index, d1, d2)
    A_ssim, A_l1, A_l2 = metrics_scores(index, d1, d3)

   
    fig, (ax1, ax2, ax3) = plt.subplots(figsize=(15, 10), ncols=3, nrows=1)
    pos1 = ax1.imshow(d1[index,:,:,:]/255.)
    ax1.set_title('Input\n'+ title + ' Image', fontsize = 14)
    ax1.text(0, 310, 'Translation Ratio L1: ' +  "{:.4f}".format(TR[index]), fontsize=14)
    if prediction != None:
        ax1.text(0, 330, "CNN Prediction: " + str(prediction[index]), fontsize=14)
    
    pos2 = ax2.imshow(d2[index,:,:,:]/255.)
    #ax2.text(0, -70, "Image: " +str(csv[index]), fontsize=14)
    ax2.set_title('Produced\nBenign Image ', fontsize = 14)
    ax2.text(0, 300, 'L1 Distance: ' + "{:,.0f}".format(B_l1), fontsize=14)
    ax2.text(0, 320, 'L2 Distance: ' + "{:,.0f}".format(B_l2), fontsize=14)
    ax2.text(0, 340, 'SSIM: ' + "{:.4f}".format(B_ssim), fontsize=14)


    
    pos3 = ax3.imshow(d3[index,:,:,:]/255.)
    ax3.set_title('Produced\n Malignant Image', fontsize = 14)
    ax3.text(0, 300, 'L1 Distance: ' + "{:,.0f}".format(A_l1), fontsize=14)
    ax3.text(0, 320, 'L2 Distance: ' + "{:,.0f}".format(A_l2), fontsize=14)
    ax3.text(0, 340, 'SSIM: ' + "{:.4f}".format(A_ssim), fontsize=14)
 

In [None]:
# Reset the image index and then show the images
copyA = test_A.reset_index()
copyB = test_B.reset_index()
show_panel(dataB2,BB_test, BA_test, TR_B, i, copyB['image_dir'], "Benign")
show_panel(dataA2, AB_test, AA_test, TR_A, i, copyA['image_dir'], "Malignant")

In [None]:
# Plot a hexbin plot of the data. Use for larger image samples. 

x1 = BB_train_dist
y1 = BA_train_dist

x2 = AB_train_dist
y2 = AA_train_dist

# Define hexbin grid extent
xmin = min(*x1, *x2)
xmax = max(*x1, *x2)
ymin = min(*y1, *y2)
ymax = max(*y1, *y2)
ext = (xmin, xmax, ymin, ymax)

# Draw figure with colorbars
plt.figure(figsize=(10, 6))
hist1 = plt.hexbin(x1, y1, gridsize=30, cmap='Blues', mincnt=1, alpha=0.3, extent=ext)
hist2 = plt.hexbin(x2, y2, gridsize=30, cmap='Reds', mincnt=1, alpha=.5, extent=ext)

clb2 = plt.colorbar(hist2, orientation='vertical')
clb1 = plt.colorbar(hist1, orientation='vertical')

# Set titles 
clb1.ax.set_title('Benign Test Images',fontsize=10)
clb2.ax.set_title('Malignant Test Images',fontsize=10)
plt.title("L1 Distances")
plt.xlabel("L1 Distance to Benign")
plt.ylabel("L1 Distance to Malignant")

plt.show()

In [None]:
# Normalize the distances to each class for the SVM 
def normalized_data(BB, BA, AB, AA):
    stacked_B = np.stack((BB, BA), axis = 1)
    stacked_A = np.stack((AB, AA), axis = 1)
    
    target_B = np.zeros((stacked_B.shape[0]))
    target_A = np.ones((stacked_A.shape[0]))
    
    stacked_distances = np.vstack((stacked_B, stacked_A))
    stacked_targets = np.concatenate((target_B, target_A))
    
    distance_mean = np.mean(stacked_distances)
    distance_sd = np.std(stacked_distances)
    mean_zero = (stacked_distances-distance_mean)/distance_sd
    
    X = mean_zero[:, :2]
    y = stacked_targets
    return  X, y

X, y = normalized_data(BB_test_dist, BA_test_dist, AB_test_dist, AA_test_dist)
X_train, y_train = normalized_data(BB_train_dist, BA_train_dist, AB_train_dist, AA_train_dist)

In [None]:
#Train an SVM with a weight, then use the trained model to predict on the test set
def weighted_values(weight,X, y, predict_X):
    svm_model= svm.SVC(kernel="linear", class_weight= {1:weight}, probability = True, random_state = 316)
    svm_model.fit(X, y)
    return svm_model.predict_proba(predict_X)


predicted_values_train = weighted_values(3, X_train, y_train, X_train)[:,1]
predicted_values = weighted_values(3, X_train, y_train, X)[:,1]

In [None]:
# Plot the ROC curve and computer the true and false positive rate
def plot_roc(name, labels, predictions, **kwargs):
    fp, tp, thresholds = sklearn.metrics.roc_curve(labels, predictions)
    plt.plot(100*fp, 100*tp, label=name, linewidth=2, **kwargs)
    plt.xlabel('False positives [%]', fontsize = 10)
    plt.ylabel('True positives [%]', fontsize = 10)
    plt.grid(True)
    ax = plt.gca()
    ax.set_aspect('equal')
    return fp, tp, thresholds

def best_threshold(FPR, TPR, THRESHOLDS):
    J = TPR - FPR
    ix = argmax(J)
    best_thresh = THRESHOLDS[ix]
    print('Best SVM Threshold=%f' % (best_thresh))
    
plt.figure(figsize=(3,3))
colors = plt.rcParams['axes.prop_cycle'].by_key()['color']
FPR_train, TPR_train, THRESHOLDS = plot_roc("SVM Model Train", y_train, predicted_values_train, color=colors[2])
FPR_test, TPR_test, THRESHOLDS = plot_roc("SVM Model Test", y, predicted_values, color=colors[1])


# Add the AUC value to the graph. 
train_ROC = metrics.auc(FPR_train, TPR_train)
test_ROC = metrics.auc(FPR_test, TPR_test)
plt.text(35, 45, 'Train AUC: '+ "{:.4f}".format(train_ROC) + '\nTest AUC: ' + "{:.4f}".format(test_ROC), size = 10, bbox={
        'facecolor': 'white', 'alpha': 0.7, 'pad': 5})

plt.legend(loc='lower right', fontsize = 10)
plt.title("ROC Curve", fontsize = 10)
#plt.savefig('test_roc.pdf', bbox_inches = 'tight')

In [None]:
## Load in a trained CNN model 
load_model_Full= tf.keras.models.load_model('models/cnn_melanoma_no_weights_7.1.h5')

In [None]:
## Compile the loaded in model with additional metrics for evaluation. 
METRICS = [
      tf.keras.metrics.TruePositives(name='tp'),
      tf.keras.metrics.FalsePositives(name='fp'),
      tf.keras.metrics.TrueNegatives(name='tn'),
      tf.keras.metrics.FalseNegatives(name='fn'), 
      tf.keras.metrics.BinaryAccuracy(name='accuracy'),
      tf.keras.metrics.Precision(name='precision'),
      tf.keras.metrics.Recall(name='recall'),
      tf.keras.metrics.AUC(name='auc'),
      tf.keras.metrics.AUC(name='prc', curve='PR'), # precision-recall curve
#       tf.keras.metrics.SpecificityAtSensitivity(.82)
]
opt = tf.keras.optimizers.Adam(learning_rate = 1e-5 )#1e-5)
load_model_Full.compile(loss='binary_crossentropy', metrics=METRICS,optimizer=opt)


In [None]:
## Functions for evaluating and predicting images using the traditional CNN
def model_evaluate(image_array, ground_truth, model):
    tensor_predictions = tf.convert_to_tensor(image_array/255.)
    tensor_truth = tf.convert_to_tensor(ground_truth)
    model.evaluate(tensor_predictions, tensor_truth, batch_size = 1)
    
def model_predict(image_array, model):
    tensor_predictions = tf.convert_to_tensor(image_array/255.)
    CNN_model_predictions = model.predict(tensor_predictions)
    return CNN_model_predictions

In [None]:
## Use the same dataset to make predictions with the CNN. 
'''Use the predictions to evaluate missed images and plot the ROC curve against the CycleGAN'''
full_test = np.vstack((dataB2, dataA2))
test_ground_truth = np.concatenate((test_B.target.to_numpy(), test_A.target.to_numpy()), axis=None)
test_ground_truth = np.concatenate((train_B.target.to_numpy(), train_A.target.to_numpy()), axis=None)
full_train = np.vstack((dataB1, dataA1))


CNN_predict = model_predict(full_test,load_CNN_full)
CNN_predict_train = model_predict(full_train,load_CNN_full)