## pre-processing

In [None]:
import tensorflow as tf

In [None]:
import tensorflow_datasets as tfds
from tensorflow_examples.models.pix2pix import pix2pix_4xSR as pix2pix

import os
import time
import matplotlib.pyplot as plt
from IPython.display import clear_output
import cv2
AUTOTUNE = tf.data.AUTOTUNE

In [None]:
if tf.test.gpu_device_name():
   print('Default GPU Device: {}'.format(tf.test.gpu_device_name()))
else:
   print("Please install GPU version of TF")

In [None]:
BUFFER_SIZE = 100
BATCH_SIZE = 16
IMG_WIDTH = 256
IMG_HEIGHT = 256
ratio=4

In [None]:
def load_lr(image_file):
  image = tf.io.read_file(image_file)
  image = tf.image.decode_png(image)

#   low_image_up=tf.image.resize(image,[256,256],antialias=True,method=tf.image.ResizeMethod.GAUSSIAN)

  input_image = tf.cast(image, tf.float32)  
  input_image=tf.reshape(input_image,[int(IMG_WIDTH/4),int(IMG_HEIGHT/4),3])

  
  return input_image

In [None]:
def load_hr(image_file):
  image = tf.io.read_file(image_file)
  image = tf.image.decode_png(image)

  
  real_image = tf.cast(image, tf.float32)
  real_image=tf.reshape(real_image,[IMG_WIDTH,IMG_HEIGHT,3])
  
  return real_image

In [None]:
inp=load_lr('/home/hzhuge/Downloads/SR_2D/cycgan_patch/kidney/patch_64_train/pair5&20_50_74.png')
re = load_hr('/home/hzhuge/Downloads/SR_2D/cycgan_patch/kidney/patch_256_train/pair5&20_50_85.png')

# casting to int for matplotlib to show the image
plt.figure()
plt.imshow(inp[:,:]/255.0,cmap='gray')
plt.figure()
plt.imshow(re[:,:]/255.0,cmap='gray')

In [None]:
def random_crop(image):
  cropped_image = tf.image.random_crop(
      image, size=[IMG_HEIGHT, IMG_WIDTH, 3])

  return cropped_image

In [None]:
# normalizing the images to [-1, 1]
def normalize(image):
  image = tf.cast(image, tf.float32)
  image = (image / 127.5) - 1
  return image

