# Pix2Pix Introduction :    
   Pix2Pix is a generative adversarial network (GAN) architecture designed for image-to-image translation tasks. The Pix2Pix model consists of a generator and a discriminator that work adversarially to generate realistic images from one domain to another. The model is trained on paired datasets, where each input image from the source domain has a corresponding target image in the desired target domain.

   This Pix2Pix network is focussed on translating images from the Infrared(IR) Domain to Passive Microwave Rainfall(PMR) domain, the Pix2Pix model would be trained on pairs of Infrared images and their corresponding ground truth Passive Microwave Rainfall Images. The goal is to teach the model to generate accurate and realistic PMR images from Infrared inputs.

Here's a simplified outline of the process:

**Data Preparation:**

Gathering a dataset of paired images where each Infrared image has a corresponding ground truth Passive Microwave Rainfall Image.

**Generator:**

The generator takes Infrared images as input and attempts to generate images that resemble the target domain (Passive Microwave Rainfall Images).
It typically consists of encoder-decoder blocks to capture and transform features from one domain to another.

**Discriminator:**

The discriminator evaluates the realism of the generated images by comparing them to real images from the target domain.
It aims to distinguish between real and generated images.

**Adversarial Training:**

The generator and discriminator are trained adversarially. The generator aims to fool the discriminator into thinking its generated images are real.
The discriminator is trained to correctly distinguish between real and generated images.

**Loss Functions:**

The generator is optimized using adversarial loss, which encourages generating realistic images.
Additional perceptual loss functions, like L1 or L2 loss, may be used to ensure pixel-wise similarity between generated and target images.

**Evaluation:**

The model is evaluated on a separate validation set to ensure its ability to generalize to new, unseen data.
Common evaluation metrics include PSNR, SSIM, and domain-specific metrics.






# Data Preparation - Preprocessing the images :
    The dataset used has paired images, so the image has to be split into the corresponding halves pertaining to each domain (Infrared and Passive Micorwave Rainfall).
    The split images are to be resized and stored separately
    The processed images are then stored into a npz file for later usage

In [1]:
import cv2
import os
import numpy as np
from numpy import asarray
from numpy import vstack
from keras.preprocessing.image import img_to_array
from keras.preprocessing.image import load_img
from numpy import savez_compressed
# Define the path to your directory
directory = '/kaggle/input/tcirrp-dataset/TCIRRP/train0.01k'

# Get a list of all the file paths
image_paths = [os.path.join(directory, img) for img in os.listdir(directory)]

# Load the images in grayscale
images = np.array([cv2.imread(img_path) for img_path in image_paths])

pmw=[]
ir=[]
# cv2.imshow('image',images[0])
for i in range(len(images)):
    image1 = images[i][0:201, 0:201]
    image2 = images[i][0:201,201:402]
    print(image1.shape)
    image1=cv2.resize(image1,(256,256))
    image2=cv2.resize(image2,(256,256))
    print(image1.shape)
    image1=image1.reshape(256,256,3)
    image2=image2.reshape(256,256,3)
    pmw.append(image1)
    ir.append(image2)
pmw=np.array(pmw)
ir=np.array(ir)

# Now `images` is a list of numpy arrays representing your images in grayscale
print("Loaded",pmw.shape,ir.shape)
# # save as compressed numpy array
filename = 'pmwtoir1_256.npz'
savez_compressed(filename,ir,pmw)
print('Saved dataset: ', filename)

2024-03-05 08:07:14.001725: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-03-05 08:07:14.001846: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-03-05 08:07:14.160835: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


(201, 201, 3)
(256, 256, 3)
(201, 201, 3)
(256, 256, 3)
(201, 201, 3)
(256, 256, 3)
(201, 201, 3)
(256, 256, 3)
(201, 201, 3)
(256, 256, 3)
(201, 201, 3)
(256, 256, 3)
(201, 201, 3)
(256, 256, 3)
(201, 201, 3)
(256, 256, 3)
(201, 201, 3)
(256, 256, 3)
(201, 201, 3)
(256, 256, 3)
Loaded (10, 256, 256, 3) (10, 256, 256, 3)
Saved dataset:  pmwtoir1_256.npz


# Defining the architecture 

In [2]:
from numpy import load
from numpy import zeros
from numpy import ones
from numpy.random import randint
from keras.optimizers import Adam
from keras.initializers import RandomNormal
from keras.models import Model
from keras.layers import Input
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import Activation
from keras.layers import Concatenate
from keras.layers import Dropout
from keras.layers import BatchNormalization
from keras.layers import LeakyReLU
from matplotlib import pyplot
 
