In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras

import glob
import os

from keras import Input
from keras.applications import VGG19
from keras.callbacks import TensorBoard
from keras.layers import BatchNormalization, Activation, LeakyReLU, Add, Dense
from keras.layers import Conv2D, UpSampling2D
from keras.models import Model
from keras.optimizers import Adam


import random
from numpy import asarray
from itertools import repeat

import imageio
from imageio import imread
from PIL import Image
from skimage.transform import resize as imresize
import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings("ignore")

In [None]:
import os
import random
import shutil

# Set the path to the train directory
train_dir = "/home/jsmith/Desktop/JohnSmith_AssignmentMidtermProject/Data/train/"

# Create the val directory
val_dir = "/home/jsmith/Desktop/JohnSmith_AssignmentMidtermProject/Data/val/"
os.makedirs(val_dir, exist_ok=True)

# Fraction of each class that is moved from train to val
split_pct = 0.3

# Split every class folder (DME, DRUSEN) independently so both classes keep
# the same train/val ratio.
for subdir in ['DME', 'DRUSEN']:
    train_subdir = os.path.join(train_dir, subdir)
    val_subdir = os.path.join(val_dir, subdir)
    os.makedirs(train_subdir, exist_ok=True)
    os.makedirs(val_subdir, exist_ok=True)

    # Re-run guard: if this class already has validation files, the split was
    # done before. Splitting again would move another 30% of what is left in
    # train, shrinking the training set every time the cell is executed.
    if os.listdir(val_subdir):
        print('{} already contains files; skipping split for {}'.format(val_subdir, subdir))
        continue

    # Only regular files take part in the split; sorting first makes the
    # shuffle depend only on the random state, not on the OS listing order.
    file_names = sorted(
        name for name in os.listdir(train_subdir)
        if os.path.isfile(os.path.join(train_subdir, name))
    )

    # Shuffle the list of file names
    random.shuffle(file_names)

    # Split the file names into train and val sets
    split_idx = int(len(file_names) * (1 - split_pct))
    train_files = file_names[:split_idx]
    val_files = file_names[split_idx:]

    # train_files already live in train_dir/subdir, so nothing has to be moved
    # for them (source and destination would be the same path).

    # Move the validation share into val_dir/subdir
    for file_name in val_files:
        src_path = os.path.join(train_subdir, file_name)
        dest_path = os.path.join(val_subdir, file_name)
        shutil.move(src_path, dest_path)

In [None]:
# data path
TRAIN_PATH = r'/home/jsmith/Desktop/JohnSmith_AssignmentMidtermProject/Data/train/'
VAL_PATH = r'/home/jsmith/Desktop/JohnSmith_AssignmentMidtermProject/Data/val/'
TEST_PATH = r'/home/jsmith/Desktop/JohnSmith_AssignmentMidtermProject/Data/test/'
data_path = TRAIN_PATH

# number of training iterations (the training loop draws one batch per "epoch")
epochs = 150

# batch size equals to 8 (due to RAM limits)
batch_size = 8

# define the shape of low resolution image (LR)
low_resolution_shape = (32, 32, 3)

# define the shape of high resolution image (HR)
high_resolution_shape = (128, 128, 3)

# optimizer for discriminator, generator (learning rate 0.0002, beta_1 0.5)
common_optimizer = Adam(0.0002, 0.5)

# use seed for reproducible results
# Batches are drawn with np.random and the train/val split uses the stdlib
# `random` module, so seeding TensorFlow alone does not make runs repeatable;
# seed all three generators.
SEED = 2020
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

In [None]:
def get_train_images(data_path):
    """Collect the paths of all images in the DME and DRUSEN class folders.

    Args:
        data_path: Directory that contains one sub-folder per class
            ('DME', 'DRUSEN'). A trailing path separator is optional.

    Returns:
        list[str]: Paths of every entry found in the class folders, DME
        entries first, each class in sorted order. Empty if nothing matches.
    """
    # The two classes of images we're looking for
    CLASSES = ['DME', 'DRUSEN']

    image_list = []
    for class_type in CLASSES:
        # os.path.join keeps the pattern valid whether or not data_path ends
        # with a separator (plain string concatenation silently matched
        # nothing when the separator was missing).
        pattern = os.path.join(data_path, class_type, '*')
        # glob returns entries in arbitrary order; sort so that seeded random
        # sampling from this list is reproducible across runs and machines.
        image_list.extend(sorted(glob.glob(pattern)))

    return image_list