In [None]:
def random_jitter(image):
  # resizing to 286 x 286 x 3
  image = tf.image.resize(image, [286, 286],
                          method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

  # randomly cropping to 256 x 256 x 3
  image = random_crop(image)

  # random mirroring
  image = tf.image.random_flip_left_right(image)

  return image

In [None]:
def preprocess_image_lr(image):
#   image = random_jitter(image)
  image=load_lr(image)
  image = normalize(image)
  return image

In [None]:
def preprocess_image_hr(image):
  image=load_hr(image)
  image = normalize(image)
  return image

## load dataset

In [None]:
datadir='/home/hzhuge/Downloads/SR_2D/cycgan_patch/kidney/patch_64_train'
#datadir='/home/hzhuge/Downloads/1072case/'
PATH=os.path.join(datadir)

train_dataset_lr = tf.data.Dataset.list_files(PATH+'/*.png')
train_dataset_lr = train_dataset_lr.map(preprocess_image_lr,
                                  num_parallel_calls=tf.data.experimental.AUTOTUNE)
train_dataset_lr = train_dataset_lr.shuffle(BUFFER_SIZE)
train_dataset_lr = train_dataset_lr.batch(BATCH_SIZE)
train_dataset_lr

In [None]:
datadir='/home/hzhuge/Downloads/SR_2D/cycgan_patch/kidney/patch_256_train'
#datadir='/home/hzhuge/Downloads/1072case/'
PATH=os.path.join(datadir)

train_dataset_hr = tf.data.Dataset.list_files(PATH+'/*.png')
train_dataset_hr = train_dataset_hr.map(preprocess_image_hr,
                                  num_parallel_calls=tf.data.experimental.AUTOTUNE)
train_dataset_hr = train_dataset_hr.shuffle(BUFFER_SIZE)
train_dataset_hr = train_dataset_hr.batch(BATCH_SIZE)
train_dataset_hr

In [None]:
datadir='/home/hzhuge/Downloads/SR_2D/cycgan_patch/kidney/patch_64_test'
#datadir='/home/hzhuge/Downloads/1072case/'
PATH=os.path.join(datadir)

test_dataset_lr = tf.data.Dataset.list_files(PATH+'/*.png')
test_dataset_lr = test_dataset_lr.map(preprocess_image_lr,
                                  num_parallel_calls=tf.data.experimental.AUTOTUNE)
test_dataset_lr = test_dataset_lr.shuffle(BUFFER_SIZE)
test_dataset_lr = test_dataset_lr.batch(BATCH_SIZE)
test_dataset_lr

In [None]:
datadir='/home/hzhuge/Downloads/SR_2D/cycgan_patch/kidney/patch_256_test'
#datadir='/home/hzhuge/Downloads/1072case/'
PATH=os.path.join(datadir)

test_dataset_hr = tf.data.Dataset.list_files(PATH+'/*.png')
test_dataset_hr = test_dataset_hr.map(preprocess_image_hr,
                                  num_parallel_calls=tf.data.experimental.AUTOTUNE)
test_dataset_hr = test_dataset_hr.shuffle(BUFFER_SIZE)
test_dataset_hr = test_dataset_hr.batch(BATCH_SIZE)
test_dataset_hr

In [None]:
sample_lr = next(iter(train_dataset_lr))
sample_hr = next(iter(train_dataset_hr))

In [None]:
plt.subplot(121)
plt.title('lr')
plt.imshow(sample_lr[0] * 0.5 + 0.5)

plt.subplot(122)
plt.title('lr with random jitter')
plt.imshow(random_jitter(sample_lr[0]) * 0.5 + 0.5)

In [None]:
plt.subplot(121)
plt.title('hr')
plt.imshow(sample_hr[0] * 0.5 + 0.5)

plt.subplot(122)
plt.title('hr with random jitter')
plt.imshow(random_jitter(sample_hr[0]) * 0.5 + 0.5)

## SR-CycleGAN model with two generators and two discriminators

In [None]:
OUTPUT_CHANNELS = 3

generator_g = pix2pix.unet_generator(OUTPUT_CHANNELS, norm_type='instancenorm')
generator_f = pix2pix.unet_generator(OUTPUT_CHANNELS, norm_type='instancenorm')

discriminator_x = pix2pix.discriminator_lr(norm_type='instancenorm', target=False)
discriminator_y = pix2pix.discriminator_hr(norm_type='instancenorm', target=False)

In [None]:
sample_lr_up=tf.image.resize(sample_lr,[256,256],antialias=True,method=tf.image.ResizeMethod.GAUSSIAN)
to_hr = generator_g(sample_lr_up)
to_lr = generator_f(sample_hr)
plt.figure(figsize=(8, 8))
contrast = 8

imgs = [sample_lr, to_hr, sample_hr, to_lr]
title = ['LR', 'To HR', 'HR', 'To LR']

for i in range(len(imgs)):
  plt.subplot(2, 2, i+1)
  plt.title(title[i])
  if i % 2 == 0:
    plt.imshow(imgs[i][0] * 0.5 + 0.5)
  else:
    plt.imshow(imgs[i][0] * 0.5 * contrast + 0.5)
plt.show()

In [None]:
plt.figure(figsize=(8, 8))

plt.subplot(121)
plt.title('Is a real HR?')
plt.imshow(discriminator_y(sample_hr)[0, ..., -1], cmap='RdBu_r')

plt.subplot(122)
plt.title('Is a real LR?')
plt.imshow(discriminator_x(sample_lr)[0, ..., -1], cmap='RdBu_r')

plt.show()

## Loss functions

In [None]:
LAMBDA = 50

In [None]:
loss_obj = tf.keras.losses.BinaryCrossentropy(from_logits=True)

In [None]:
def discriminator_loss(real, generated):
  real_loss = loss_obj(tf.ones_like(real), real)

  generated_loss = loss_obj(tf.zeros_like(generated), generated)

  total_disc_loss = real_loss + generated_loss

  return total_disc_loss * 0.5

In [None]:
def generator_loss(generated):
  return loss_obj(tf.ones_like(generated), generated)

In [None]:
def calc_cycle_loss(real_image, cycled_image):
  loss1 = tf.reduce_mean(tf.abs(real_image - cycled_image))
  
  return LAMBDA * loss1

In [None]:
def identity_loss(real_image, same_image):
  loss = tf.reduce_mean(tf.abs(real_image - same_image))
  return LAMBDA * 0.5 * loss

In [None]:
generator_g_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.2)
generator_f_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.2)

discriminator_x_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.2)
discriminator_y_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.2)

## Checkpoints

In [None]:
checkpoint_path = '/home/hzhuge/Downloads/SR_2D/checkpoints_SR-Cycgan_kidney'

ckpt = tf.train.Checkpoint(generator_g=generator_g,
                           generator_f=generator_f,
                           discriminator_x=discriminator_x,
                           discriminator_y=discriminator_y,
                           generator_g_optimizer=generator_g_optimizer,
                           generator_f_optimizer=generator_f_optimizer,
                           discriminator_x_optimizer=discriminator_x_optimizer,
                           discriminator_y_optimizer=discriminator_y_optimizer)

ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

# if a checkpoint exists, restore the latest checkpoint.
if ckpt_manager.latest_checkpoint:
  ckpt.restore(ckpt_manager.latest_checkpoint)
  print ('Latest checkpoint restored!!')

## Training

In [None]:
EPOCHS = 800

In [None]:
def generate_images(model, test_input):
  prediction = model(test_input)
    
  plt.figure(figsize=(12, 12))

  display_list = [test_input[0], prediction[0]]
  title = ['Input Image', 'Predicted Image']
  
  plt.subplot(1, 2, 1)
  plt.title(title[0])
    # getting the pixel values between [0, 1] to plot it.
  plt.imshow(test_input[0] * 0.5 + 0.5)
  plt.axis('off')
  
  plt.subplot(1, 2, 2)
  plt.title(title[1])
    # getting the pixel values between [0, 1] to plot it.
  plt.imshow(prediction[0] * 0.5 + 0.5)
  plt.axis('off')
    
  plt.show()
#   for i in range(2):
#     plt.subplot(1, 2, i+1)
#     plt.title(title[i])
#     # getting the pixel values between [0, 1] to plot it.
#     plt.imshow(display_list[i] * 0.5 + 0.5)
#     plt.axis('off')
#   plt.show()