# define the discriminator model
def define_discriminator(image_shape):
 # weight initialization
 init = RandomNormal(stddev=0.02)
 # source image input
 in_src_image = Input(shape=image_shape)
 # target image input
 in_target_image = Input(shape=image_shape)
 # concatenate images channel-wise
 merged = Concatenate()([in_src_image, in_target_image])
 # C64
 d = Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(merged)
 d = LeakyReLU(alpha=0.2)(d)
 # C128
 d = Conv2D(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
 d = BatchNormalization()(d)
 d = LeakyReLU(alpha=0.2)(d)
 # C256
 d = Conv2D(256, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
 d = BatchNormalization()(d)
 d = LeakyReLU(alpha=0.2)(d)
 # C512
 d = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
 d = BatchNormalization()(d)
 d = LeakyReLU(alpha=0.2)(d)
 # second last output layer
 d = Conv2D(512, (4,4), padding='same', kernel_initializer=init)(d)
 d = BatchNormalization()(d)
 d = LeakyReLU(alpha=0.2)(d)
 # patch output
 d = Conv2D(1, (4,4), padding='same', kernel_initializer=init)(d)
 patch_out = Activation('sigmoid')(d)
 # define model
 model = Model([in_src_image, in_target_image], patch_out)
 # compile model
 opt = Adam(learning_rate=0.0002, beta_1=0.5)
 model.outputs  
 model.summary()
 model.compile(loss='binary_crossentropy', optimizer=opt, loss_weights = [0.5])          #was this the source of error ?
 return model
 
# define an encoder block
def define_encoder_block(layer_in, n_filters, batchnorm=True):
 # weight initialization
 init = RandomNormal(stddev=0.02)
 # add downsampling layer
 g = Conv2D(n_filters, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(layer_in)
 # conditionally add batch normalization
 if batchnorm:
   g = BatchNormalization()(g, training=True)
 # leaky relu activation
 g = LeakyReLU(alpha=0.2)(g)
 return g
 
# define a decoder block
def decoder_block(layer_in, skip_in, n_filters, dropout=True):
 # weight initialization
 init = RandomNormal(stddev=0.02)
 # add upsampling layer
 g = Conv2DTranspose(n_filters, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(layer_in)
 # add batch normalization
 g = BatchNormalization()(g, training=True)
 # conditionally add dropout
 if dropout:
   g = Dropout(0.5)(g, training=True)
 # merge with skip connection
 g = Concatenate()([g, skip_in])
 # relu activation
 g = Activation('relu')(g)
 return g
 
# define the standalone generator model
def define_generator(image_shape=(256,256,3)):
 # weight initialization
 init = RandomNormal(stddev=0.02)
 # image input
 in_image = Input(shape=image_shape)
 # encoder model
 e1 = define_encoder_block(in_image, 64, batchnorm=False)
 e2 = define_encoder_block(e1, 128)
 e3 = define_encoder_block(e2, 256)
 e4 = define_encoder_block(e3, 512)
 e5 = define_encoder_block(e4, 512)
 e6 = define_encoder_block(e5, 512)
 e7 = define_encoder_block(e6, 512)
 # bottleneck, no batch norm and relu
 b = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(e7)
 b = Activation('relu')(b)
 # decoder model
 d1 = decoder_block(b, e7, 512)
 d2 = decoder_block(d1, e6, 512)
 d3 = decoder_block(d2, e5, 512)
 d4 = decoder_block(d3, e4, 512, dropout=False)
 d5 = decoder_block(d4, e3, 256, dropout=False)
 d6 = decoder_block(d5, e2, 128, dropout=False)
 d7 = decoder_block(d6, e1, 64, dropout=False)
 # output
 g = Conv2DTranspose(3, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d7)
 out_image = Activation('tanh')(g)
 # define model
 model = Model(in_image, out_image)
 return model
 

# Defining the composite model

In [3]:
# define the combined generator and discriminator model, for updating the generator
def define_gan(g_model, d_model, image_shape):
 # make weights in the discriminator not trainable
 for layer in d_model.layers:
   if not isinstance(layer, BatchNormalization):
      layer.trainable = False
 # define the source image
 in_src = Input(shape=image_shape)
 # connect the source image to the generator input
 gen_out = g_model(in_src)
 # connect the source input and generator output to the discriminator input
 dis_out = d_model([in_src, gen_out])
 # src image as input, generated image and classification output
 model = Model(in_src, [dis_out, gen_out])
 # compile model
 opt = Adam(learning_rate=0.0002, beta_1=0.5)
 model.compile(loss=['binary_crossentropy', 'mae'], optimizer=opt, loss_weights=[1.0,100.0])
 return model
 
    

# Defining helper functions for easy image access

In [4]:
# load and prepare training images
def load_real_samples(filename):
 # load compressed arrays
 data = load(filename)
 # unpack arrays
 X1, X2 = data['arr_0'], data['arr_1']
 # scale from [0,255] to [-1,1]
 X1 = (X1 - 127.5) / 127.5
 X2 = (X2 - 127.5) / 127.5
 return [X1, X2]
 
# select a batch of random samples, returns images and target
def generate_real_samples(dataset, n_samples, patch_shape):
 # unpack dataset
 trainA, trainB = dataset
 # choose random instances
 ix = randint(0, trainA.shape[0], n_samples)
 # retrieve selected images
 X1, X2 = trainA[ix], trainB[ix]
 # generate 'real' class labels (1)
 y = ones((n_samples, patch_shape, patch_shape, 1))
 return [X1, X2], y
 
# generate a batch of images, returns images and targets
def generate_fake_samples(g_model, samples, patch_shape):
 # generate fake instance
 X = g_model.predict(samples)
 # create 'fake' class labels (0)
 y = zeros((len(X), patch_shape, patch_shape, 1))
 return X, y
 


# Summarizing the performance

In [5]:
# generate samples and save as a plot and save the model
def summarize_performance(step, g_model, dataset, n_samples=3):
 # select a sample of input images
 [X_realA, X_realB], _ = generate_real_samples(dataset, n_samples, 1)
 # generate a batch of fake samples
 X_fakeB, _ = generate_fake_samples(g_model, X_realA, 1)
 # scale all pixels from [-1,1] to [0,1]
 X_realA = (X_realA + 1) / 2.0
 X_realB = (X_realB + 1) / 2.0
 X_fakeB = (X_fakeB + 1) / 2.0
 # plot real source images
 for i in range(n_samples):
   pyplot.subplot(3, n_samples, 1 + i)
   pyplot.axis('off')
   pyplot.imshow(X_realA[i])
 # plot generated target image
 for i in range(n_samples):
   pyplot.subplot(3, n_samples, 1 + n_samples + i)
   pyplot.axis('off')
   pyplot.imshow(X_fakeB[i])
 # plot real target image
 for i in range(n_samples):
   pyplot.subplot(3, n_samples, 1 + n_samples*2 + i)
   pyplot.axis('off')
   pyplot.imshow(X_realB[i])
 # save plot to file
 filename1 = 'plot_%06d.png' % (step+1)
 pyplot.savefig(filename1)
 pyplot.close()
 # save the generator model
 filename2 = 'model_%06d.h5' % (step+1)
 g_model.save(filename2)
 print('>Saved: %s and %s' % (filename1, filename2))

def save_models(step, g_model_AtoB):
	# save the first generator model
	filename1 = 'g_model_AtoB_%06d.h5' % (step+1)
	g_model_AtoB.save(filename1)
	# save the second generator model
	print('>Saved:%s' % (filename1))
 


# Training process

In [6]:
 
# train pix2pix models
def train(d_model, g_model, gan_model, dataset, n_epochs=10, n_batch=1):
 # determine the output square shape of the discriminator
 n_patch = d_model.output_shape[1]
 # unpack dataset
 trainA, trainB = dataset
 # calculate the number of batches per training epoch
 bat_per_epo = int(len(trainA) / n_batch)
 # calculate the number of training iterations
 n_steps = bat_per_epo * n_epochs
 # manually enumerate epochs
 for i in range(n_steps):
 # select a batch of real samples
    [X_realA, X_realB], y_real = generate_real_samples(dataset, n_batch, n_patch)
 # generate a batch of fake samples
    X_fakeB, y_fake = generate_fake_samples(g_model, X_realA, n_patch)
 # update discriminator for real samples
    d_loss1 = d_model.train_on_batch([X_realA, X_realB], y_real)
 # update discriminator for generated samples
    d_loss2 = d_model.train_on_batch([X_realA, X_fakeB], y_fake)
 # update the generator
    g_loss, _, _ = gan_model.train_on_batch(X_realA, [y_real, X_realB])
 # summarize performance
    print('>%d, d1[%.3f] d2[%.3f] g[%.3f]' % (i+1, d_loss1, d_loss2, g_loss))
 # summarize model performance
    if (i+1) % (bat_per_epo * 10) == 0:
        summarize_performance(i, g_model, dataset)
    save_models(i, g_model)
 
# load image data
dataset = load_real_samples('pmwtoir1_256.npz')
print('Loaded', dataset[0].shape, dataset[1].shape)
# define input shape based on the loaded dataset
image_shape = dataset[0].shape[1:]
# define the models
d_model = define_discriminator(image_shape)
g_model = define_generator(image_shape)
# define the composite model
gan_model = define_gan(g_model, d_model, image_shape)
# train model
train(d_model, g_model, gan_model, dataset)

Loaded (10, 256, 256, 3) (10, 256, 256, 3)




[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 743ms/step


ValueError: When there is only a single output, the `loss_weights` argument must be a Python float. Received instead: loss_weights=[0.5] of type <class 'list'>

# **Defining the metrics**

Key metrics used to ensure that the generated images are similar to the ground truth images are:

- **RMSE (Root Mean Squared Error):**
  - Measures the average magnitude of pixel-wise differences between the generated and target images.
  - Lower RMSE values indicate better pixel-level similarity.
  - Sensitive to outliers and can be influenced by extreme values.

- **PSNR (Peak Signal-to-Noise Ratio):**
  - Quantifies the quality of the generated image by comparing it to the target image.
  - Higher PSNR values indicate better image quality.
  - It is a logarithmic scale, and a higher PSNR is associated with lower perceptual differences.

- **CC (Pearson Correlation Coefficient):**
  - Assesses the linear relationship between pixel intensities of the generated and target images.
  - A value close to 1 indicates a strong positive correlation, implying high similarity.
  - Not sensitive to intensity shifts but assumes a linear relationship.

- **SSIM (Structural Similarity Index):**
  - Evaluates the structural information and textures in the images.
  - Takes into account luminance, contrast, and structure.
  - SSIM values range from -1 to 1, where 1 indicates perfect similarity.
  - Multiscale and considers local features, making it suitable for assessing perceptual quality.


In [None]:
## Performance metrics 
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim
from scipy.stats import pearsonr
import numpy as np
import pandas as pd

def calculate_rmse(target, prediction):
    return np.sqrt(((prediction - target) ** 2).mean())

def calculate_psnr(target, prediction):
    return psnr(target, prediction, data_range=prediction.max() - prediction.min())

def calculate_cc(target, prediction):
    return pearsonr(target.flatten(), prediction.flatten())[0]

def calculate_ssim(target, prediction):
    # Use a smaller window size for SSIM calculation
    return ssim(target, prediction, win_size=3, multichannel=True, data_range=prediction.max() - prediction.min())

def calculate_metrics(targets,predictions,n_images):
    rmse_values = np.zeros(n_images)
    psnr_values = np.zeros(n_images)
    cc_values = np.zeros(n_images)
    ssim_values = np.zeros(n_images)

    for i in range(n_images):
        target = targets[i]
        prediction = predictions[i]

        rmse_values[i] = calculate_rmse(target, prediction)
        psnr_values[i] = calculate_psnr(target, prediction)
        cc_values[i] = calculate_cc(target, prediction)
        ssim_values[i] = calculate_ssim(target, prediction)

    # Create a DataFrame to display the results
    df = pd.DataFrame({
        'Image': [f'Image {i+1}' for i in range(n_images)],
        'RMSE': rmse_values,
        'PSNR': psnr_values,
        'CC': cc_values,
        'SSIM': ssim_values
    })

    # Add a row for average values
    #df.loc['Average'] = df.mean()
    return df

### Loading the test data from already saved file

In [None]:
from numpy import load

data = load("/kaggle/input/100-images-metrics/pmwtoir1_256_metrics_100.npz")
# unpack arrays
X1, X2 = data['arr_0'], data['arr_1']
# scale from [0,255] to [-1,1]
real = (X1 - 127.5) / 127.5
fake = (X2 - 127.5) / 127.5
print(real.shape,'   ',real.shape)

### Individual image metrics

In [None]:
result_df = calculate_metrics(real,fake,100)
result_df.drop('Image',axis=1,inplace = True)
metrics_100_df = result_df
metrics_100_df

### Metrics summary

In [None]:
metrics_100_df