In [None]:
# Collect every DME/DRUSEN image path found under data_path.
image_list = get_train_images(data_path)

In [None]:
# Print Image list
#image_list

In [None]:
def find_img_dims(image_list):
    """Find the smallest and largest side length over a set of images.

    Each image contributes the smaller and the larger of its two sides
    (PIL's ``Image.size``); the overall minimum and maximum are returned.

    Args:
        image_list: Sequence of image file paths.

    Returns:
        tuple: (smallest side length, largest side length).

    Raises:
        ValueError: If ``image_list`` is empty.
    """
    if len(image_list) == 0:
        raise ValueError('image_list is empty; there are no image dimensions to inspect')

    min_size = []
    max_size = []

    for image_path in image_list:
        # Use the image as a context manager so its file handle is released
        # immediately instead of staying open for the whole scan.
        with Image.open(image_path) as im:
            dims = im.size
        min_size.append(min(dims))
        max_size.append(max(dims))

    return min(min_size), max(max_size)

In [None]:
# Report the smallest and largest single side length (width or height)
# found across all training images.

image_list = get_train_images(data_path)
min_size, max_size = find_img_dims(image_list)
print('The min and max image dims are {} and {} respectively.'
      .format(min_size, max_size))

In [None]:
# data path
# NOTE(review): this repeats the TRAIN_PATH / data_path assignment from the
# configuration cell with the same value.
TRAIN_PATH = r'/home/jsmith/Desktop/JohnSmith_AssignmentMidtermProject/Data/train/'
data_path = TRAIN_PATH

In [None]:
import shutil

src_folder = '/home/jsmith/Desktop/JohnSmith_AssignmentMidtermProject/Data/train'
dst_folder = '/home/jsmith/Desktop/JohnSmith_AssignmentMidtermProject/Data/original_train_images'

# Keep a copy of the train folder. shutil.copytree raises FileExistsError when
# the destination already exists, so only copy on the first run to keep this
# cell re-runnable.
# NOTE(review): this cell sits after the train/val split cell, so the copy
# presumably holds the post-split training set rather than the untouched
# originals — confirm that this is intended.
if not os.path.exists(dst_folder):
    shutil.copytree(src_folder, dst_folder)

In [None]:
def compute_psnr(original_image, generated_image, max_val=1.0):
    """Mean peak signal-to-noise ratio between two images or image batches.

    Args:
        original_image: Reference image(s); converted to a float32 tensor.
        generated_image: Image(s) to compare, same shape as the reference.
        max_val: Dynamic range of the pixel values (maximum minus minimum
            allowed value). Defaults to 1.0, i.e. images scaled to [0, 1].
            Pass 2.0 for images scaled to [-1, 1].

    Returns:
        Scalar float32 tensor: the PSNR averaged over everything
        ``tf.image.psnr`` returns (one value per image for a batch).
    """
    original_image = tf.convert_to_tensor(original_image, dtype=tf.float32)
    generated_image = tf.convert_to_tensor(generated_image, dtype=tf.float32)

    # max_val must match how the inputs are scaled, otherwise the PSNR is
    # shifted by a constant offset.
    psnr_values = tf.image.psnr(original_image, generated_image, max_val=max_val)

    # axis=None reduces over all dimensions, giving a single scalar
    return tf.math.reduce_mean(psnr_values, axis=None, keepdims=False, name=None)

In [None]:
def plot_psnr(psnr):
    """Draw the recorded PSNR values as a line chart.

    Args:
        psnr: Dict holding the list of recorded values under 'psnr_quality'.
    """
    _, axis = plt.subplots(figsize=(10, 8))
    axis.plot(psnr['psnr_quality'])
    axis.set_xlabel('Epochs')
    axis.set_ylabel('PSNR')
    axis.set_title('PSNR')