Even though the training loop looks complicated, it consists of four basic steps:

* Get the predictions.
* Calculate the loss.
* Calculate the gradients using backpropagation.
* Apply the gradients to the optimizer.

In [None]:
@tf.function
def train_step(real_x, real_y):
  # persistent is set to True because the tape is used more than
  # once to calculate the gradients.
  with tf.GradientTape(persistent=True) as tape:
    # Generator G translates X -> Y
    # Generator F translates Y -> X.
    real_x_up=tf.image.resize(real_x,[256,256],antialias=True,method=tf.image.ResizeMethod.GAUSSIAN)
    fake_y = generator_g(real_x_up, training=True)
    cycled_x = generator_f(fake_y, training=True)
    cycled_x_down=tf.image.resize(cycled_x,[64,64],antialias=True,method=tf.image.ResizeMethod.GAUSSIAN)
    
    fake_x = generator_f(real_y, training=True)
    fake_x_down=tf.image.resize(fake_x,[64,64],antialias=True,method=tf.image.ResizeMethod.GAUSSIAN)
    cycled_y = generator_g(fake_x, training=True)

    # same_x and same_y are used for identity loss.
    same_x = generator_f(real_x_up, training=True)
    same_x_down=tf.image.resize(same_x,[64,64],antialias=True,method=tf.image.ResizeMethod.GAUSSIAN)

    same_y = generator_g(real_y, training=True)

    disc_real_x = discriminator_x(real_x, training=True)
    disc_real_y = discriminator_y(real_y, training=True)

    disc_fake_x = discriminator_x(fake_x_down, training=True)
    disc_fake_y = discriminator_y(fake_y, training=True)

    # calculate the loss
    gen_g_loss = generator_loss(disc_fake_y)
    gen_f_loss = generator_loss(disc_fake_x)
    
    total_cycle_loss = calc_cycle_loss(real_x, cycled_x_down) + calc_cycle_loss(real_y, cycled_y)
    
    # Total generator loss = adversarial loss + cycle loss
    total_gen_g_loss = gen_g_loss + total_cycle_loss + identity_loss(real_y, same_y)
    total_gen_f_loss = gen_f_loss + total_cycle_loss + identity_loss(real_x, same_x_down)

    disc_x_loss = discriminator_loss(disc_real_x, disc_fake_x)
    disc_y_loss = discriminator_loss(disc_real_y, disc_fake_y)
  
  # Calculate the gradients for generator and discriminator
  generator_g_gradients = tape.gradient(total_gen_g_loss, 
                                        generator_g.trainable_variables)
  generator_f_gradients = tape.gradient(total_gen_f_loss, 
                                        generator_f.trainable_variables)
  
  discriminator_x_gradients = tape.gradient(disc_x_loss, 
                                            discriminator_x.trainable_variables)
  discriminator_y_gradients = tape.gradient(disc_y_loss, 
                                            discriminator_y.trainable_variables)
  
  # Apply the gradients to the optimizer
  generator_g_optimizer.apply_gradients(zip(generator_g_gradients, 
                                            generator_g.trainable_variables))

  generator_f_optimizer.apply_gradients(zip(generator_f_gradients, 
                                            generator_f.trainable_variables))
  
  discriminator_x_optimizer.apply_gradients(zip(discriminator_x_gradients,
                                                discriminator_x.trainable_variables))
  
  discriminator_y_optimizer.apply_gradients(zip(discriminator_y_gradients,
                                                discriminator_y.trainable_variables))

In [None]:
for epoch in range(1,1000):
  start = time.time()

  n = 0
  for image_x, image_y in tf.data.Dataset.zip((train_dataset_lr, train_dataset_hr)):
    train_step(image_x, image_y)
    if n % 10 == 0:
      print ('.', end='')
    n += 1

  clear_output(wait=True)
  # Using a consistent image (sample_horse) so that the progress of the model
  # is clearly visible.
  sample_lr_up=tf.image.resize(sample_lr,[256,256],antialias=True,method=tf.image.ResizeMethod.GAUSSIAN)
  generate_images(generator_g, sample_lr_up)

  if (epoch + 1) % 100 == 0:
    ckpt_save_path = ckpt_manager.save()
    print ('Saving checkpoint for epoch {} at {}'.format(epoch+1,
                                                         ckpt_save_path))

  print ('Time taken for epoch {} is {} sec\n'.format(epoch + 1,
                                                      time.time()-start))

## Generate using test dataset

In [None]:
!ls {checkpoint_path} 
checkpoint_path
ckpt.restore(ckpt_manager.latest_checkpoint)

In [None]:
# Run the trained model on the test dataset
for inp in test_dataset_lr.take(20):
  sample_lr_up=tf.image.resize(inp,[256,256],antialias=True,method=tf.image.ResizeMethod.GAUSSIAN)
  generate_images(generator_g, sample_lr_up)

## test on breast

In [None]:
datadir='/home/hzhuge/Downloads/SR_2D/cycgan_patch/breast/patch_64_test'
#datadir='/home/hzhuge/Downloads/1072case/'
PATH=os.path.join(datadir)

test_dataset_lr_name = tf.data.Dataset.list_files(PATH+'/*.png',shuffle=False)
test_dataset_lr = test_dataset_lr_name.map(preprocess_image_lr,
                                  num_parallel_calls=tf.data.experimental.AUTOTUNE)
test_dataset_lr = test_dataset_lr.batch(1)
test_dataset_lr

import re
subname=list()
for filename in test_dataset_lr_name:
  fname=filename.numpy().decode('utf-8')
  res=re.split("[/.]",fname)[-2]
  subname.append(res) 

subname

In [None]:
def mutual_information(hgram):
...     """ Mutual information for joint histogram
...     """
...     # Convert bins counts to probability values
...     pxy = hgram / float(np.sum(hgram))
...     px = np.sum(pxy, axis=1) # marginal for x over y
...     py = np.sum(pxy, axis=0) # marginal for y over x
...     px_py = px[:, None] * py[None, :] # Broadcast to multiply marginals
...     # Now we can do the calculation using the pxy, px_py 2D arrays
...     nzs = pxy > 0 # Only non-zero pxy values contribute to the sum
...     return np.sum(pxy[nzs] * np.log(pxy[nzs] / px_py[nzs]))

In [None]:
def ssim_mse_mi(model, test_input,index):
  # the training=True is intentional here since
  # we want the batch statistics while running the model
  # on the test dataset. If we use training=False, we will get
  # the accumulated statistics learned from the training dataset
  # (which we don't want)
#   filename_cnn=subname[index]+'.png'
#   directory_predict_unet = r'/home/hzhuge/Downloads/CSBDeep-master/examples/denoising2D/phantom_data/predict'  
#   unet=tf.image.decode_image(tf.io.read_file(unet_name))
#   unet = tf.expand_dims(tf.cast(unet[:,:,0], tf.float32),axis=2)


  prediction = model(test_input, training=True)
  print(prediction.shape)
  plt.figure(figsize=(20,20))
  
 
  filename=subname[index]+'.png'
  path_hr='/home/hzhuge/Downloads/SR_2D/cycgan_patch/breast/patch_256'
  PATH=os.path.join(path_hr,filename)
  test_hr = preprocess_image_hr(PATH)


    
  display_list = [test_input[0], test_hr, prediction[0]]
  title = ['Gaussian', 'Ground Truth', 'Predicted Image_CycleGAN']
  inputim_l=np.array(display_list[0]*0.5+0.5)
  ground_l=np.array(display_list[1]*0.5+0.5)
  predict_l=np.array(display_list[2]*0.5+0.5)
  
  
  inputim_g= display_list[0]*0.5+0.5
  ground_g = display_list[1]*0.5+0.5
  predict_g = display_list[2]*0.5+0.5
 
  
# sim vs. wide-field
  mgi=np.square(np.subtract(ground_l, inputim_l)).mean()
  sgi=tf.image.ssim(ground_g,inputim_g, max_val=1.0).numpy()
  hist_2d_gi, x_edges, y_edges = np.histogram2d(inputim_l.ravel(),ground_l.ravel(),bins=20)
  mi_gi=mutual_information(hist_2d_gi)
    
# sim vs. gan
  mgp=np.square(np.subtract(ground_l, predict_l)).mean()
  sgp=tf.image.ssim(ground_g,predict_g, max_val=1.0).numpy()
  hist_2d_gp, x_edges, y_edges = np.histogram2d(predict_l.ravel(),ground_l.ravel(),bins=20)
  mi_gp=mutual_information(hist_2d_gp)


# wide_field vs.gan
  mip=np.square(np.subtract(inputim_l, predict_l)).mean()
  sip=tf.image.ssim(inputim_g,predict_g, max_val=1.0).numpy()
  hist_2d_ip, x_edges, y_edges = np.histogram2d(inputim_l.ravel(),predict_l.ravel(),bins=20)
  mi_ip=mutual_information(hist_2d_ip)


#  df = pd.DataFrame(np.array([mgi,mgp,mip,sgi,sgp,sip]), columns =['mse_ground_input','mse_ground_predict','mse_input_predict','ssim_ground_input',
#           'ssim_ground_predict','ssim_input_predict']) 
  d={
     
     'MSE_HR_CycleGAN':mgp,
     'MSE_HR_Gaussian':mgi,
     'MSE_Gaussian_CycleGAN':mip,
    
     'SSIM_HR_CycleGAN':sgp,
     'SSIM_HR_Gaussian':sgi,
     'SSIM_Gaussian_CycleGAN':sip,
    
     'MI_HR_CycleGAN':mi_gp,
     'MI_HR_Gaussian':mi_gi,
     'MI_Gaussian_CycleGAN':mi_ip,
     }
#   d={'MSE_HR_GAN':mgp,
#      'MSE_LR_GAN':mip,
#      'MSE_LR_HR':mgi,
#      'SSIM_HR_GAN':sgp,
#      'SSIM_LR_GAN':sip,
#      'SSIM_LR_HR':sgi,
#      'MI_HR_GAN':mi_gp,
#      'MI_LR_GAN':mi_ip,
#      'MI_LR_HR':mi_gi}
  ps1=pd.Series(d).to_frame().T
  ps=pd.Series(d)
  

  for i in range(3):
    plt.subplot(1, 3, i+1)
    plt.title(title[i])
    # getting the pixel values between [0, 1] to plot it.
    plt.imshow(display_list[i] * 0.5 + 0.5)
    plt.axis('off')
  plt.show()
    

  print(ps)
  return ps1