In [None]:
def compute_ssim(original_image, generated_image, max_val=1.0):
    """Mean structural similarity (SSIM) between two images or image batches.

    Args:
        original_image: Reference image(s); converted to a float32 tensor.
        generated_image: Image(s) to compare, same shape as the reference.
        max_val: Dynamic range of the pixel values (maximum minus minimum
            allowed value). Defaults to 1.0, i.e. images scaled to [0, 1].
            Pass 2.0 for images scaled to [-1, 1].

    Returns:
        Scalar float32 tensor: the SSIM averaged over everything
        ``tf.image.ssim`` returns (one value per image for a batch).
    """
    original_image = tf.convert_to_tensor(original_image, dtype=tf.float32)
    generated_image = tf.convert_to_tensor(generated_image, dtype=tf.float32)

    # Gaussian window of size 11 with sigma 1.5 and k1/k2 = 0.01/0.03;
    # max_val must match how the inputs are scaled.
    ssim_values = tf.image.ssim(original_image, generated_image, max_val=max_val,
                                filter_size=11, filter_sigma=1.5, k1=0.01, k2=0.03)

    # axis=None reduces over all dimensions, giving a single scalar
    return tf.math.reduce_mean(ssim_values, axis=None, keepdims=False, name=None)


In [None]:
def plot_ssim(ssim):
    """Draw the recorded SSIM values as a line chart.

    Args:
        ssim: Dict holding the list of recorded values under 'ssim_quality'.
    """
    _, axis = plt.subplots(figsize=(10, 8))
    axis.plot(ssim['ssim_quality'])
    axis.set_xlabel('Epochs')
    axis.set_ylabel('SSIM')
    axis.set_title('SSIM')

In [None]:
def plot_loss(losses):
    """Draw discriminator and generator loss histories on one chart.

    Args:
        losses: Dict with the discriminator history under 'd_history' and the
            generator history under 'g_history'.
    """
    _, axis = plt.subplots(figsize=(10, 8))

    # one line per network, discriminator first
    for key, label in (('d_history', "Discriminator loss"),
                       ('g_history', "Generator loss")):
        axis.plot(losses[key], label=label)

    axis.set_xlabel('Epochs')
    axis.set_ylabel('Loss')
    axis.set_title("Loss")
    axis.legend()

In [None]:
def sample_images(image_list, batch_size, high_resolution_shape, low_resolution_shape):
    """Draw a random batch of images and return HR and LR versions of it.

    Paths are drawn with ``np.random.choice`` (so the same image can appear
    more than once in a batch). Every image is read as RGB, cast to float32
    and resized once to each target shape; with probability 0.5 both versions
    are mirrored left-to-right together.

    Args:
        image_list: All candidate image paths.
        batch_size: Number of images to draw.
        high_resolution_shape: Target shape of the HR images.
        low_resolution_shape: Target shape of the LR images.

    Returns:
        tuple: (hr_images, lr_images) as numpy arrays, HR first.
    """
    # randomly sample a batch of image paths
    chosen_paths = np.random.choice(image_list, size=batch_size)

    hr_images, lr_images = [], []
    for image_path in chosen_paths:
        pixels = imread(image_path, pilmode='RGB').astype(np.float32)

        # one resize per target resolution, both from the same source pixels
        hr_version = imresize(pixels, high_resolution_shape)
        lr_version = imresize(pixels, low_resolution_shape)

        # random horizontal flip, applied to both versions so they stay aligned
        mirror = np.random.random() < 0.5
        if mirror:
            hr_version, lr_version = np.fliplr(hr_version), np.fliplr(lr_version)

        hr_images.append(hr_version)
        lr_images.append(lr_version)

    # convert lists into numpy ndarrays
    return np.array(hr_images), np.array(lr_images)