In [None]:
import pandas as pd
import numpy as np
i=0
df=pd.DataFrame()
# Run the trained model on the test dataset
for inp in test_dataset_lr.take(30):
  ps=ssim_mse_mi(generator_g, inp,i)
  df_temp=pd.DataFrame(ps)
  df=df.append(df_temp)
  i=i+1

## test on prostate

In [None]:
datadir='/home/hzhuge/Downloads/SR_2D/cycgan_patch/prostate/patch_64_train'
PATH=os.path.join(datadir)

test_dataset_lr_name = tf.data.Dataset.list_files(PATH+'/*.png',shuffle=False)
test_dataset_lr = test_dataset_lr_name.map(preprocess_image_lr,
                                  num_parallel_calls=tf.data.experimental.AUTOTUNE)
test_dataset_lr = test_dataset_lr.batch(1)
test_dataset_lr

import re
subname=list()
for filename in test_dataset_lr_name:
  fname=filename.numpy().decode('utf-8')
  res=re.split("[/.]",fname)[-2]
  subname.append(res) 

subname

In [None]:
def ssim_mse_mi(model, test_input,index):
  # the training=True is intentional here since
  # we want the batch statistics while running the model
  # on the test dataset. If we use training=False, we will get
  # the accumulated statistics learned from the training dataset
  # (which we don't want)
#   filename_cnn=subname[index]+'.png'
#   directory_predict_unet = r'/home/hzhuge/Downloads/CSBDeep-master/examples/denoising2D/phantom_data/predict'  
#   unet=tf.image.decode_image(tf.io.read_file(unet_name))
#   unet = tf.expand_dims(tf.cast(unet[:,:,0], tf.float32),axis=2)


  prediction = model(test_input, training=True)
  print(prediction.shape)
  plt.figure(figsize=(20,20))
  
 
  filename=subname[index]+'.png'
  path_hr='/home/hzhuge/Downloads/SR_2D/cycgan_patch/prostate/patch_256'
  PATH=os.path.join(path_hr,filename)
  test_hr = preprocess_image_hr(PATH)


    
  display_list = [test_input[0], test_hr, prediction[0]]
  title = ['Gaussian', 'Ground Truth', 'Predicted Image_CycleGAN']
  inputim_l=np.array(display_list[0]*0.5+0.5)
  ground_l=np.array(display_list[1]*0.5+0.5)
  predict_l=np.array(display_list[2]*0.5+0.5)
  
  
  inputim_g= display_list[0]*0.5+0.5
  ground_g = display_list[1]*0.5+0.5
  predict_g = display_list[2]*0.5+0.5
 
  
# sim vs. wide-field
  mgi=np.square(np.subtract(ground_l, inputim_l)).mean()
  sgi=tf.image.ssim(ground_g,inputim_g, max_val=1.0).numpy()
  hist_2d_gi, x_edges, y_edges = np.histogram2d(inputim_l.ravel(),ground_l.ravel(),bins=20)
  mi_gi=mutual_information(hist_2d_gi)
    
# sim vs. gan
  mgp=np.square(np.subtract(ground_l, predict_l)).mean()
  sgp=tf.image.ssim(ground_g,predict_g, max_val=1.0).numpy()
  hist_2d_gp, x_edges, y_edges = np.histogram2d(predict_l.ravel(),ground_l.ravel(),bins=20)
  mi_gp=mutual_information(hist_2d_gp)


# wide_field vs.gan
  mip=np.square(np.subtract(inputim_l, predict_l)).mean()
  sip=tf.image.ssim(inputim_g,predict_g, max_val=1.0).numpy()
  hist_2d_ip, x_edges, y_edges = np.histogram2d(inputim_l.ravel(),predict_l.ravel(),bins=20)
  mi_ip=mutual_information(hist_2d_ip)


#  df = pd.DataFrame(np.array([mgi,mgp,mip,sgi,sgp,sip]), columns =['mse_ground_input','mse_ground_predict','mse_input_predict','ssim_ground_input',
#           'ssim_ground_predict','ssim_input_predict']) 
  d={
     
     'MSE_HR_CycleGAN':mgp,
     'MSE_HR_Gaussian':mgi,
     'MSE_Gaussian_CycleGAN':mip,
    
     'SSIM_HR_CycleGAN':sgp,
     'SSIM_HR_Gaussian':sgi,
     'SSIM_Gaussian_CycleGAN':sip,
    
     'MI_HR_CycleGAN':mi_gp,
     'MI_HR_Gaussian':mi_gi,
     'MI_Gaussian_CycleGAN':mi_ip,
     }
#   d={'MSE_HR_GAN':mgp,
#      'MSE_LR_GAN':mip,
#      'MSE_LR_HR':mgi,
#      'SSIM_HR_GAN':sgp,
#      'SSIM_LR_GAN':sip,
#      'SSIM_LR_HR':sgi,
#      'MI_HR_GAN':mi_gp,
#      'MI_LR_GAN':mi_ip,
#      'MI_LR_HR':mi_gi}
  ps1=pd.Series(d).to_frame().T
  ps=pd.Series(d)
  

  for i in range(3):
    plt.subplot(1, 3, i+1)
    plt.title(title[i])
    # getting the pixel values between [0, 1] to plot it.
    plt.imshow(display_list[i] * 0.5 + 0.5)
    plt.axis('off')
  plt.show()
    

  print(ps)
  return ps1

In [None]:
import pandas as pd
import numpy as np
i=0
df=pd.DataFrame()
# Run the trained model on the test dataset
for inp in test_dataset_lr.take(30):
  ps=ssim_mse_mi(generator_g, inp,i)
  df_temp=pd.DataFrame(ps)
  df=df.append(df_temp)
  i=i+1

## test on kidney

In [None]:
datadir='/home/hzhuge/Downloads/SR_2D/cycgan_patch/kidney/patch_64_test'
#datadir='/home/hzhuge/Downloads/1072case/'
PATH=os.path.join(datadir)

test_dataset_lr_name = tf.data.Dataset.list_files(PATH+'/*.png',shuffle=False)
test_dataset_lr = test_dataset_lr_name.map(preprocess_image_lr,
                                  num_parallel_calls=tf.data.experimental.AUTOTUNE)
test_dataset_lr = test_dataset_lr.batch(1)
test_dataset_lr

import re
subname=list()
for filename in test_dataset_lr_name:
  fname=filename.numpy().decode('utf-8')
  res=re.split("[/.]",fname)[-2]
  subname.append(res) 

subname

In [None]:
len(subname)

In [None]:
def load_hr_gan(image_file):
  image = tf.io.read_file(image_file)
  image = tf.image.decode_png(image)[:,:,:3]

  
  real_image = tf.cast(image, tf.float32)
  real_image=tf.reshape(real_image,[IMG_WIDTH,IMG_HEIGHT,3])
  
  return real_image

In [None]:
def preprocess_image_hr_gan(image):
  image=load_hr_gan(image)
  image = normalize(image)
  return image

In [None]:
path_gan='/home/hzhuge/Downloads/SR_2D/patch_result/gan_result'
filename=subname[0]+'.png'
PATH=os.path.join(path_gan,filename)
test_gan = preprocess_image_hr_gan(PATH) 

In [None]:
def ssim_mse_mi_kidney(model, test_input,index):
  # the training=True is intentional here since
  # we want the batch statistics while running the model
  # on the test dataset. If we use training=False, we will get
  # the accumulated statistics learned from the training dataset
  # (which we don't want)
#   filename_cnn=subname[index]+'.png'
#   directory_predict_unet = r'/home/hzhuge/Downloads/CSBDeep-master/examples/denoising2D/phantom_data/predict'  
#   unet=tf.image.decode_image(tf.io.read_file(unet_name))
#   unet = tf.expand_dims(tf.cast(unet[:,:,0], tf.float32),axis=2)


  prediction = model(test_input, training=True)
  print(prediction.shape)
  plt.figure(figsize=(20,20))
  
 
  filename='PRE_'+subname[index]+'.png'
  path_hr='/home/hzhuge/Downloads/SR_2D/cycgan_patch/kidney/patch_256'
  PATH=os.path.join(path_hr,filename)
  test_hr = preprocess_image_hr(PATH)
  
  filename=subname[index]+'.png'
  path_gan='/home/hzhuge/Downloads/SR_2D/patch_result/gan_result'
  PATH=os.path.join(path_gan,filename)
  test_gan = preprocess_image_hr_gan(PATH) 
    
  display_list = [test_input[0], test_hr, prediction[0],test_gan]
  title = ['Gaussian', 'Ground Truth', 'Predicted Image_CycleGAN','Predicted Image_GAN']
  inputim_l=np.array(display_list[0]*0.5+0.5)
  ground_l=np.array(display_list[1]*0.5+0.5)
  predict_l=np.array(display_list[2]*0.5+0.5)
  cnn_l=np.array(display_list[3]*0.5+0.5)
  
  inputim_g= display_list[0]*0.5+0.5
  ground_g = display_list[1]*0.5+0.5
  predict_g = display_list[2]*0.5+0.5
  cnn_g = display_list[3]*0.5+0.5
  
# sim vs. wide-field
  mgi=np.square(np.subtract(ground_l, inputim_l)).mean()
  sgi=tf.image.ssim(ground_g,inputim_g, max_val=1.0).numpy()
  hist_2d_gi, x_edges, y_edges = np.histogram2d(inputim_l.ravel(),ground_l.ravel(),bins=20)
  mi_gi=mutual_information(hist_2d_gi)
    
# sim vs. gan
  mgp=np.square(np.subtract(ground_l, predict_l)).mean()
  sgp=tf.image.ssim(ground_g,predict_g, max_val=1.0).numpy()
  hist_2d_gp, x_edges, y_edges = np.histogram2d(predict_l.ravel(),ground_l.ravel(),bins=20)
  mi_gp=mutual_information(hist_2d_gp)

# sim vs. cnn
  mgc=np.square(np.subtract(ground_l, cnn_l)).mean()
  sgc=tf.image.ssim(ground_g,cnn_g, max_val=1.0).numpy()
  hist_2d_gc, x_edges, y_edges = np.histogram2d(cnn_l.ravel(),ground_l.ravel(),bins=20)
  mi_gc=mutual_information(hist_2d_gc)

# wide_field vs.gan
  mip=np.square(np.subtract(inputim_l, predict_l)).mean()
  sip=tf.image.ssim(inputim_g,predict_g, max_val=1.0).numpy()
  hist_2d_ip, x_edges, y_edges = np.histogram2d(inputim_l.ravel(),predict_l.ravel(),bins=20)
  mi_ip=mutual_information(hist_2d_ip)