In [None]:
def save_images(original_image, lr_image, sr_image, path):
    """Save the HR (original), LR and generated SR image side by side.

    The images are expected in [-1, 1]; they are mapped back to [0, 1] with
    (X + 1) / 2 before being drawn.

    Args:
        original_image: High-resolution reference image.
        lr_image: Low-resolution input image.
        sr_image: Super-resolved image produced by the generator.
        path: Output file path; missing parent directories are created.
    """
    # make sure the target folder exists so savefig does not fail
    out_dir = os.path.dirname(path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    fig, ax = plt.subplots(1, 3, figsize=(10, 6))

    images = [original_image, lr_image, sr_image]
    titles = ['HR', 'LR', 'SR - generated']

    try:
        for axis, img, title in zip(ax, images, titles):
            # (X + 1)/2 to scale back from [-1,1] to [0,1]
            axis.imshow((img + 1) / 2.0, cmap='gray')
            axis.axis("off")
            axis.set_title('{}'.format(title))

        fig.savefig(path)
    finally:
        # This function is called for many images per training run; without
        # closing, every figure stays alive in pyplot and memory grows.
        plt.close(fig)

In [None]:
#def residual_block(x):

 #   filters = [64, 64]
 #   kernel_size = 3
 #   strides = 1
 #   padding = "same"
 #   momentum = 0.8
 #   activation = "relu"

 #   res = Conv2D(filters=filters[0], kernel_size=kernel_size, strides=strides, padding=padding)(x)
 #   res = Activation(activation=activation)(res)
 #   res = BatchNormalization(momentum=momentum)(res)

 #   res = Conv2D(filters=filters[1], kernel_size=kernel_size, strides=strides, padding=padding)(res)
 #   res = BatchNormalization(momentum=momentum)(res)

    # Adjust shape of input x to match shape of res for adding
 #   if x.shape[-1] != res.shape[-1]:
 #       x = Conv2D(filters=filters[-1], kernel_size=1, strides=strides, padding=padding)(x)
 #       x = BatchNormalization(momentum=momentum)(x)

 #   res = Add()([res, x])
 #   res = Activation(activation=activation)(res)
    
 #   return res

The commented-out code above is the residual block I originally ran, but it produced shape-mismatch errors that I could not resolve. I got the program running with the block below, although that may have been because other issues were resolved in the meantime, and I did not want to re-run training to check. The block above may be the more correct implementation of a residual block, which is why it is left in, commented out.

In [None]:
def residual_block(x):
    """Apply one residual block: Conv-ReLU-BN, Conv-BN, then add the input.

    Both convolutions use 64 filters, so ``x`` has to carry 64 channels
    already for the final addition to be valid.

    Args:
        x: Input feature-map tensor.

    Returns:
        Tensor: the block output, ``branch(x) + x``.
    """
    n_filters = (64, 64)
    conv_settings = dict(kernel_size=3, strides=1, padding="same")
    bn_momentum = 0.8

    # first half: convolution, activation, batch norm
    branch = Conv2D(filters=n_filters[0], **conv_settings)(x)
    branch = Activation(activation="relu")(branch)
    branch = BatchNormalization(momentum=bn_momentum)(branch)

    # second half: convolution and batch norm, no activation
    branch = Conv2D(filters=n_filters[1], **conv_settings)(branch)
    branch = BatchNormalization(momentum=bn_momentum)(branch)

    # skip connection
    return Add()([branch, x])

In [None]:
def build_generator():
    """Build the generator that maps a 32x32x3 image to a 128x128x3 image.

    Layout: 9x9 conv (ReLU) -> 16 residual blocks -> 3x3 conv + batch norm ->
    skip connection back to the first conv -> two (2x upsample, 3x3 conv,
    ReLU) stages -> 9x9 conv with tanh, so outputs lie in [-1, 1].

    Returns:
        keras Model named 'generator'.
    """
    residual_blocks = 16
    momentum = 0.8

    # LR input: 4x downsample of the HR size
    input_layer = Input(shape=(32, 32, 3))

    # pre-residual convolution; its output is reused by the long skip connection
    gen1 = Conv2D(filters=64, kernel_size=9, strides=1, padding='same', activation='relu')(input_layer)

    # chain of residual blocks, starting from gen1
    res = gen1
    for _ in range(residual_blocks):
        res = residual_block(res)

    # post-residual convolution + batch norm
    gen2 = Conv2D(filters=64, kernel_size=3, strides=1, padding='same')(res)
    gen2 = BatchNormalization(momentum=momentum)(gen2)

    # long skip connection: pre-residual + post-residual features
    upsampled = Add()([gen2, gen1])

    # two identical 2x upsampling stages (32 -> 64 -> 128)
    for _ in range(2):
        upsampled = UpSampling2D(size=2)(upsampled)
        upsampled = Conv2D(filters=256, kernel_size=3, strides=1, padding='same')(upsampled)
        upsampled = Activation('relu')(upsampled)

    # project back to 3 channels and squash with tanh
    gen6 = Conv2D(filters=3, kernel_size=9, strides=1, padding='same')(upsampled)
    output = Activation('tanh')(gen6)

    return Model(inputs=[input_layer], outputs=[output], name='generator')

In [None]:
# Instantiate the generator network (32x32x3 input, 128x128x3 tanh output).
generator = build_generator()

In [None]:
def build_discriminator():
    """Build the discriminator for 128x128x3 images.

    Eight 3x3 convolutions with LeakyReLU (batch norm after all but the
    first), four of them with stride 2, followed by two Dense layers. There
    is no Flatten layer, so the Dense layers act on the channel axis and the
    model outputs a map of sigmoid scores per image (the training loop uses
    labels of shape (batch, 8, 8, 1)) rather than a single value.

    Returns:
        keras Model named 'discriminator'.
    """
    leakyrelu_alpha = 0.2
    momentum = 0.8

    # the input is the HR shape
    input_layer = Input(shape=(128, 128, 3))

    # first convolution: no batch normalization
    x = Conv2D(filters=64, kernel_size=3, strides=1, padding='same')(input_layer)
    x = LeakyReLU(alpha=leakyrelu_alpha)(x)

    # remaining seven convolutions as (filters, stride); every stride-2 layer
    # halves the spatial size
    conv_plan = [(64, 2), (128, 1), (128, 2), (256, 1), (256, 2), (512, 1), (512, 2)]
    for n_filters, stride in conv_plan:
        x = Conv2D(filters=n_filters, kernel_size=3, strides=stride, padding='same')(x)
        x = LeakyReLU(alpha=leakyrelu_alpha)(x)
        x = BatchNormalization(momentum=momentum)(x)

    # fully connected layer
    x = Dense(units=1024)(x)
    x = LeakyReLU(alpha=0.2)(x)

    # last fully connected layer - for classification
    output = Dense(units=1, activation='sigmoid')(x)

    return Model(inputs=[input_layer], outputs=[output], name='discriminator')

In [None]:
# Build the discriminator and compile it for stand-alone training with binary
# cross-entropy; 'accuracy' is tracked as an additional metric.
# NOTE(review): common_optimizer is the same Adam instance that is also passed
# to the feature-extractor and adversarial-model compile calls — confirm that
# sharing one optimizer object across models is intended.
discriminator = build_discriminator()
discriminator.trainable = True
discriminator.compile(loss='binary_crossentropy', optimizer=common_optimizer, metrics=['accuracy'])

In [None]:
# Load VGG19 with ImageNet weights using the default constructor arguments.
# NOTE(review): VGG19_base is not referenced by any other cell visible in this
# notebook (build_VGG19 loads its own copy) — confirm whether it is still needed.
VGG19_base = VGG19(weights="imagenet")

In [None]:
from tensorflow.keras.applications import VGG19

def build_VGG19():
    """Build a frozen VGG19 feature extractor for 128x128x3 images.

    Loads VGG19 with ImageNet weights and without its classifier head, marks
    it non-trainable, and exposes the output of layer 'block5_conv4'.

    Returns:
        keras Model mapping an image batch to 'block5_conv4' feature maps.
    """
    vgg = VGG19(weights='imagenet', include_top=False, input_shape=(128, 128, 3))
    vgg.trainable = False

    feature_layer = vgg.get_layer('block5_conv4')
    return Model(inputs=vgg.input, outputs=feature_layer.output)

In [None]:
# Frozen VGG19 feature extractor; its feature maps are the target of the 'mse'
# output of the adversarial model.
# NOTE(review): the visible cells only call fe_model.predict and embed it in
# the adversarial model, so this compile() call looks unnecessary — confirm.
fe_model = build_VGG19()
fe_model.trainable = False
fe_model.compile(loss='mse', optimizer=common_optimizer, metrics=['accuracy'])

In [None]:
def build_adversarial_model(generator, discriminator, feature_extractor):
    """Build and compile the combined model used to train the generator.

    The combined model takes [LR images, HR images] and outputs
    [discriminator scores for the generated images, feature maps of the
    generated images]. It is compiled with binary cross-entropy on the
    scores (weight 1e-3) and MSE on the features (weight 1).

    Side effect: sets ``discriminator.trainable = False`` so that only the
    generator is updated through the combined model. The discriminator is
    deliberately NOT recompiled here: recompiling it in the frozen state (and
    with a different loss) would replace its stand-alone training setup, so
    ``discriminator.train_on_batch`` would no longer train it with the
    binary cross-entropy it was compiled with.

    Args:
        generator: Model mapping LR images to HR images.
        discriminator: Already compiled discriminator model.
        feature_extractor: Model mapping HR images to feature maps.

    Returns:
        The compiled adversarial keras Model.
    """
    # input layer for high-resolution images (part of the model signature;
    # not connected to either output)
    input_high_resolution = Input(shape=high_resolution_shape)

    # input layer for low-resolution images
    input_low_resolution = Input(shape=low_resolution_shape)

    # generate high-resolution images from low-resolution images
    generated_high_resolution_images = generator(input_low_resolution)

    # extract feature maps from generated images
    features = feature_extractor(generated_high_resolution_images)

    # freeze the discriminator inside the combined model only
    discriminator.trainable = False

    # discriminator scores for the generated high-resolution images
    probs = discriminator(generated_high_resolution_images)

    # create and compile
    adversarial_model = Model([input_low_resolution, input_high_resolution], [probs, features])
    adversarial_model.compile(loss=['binary_crossentropy', 'mse'], loss_weights=[1e-3, 1], optimizer=common_optimizer)

    return adversarial_model

In [None]:
# Combine generator, discriminator and VGG19 feature extractor into the model
# that is used to train the generator.
adversarial_model = build_adversarial_model(generator, discriminator, fe_model)

In [None]:
# Initialize the containers for the training history.
# NOTE(review): the training-loop cell creates losses / psnr / ssim again
# before it starts, so this cell appears redundant — confirm.

losses = {"d_history":[], "g_history":[]}
psnr = {'psnr_quality': []}
ssim = {'ssim_quality': []}

In [None]:
from skimage.transform import resize

def upscale_images(images, size):
    """Resize every image in ``images`` to ``size`` and stack the results.

    Args:
        images: Iterable of image arrays.
        size: Target output shape handed to ``resize`` for each image.

    Returns:
        numpy.ndarray containing the resized images in input order.
    """
    return np.array([resize(img, size) for img in images])

In [None]:
import os
import cv2
import numpy as np
# Set up paths
data_dir = '/home/jsmith/Desktop/JohnSmith_AssignmentMidtermProject/Data/'
train_dir = os.path.join(data_dir, 'train')
test_dir = os.path.join(data_dir, 'test')
output_dir = '/home/jsmith/Desktop/JohnSmith_AssignmentMidtermProject/Data/Downsized_32x32x3'

# Define image size (width, height) passed to cv2.resize
img_size = (32, 32)

# Write a resized copy of every train/test image to
# output_dir/<train|test>/<class>/<file name>
for folder in [train_dir, test_dir]:

    # 'train' or 'test': last component of the source folder
    split_name = os.path.basename(os.path.normpath(folder))

    # Loop through DME and DRUSEN subfolders
    for subfolder in ['DME', 'DRUSEN']:

        subfolder_path = os.path.join(folder, subfolder)

        # Create the destination folder once per class instead of per image
        output_subfolder = os.path.join(output_dir, split_name, subfolder)
        os.makedirs(output_subfolder, exist_ok=True)

        # Loop through images in subfolder
        for img_name in os.listdir(subfolder_path):

            img_path = os.path.join(subfolder_path, img_name)
            img = cv2.imread(img_path)

            # cv2.imread returns None instead of raising when a file cannot be
            # decoded (hidden files, non-images); skip those rather than
            # letting cv2.resize crash halfway through the folder.
            if img is None:
                print('Skipping unreadable file: {}'.format(img_path))
                continue

            img_resized = cv2.resize(img, img_size)

            # Save resized image to output directory
            output_path = os.path.join(output_subfolder, img_name)
            cv2.imwrite(output_path, img_resized)

In [None]:
import os
import cv2
import numpy as np
# Set up paths
data_dir = '/home/jsmith/Desktop/JohnSmith_AssignmentMidtermProject/Data/'
train_dir = os.path.join(data_dir, 'train')
test_dir = os.path.join(data_dir, 'test')
output_dir = '/home/jsmith/Desktop/JohnSmith_AssignmentMidtermProject/Data/Downsized_128x128x3'

# Define image size (width, height) passed to cv2.resize
img_size = (128, 128)

# Write a resized copy of every train/test image to
# output_dir/<train|test>/<class>/<file name>
for folder in [train_dir, test_dir]:

    # 'train' or 'test': last component of the source folder
    split_name = os.path.basename(os.path.normpath(folder))

    # Loop through DME and DRUSEN subfolders
    for subfolder in ['DME', 'DRUSEN']:

        subfolder_path = os.path.join(folder, subfolder)

        # Create the destination folder once per class instead of per image
        output_subfolder = os.path.join(output_dir, split_name, subfolder)
        os.makedirs(output_subfolder, exist_ok=True)

        # Loop through images in subfolder
        for img_name in os.listdir(subfolder_path):

            img_path = os.path.join(subfolder_path, img_name)
            img = cv2.imread(img_path)

            # cv2.imread returns None instead of raising when a file cannot be
            # decoded (hidden files, non-images); skip those rather than
            # letting cv2.resize crash halfway through the folder.
            if img is None:
                print('Skipping unreadable file: {}'.format(img_path))
                continue

            img_resized = cv2.resize(img, img_size)

            # Save resized image to output directory
            output_path = os.path.join(output_subfolder, img_name)
            cv2.imwrite(output_path, img_resized)

In [None]:
low_resolution_shape = (32, 32, 3)
high_resolution_shape = (128, 128, 3)
batch_size = 8
epochs = 150

d_history = []
g_history = []
psnr = {'psnr_quality': []}
ssim = {'ssim_quality': []}
losses = {'d_history': [], 'g_history': []}

# Output locations; '{}' is filled with the image name / epoch number.
sample_image_path = "/home/jsmith/Desktop/JohnSmith_AssignmentMidtermProject/Data/Generated Images/{}"
generator_checkpoint_path = '/home/jsmith/Desktop/JohnSmith_AssignmentMidtermProject/Generator_Epochs/generator_model_epoch_{}.h5'
discriminator_checkpoint_path = '/home/jsmith/Desktop/JohnSmith_AssignmentMidtermProject/Discriminator_Epochs/discriminator_model_epoch_{}.h5'

# Create the output folders up front so the first save does not fail after
# training has already started.
for path_template in (sample_image_path, generator_checkpoint_path, discriminator_checkpoint_path):
    os.makedirs(os.path.dirname(path_template), exist_ok=True)


def _first_loss(result):
    """Return the loss from a train_on_batch result.

    The discriminator is compiled with an accuracy metric, so train_on_batch
    returns [loss, accuracy]; only the first entry is a loss. Averaging the
    whole list would mix accuracy into the logged loss.
    """
    return float(np.ravel(result)[0])


def _to_model_range(images):
    """Map pixel values from [0, 255] to [-1, 1], the range of the tanh output."""
    return images / 127.5 - 1.


# The file list does not change during training: read it once.
image_list = get_train_images(data_path)

# Label maps match the discriminator output of one 8x8x1 score map per image;
# they are identical for every batch, so build them once.
real_labels = np.ones((batch_size, 8, 8, 1))
fake_labels = np.zeros((batch_size, 8, 8, 1))

# Each "epoch" processes one randomly drawn batch.
for epoch in range(epochs):

    # ---- Train the discriminator network ----

    hr_images, lr_images = sample_images(image_list,
                                         batch_size=batch_size,
                                         high_resolution_shape=high_resolution_shape,
                                         low_resolution_shape=low_resolution_shape)

    # Normalize BEFORE running the generator so it sees the same [-1, 1]
    # input range here as in the adversarial training step.
    hr_images = _to_model_range(hr_images)
    lr_images = _to_model_range(lr_images)

    # The generator output is already 128x128x3 in [-1, 1] (tanh). It must not
    # be resized or passed through the 0-255 normalization again: doing so
    # collapses every fake image to a value of about -1.
    generated_high_resolution_images = generator.predict_on_batch(lr_images)

    d_loss_real = _first_loss(discriminator.train_on_batch(hr_images, real_labels))
    d_loss_fake = _first_loss(discriminator.train_on_batch(generated_high_resolution_images, fake_labels))

    # total discriminator loss: average of the loss on real and fake batches
    d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)
    losses['d_history'].append(d_loss)

    # ---- Train the generator network ----

    # sample a fresh batch of images
    hr_images, lr_images = sample_images(image_list,
                                         batch_size=batch_size,
                                         low_resolution_shape=low_resolution_shape,
                                         high_resolution_shape=high_resolution_shape)
    hr_images = _to_model_range(hr_images)
    lr_images = _to_model_range(lr_images)

    # generator output before this step's update; used for the metrics below
    generated_high_resolution_images = generator.predict_on_batch(lr_images)

    # feature maps of the true high-resolution images (content-loss target)
    image_features = fe_model.predict_on_batch(hr_images)

    # train the generator through the combined model
    g_loss = adversarial_model.train_on_batch([lr_images, hr_images],
                                              [real_labels, image_features])

    # NOTE(review): this logs half of the second entry of g_loss (presumably
    # the adversarial binary cross-entropy term), not the weighted total in
    # g_loss[0] — kept as in the original; confirm which value should be plotted.
    losses['g_history'].append(0.5 * (g_loss[1]))

    # compute_psnr / compute_ssim assume a pixel range of 1.0, so map the
    # [-1, 1] images to [0, 1] first.
    hr_unit_range = (hr_images + 1.) / 2.
    sr_unit_range = (generated_high_resolution_images + 1.) / 2.

    # calculate the psnr
    ps = compute_psnr(hr_unit_range, sr_unit_range)
    psnr['psnr_quality'].append(ps)

    # calculate the ssim
    ss = compute_ssim(hr_unit_range, sr_unit_range)
    ssim['ssim_quality'].append(ss)

    # ---- Every 10 epochs: save sample panels and model checkpoints ----

    if epoch % 10 == 0:
        image_batch_hr, image_batch_lr = sample_images(image_list, 16, high_resolution_shape, low_resolution_shape)

        # Normalize the images
        image_batch_hr = _to_model_range(image_batch_hr)
        image_batch_lr = _to_model_range(image_batch_lr)

        generated_images = generator.predict_on_batch(image_batch_lr)

        for index, img in enumerate(generated_images):
            # Name the generated image using the epoch and index values
            image_name = "generated_image_e{}_{}.png".format(epoch, index)

            # Save the HR / LR / SR panel in the "Generated Images" folder
            save_images(image_batch_hr[index], image_batch_lr[index], img,
                        path=sample_image_path.format(image_name))

        # Save the generator model as .h5 file
        generator.save(generator_checkpoint_path.format(epoch))

        # Save the discriminator model as .h5 file
        discriminator.save(discriminator_checkpoint_path.format(epoch))