# wide_field vs.cnn
  mic=np.square(np.subtract(inputim_l, cnn_l)).mean()
  sic=tf.image.ssim(inputim_g,cnn_g, max_val=1.0).numpy()
  hist_2d_ic, x_edges, y_edges = np.histogram2d(inputim_l.ravel(),cnn_l.ravel(),bins=20)
  mi_ic=mutual_information(hist_2d_ic)

# gan vs.cnn
  mpc=np.square(np.subtract(predict_l, cnn_l)).mean()
  spc=tf.image.ssim(predict_g,cnn_g, max_val=1.0).numpy()
  hist_2d_pc, x_edges, y_edges = np.histogram2d(predict_l.ravel(),cnn_l.ravel(),bins=20)
  mi_pc=mutual_information(hist_2d_pc)

#  df = pd.DataFrame(np.array([mgi,mgp,mip,sgi,sgp,sip]), columns =['mse_ground_input','mse_ground_predict','mse_input_predict','ssim_ground_input',
#           'ssim_ground_predict','ssim_input_predict']) 
  d={
     
     'MSE_HR_CycleGAN':mgp,
     'MSE_HR_GAN':mgc, 
     'MSE_HR_Gaussian':mgi,
     'MSE_GAN_CycleGAN':mpc,
     'MSE_Gaussian_CycleGAN':mip,
     'MSE_Gaussian_GAN':mic,
    
     'SSIM_HR_CycleGAN':sgp,
     'SSIM_HR_GAN':sgc, 
     'SSIM_HR_Gaussian':sgi,
     'SSIM_GAN_CycleGAN':spc,
     'SSIM_Gaussian_CycleGAN':sip,
     'SSIM_Gaussian_GAN':sic,
    
     'MI_HR_CycleGAN':mi_gp,
     'MI_HR_GAN':mi_gc,
     'MI_HR_Gaussian':mi_gi,
     'MI_GAN_CycleGAN':mi_pc,
     'MI_Gaussian_CycleGAN':mi_ip,
     'MI_Gaussian_GAN':mi_ic
     }
#   d={'MSE_HR_GAN':mgp,
#      'MSE_LR_GAN':mip,
#      'MSE_LR_HR':mgi,
#      'SSIM_HR_GAN':sgp,
#      'SSIM_LR_GAN':sip,
#      'SSIM_LR_HR':sgi,
#      'MI_HR_GAN':mi_gp,
#      'MI_LR_GAN':mi_ip,
#      'MI_LR_HR':mi_gi}
  ps1=pd.Series(d).to_frame().T
  ps=pd.Series(d)
  

  for i in range(4):
    plt.subplot(1, 4, i+1)
    plt.title(title[i])
    # getting the pixel values between [0, 1] to plot it.
    plt.imshow(display_list[i] * 0.5 + 0.5)
    plt.axis('off')
  plt.show()
    

  print(ps)
  return ps1

In [None]:
import pandas as pd
import numpy as np
i=0
df=pd.DataFrame()
# Run the trained model on the test dataset
for inp in test_dataset_lr.take(30):
  ps=ssim_mse_mi_kidney(generator_g, inp,i)
  df_temp=pd.DataFrame(ps)
  df=df.append(df_temp)
  i=i+1

In [None]:
def save_images(model, test_input,index):
  prediction = model(test_input)
  temp=prediction[0]*0.5+0.5
  filename=subname[index]+'.png'
  path_save='/home/hzhuge/Downloads/SR_2D/patch_result/SR-CycleGAN-kidney'
  PATH=os.path.join(path_save,filename)
  plt.imsave(PATH,temp.numpy())

In [None]:
i=0
for inp in test_dataset_lr:
  sample_lr_up=tf.image.resize(inp,[256,256],antialias=True,method=tf.image.ResizeMethod.GAUSSIAN)
  save_images(generator_g, sample_lr_up,i)
  i=i+1

## BIlinear interpolation

In [None]:
def save_images_bilinear(test_input,index):
  sample_lr_up=tf.image.resize(test_input[0],[256,256],antialias=True,method=tf.image.ResizeMethod.BILINEAR)
  temp=sample_lr_up*0.5+0.5
  filename=subname[index]+'.png'
  path_save='/home/hzhuge/Downloads/SR_2D/patch_result/bilinear-kidney'
  PATH=os.path.join(path_save,filename)
  plt.imsave(PATH,temp.numpy())

In [None]:
i=0
for inp in test_dataset_lr:
  save_images_bilinear(inp,i)
  i=i+1

## test on liver

In [None]:
datadir='/home/hzhuge/Downloads/SR_2D/cycgan_patch/liver/patch_64'
PATH=os.path.join(datadir)

test_dataset_lr_name = tf.data.Dataset.list_files(PATH+'/*.png',shuffle=False)
test_dataset_lr = test_dataset_lr_name.map(preprocess_image_lr,
                                  num_parallel_calls=tf.data.experimental.AUTOTUNE)
test_dataset_lr = test_dataset_lr.batch(1)
test_dataset_lr

import re
subname=list()
for filename in test_dataset_lr_name:
  fname=filename.numpy().decode('utf-8')
  res=re.split("[/.]",fname)[-2]
  subname.append(res) 

subname

In [None]:
def ssim_mse_mi(model, test_input,index):
  # the training=True is intentional here since
  # we want the batch statistics while running the model
  # on the test dataset. If we use training=False, we will get
  # the accumulated statistics learned from the training dataset
  # (which we don't want)
#   filename_cnn=subname[index]+'.png'
#   directory_predict_unet = r'/home/hzhuge/Downloads/CSBDeep-master/examples/denoising2D/phantom_data/predict'  
#   unet=tf.image.decode_image(tf.io.read_file(unet_name))
#   unet = tf.expand_dims(tf.cast(unet[:,:,0], tf.float32),axis=2)


  prediction = model(test_input, training=True)
  print(prediction.shape)
  plt.figure(figsize=(20,20))
  
 
  filename=subname[index]+'.png'
  path_hr='/home/hzhuge/Downloads/SR_2D/cycgan_patch/liver/patch_256'
  PATH=os.path.join(path_hr,filename)
  test_hr = preprocess_image_hr(PATH)


    
  display_list = [test_input[0], test_hr, prediction[0]]
  title = ['Gaussian', 'Ground Truth', 'Predicted Image_CycleGAN']
  inputim_l=np.array(display_list[0]*0.5+0.5)
  ground_l=np.array(display_list[1]*0.5+0.5)
  predict_l=np.array(display_list[2]*0.5+0.5)
  
  
  inputim_g= display_list[0]*0.5+0.5
  ground_g = display_list[1]*0.5+0.5
  predict_g = display_list[2]*0.5+0.5
 
  
# sim vs. wide-field
  mgi=np.square(np.subtract(ground_l, inputim_l)).mean()
  sgi=tf.image.ssim(ground_g,inputim_g, max_val=1.0).numpy()
  hist_2d_gi, x_edges, y_edges = np.histogram2d(inputim_l.ravel(),ground_l.ravel(),bins=20)
  mi_gi=mutual_information(hist_2d_gi)
    
# sim vs. gan
  mgp=np.square(np.subtract(ground_l, predict_l)).mean()
  sgp=tf.image.ssim(ground_g,predict_g, max_val=1.0).numpy()
  hist_2d_gp, x_edges, y_edges = np.histogram2d(predict_l.ravel(),ground_l.ravel(),bins=20)
  mi_gp=mutual_information(hist_2d_gp)


# wide_field vs.gan
  mip=np.square(np.subtract(inputim_l, predict_l)).mean()
  sip=tf.image.ssim(inputim_g,predict_g, max_val=1.0).numpy()
  hist_2d_ip, x_edges, y_edges = np.histogram2d(inputim_l.ravel(),predict_l.ravel(),bins=20)
  mi_ip=mutual_information(hist_2d_ip)


#  df = pd.DataFrame(np.array([mgi,mgp,mip,sgi,sgp,sip]), columns =['mse_ground_input','mse_ground_predict','mse_input_predict','ssim_ground_input',
#           'ssim_ground_predict','ssim_input_predict']) 
  d={
     
     'MSE_HR_CycleGAN':mgp,
     'MSE_HR_Gaussian':mgi,
     'MSE_Gaussian_CycleGAN':mip,
    
     'SSIM_HR_CycleGAN':sgp,
     'SSIM_HR_Gaussian':sgi,
     'SSIM_Gaussian_CycleGAN':sip,
    
     'MI_HR_CycleGAN':mi_gp,
     'MI_HR_Gaussian':mi_gi,
     'MI_Gaussian_CycleGAN':mi_ip,
     }
#   d={'MSE_HR_GAN':mgp,
#      'MSE_LR_GAN':mip,
#      'MSE_LR_HR':mgi,
#      'SSIM_HR_GAN':sgp,
#      'SSIM_LR_GAN':sip,
#      'SSIM_LR_HR':sgi,
#      'MI_HR_GAN':mi_gp,
#      'MI_LR_GAN':mi_ip,
#      'MI_LR_HR':mi_gi}
  ps1=pd.Series(d).to_frame().T
  ps=pd.Series(d)
  

  for i in range(3):
    plt.subplot(1, 3, i+1)
    plt.title(title[i])
    # getting the pixel values between [0, 1] to plot it.
    plt.imshow(display_list[i] * 0.5 + 0.5)
    plt.axis('off')
  plt.show()
    

  print(ps)
  return ps1

In [None]:
import pandas as pd
import numpy as np
i=0
df=pd.DataFrame()
# Run the trained model on the test dataset
for inp in test_dataset_lr.take(50):
  sample_lr_up=tf.image.resize(inp,[256,256],antialias=True,method=tf.image.ResizeMethod.GAUSSIAN)
  ps=ssim_mse_mi(generator_g, sample_lr_up,i)
  df_temp=pd.DataFrame(ps)
  df=df.append(df_temp)
  i=i+1

In [None]:
def save_images(model, test_input,index):
  prediction = model(test_input)
  temp=prediction[0]*0.5+0.5
  filename=subname[index]+'.png'
  path_save='/home/hzhuge/Downloads/SR_2D/patch_result/liver/SR-CycleGAN-k'
  PATH=os.path.join(path_save,filename)
  plt.imsave(PATH,temp.numpy())

In [None]:
i=0
for inp in test_dataset_lr:
  sample_lr_up=tf.image.resize(inp,[256,256],antialias=True,method=tf.image.ResizeMethod.GAUSSIAN)
  save_images(generator_g, sample_lr_up,i)
  i=i+1