In [1]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Reshape, Flatten, Dropout, Concatenate
from tensorflow.keras.layers import BatchNormalization, LeakyReLU, ReLU, Conv2D, Conv2DTranspose
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
import numpy as np
import os
from matplotlib.image import imread
import matplotlib.pyplot as plt 

In [2]:
x_train_tmp_folder_path = r"C:\Users\shrir\OneDrive\Desktop\GAN\GAN_PS\DATASET\OLD_20\disp_20"
x_train_dsp_folder_path = r"C:\Users\shrir\OneDrive\Desktop\GAN\GAN_PS\DATASET\OLD_20\temp_20"
y_train_folder_path = r"C:\Users\shrir\OneDrive\Desktop\GAN\GAN_PS\DATASET\OLD_20\outputs"

x_tmp_elements = os.listdir(x_train_tmp_folder_path)
x_dsp_elements = os.listdir(x_train_dsp_folder_path)
y_elements = os.listdir(y_train_folder_path)


In [3]:
# DEVELOPING X_train MATRIX

def get_images(tmp_elements,disp_elements,y_elements,size):
  X= np.zeros((size,256,256,2))
  Y= np.zeros((size,256,256,1))
  for index, (tmp_element, dsp_element) in enumerate(zip(tmp_elements, disp_elements)):
      # Load images for each channel
      tmp_element_path = os.path.join(x_train_tmp_folder_path, tmp_element)
      dsp_element_path = os.path.join(x_train_dsp_folder_path, dsp_element)
      img_tmp = imread(tmp_element_path)
      img_dsp = imread(dsp_element_path)

      # Reshape images for each channel
      img_tmp = img_tmp.reshape((256,256, 1))
      img_dsp = img_dsp.reshape((256,256, 1))

      # Combine channels
      img_combined = (np.concatenate((img_tmp, img_dsp), axis=2)-0.5)/0.5

      # Assign to X_train
      X[index] = img_combined

  # DEVELOPING Y_train MATRIX
  for index,Y_train_element in enumerate(y_elements):
      element_path = os.path.join(y_train_folder_path, Y_train_element)
      img = imread(element_path)
      img = np.mean(img, axis=2)
      img = img/255
      img = img.reshape((256, 256, 1))
      Y[index] = img
  return X,Y

In [4]:
x,y = get_images(x_tmp_elements,x_dsp_elements,y_elements,len(x_tmp_elements))

In [5]:
train_data=tf.data.Dataset.from_tensor_slices((x,y))

In [6]:
train_data

<TensorSliceDataset element_spec=(TensorSpec(shape=(256, 256, 2), dtype=tf.float64, name=None), TensorSpec(shape=(256, 256, 1), dtype=tf.float64, name=None))>

In [7]:

temp = [342.4, 343.8, 342.9, 341, 340.6, 345.4, 340.5, 340.3, 337.8, 339.4, 337.2, 345, 343.5, 339.2, 340.5, 337.5, 340.4, 342.8, 338.9, 341]
disp = [0.4031, 0.4140, 0.4002, 0.3952, 0.3912, 0.4088, 0.3963, 0.3956, 0.3933, 0.3902, 0.3886, 0.4059, 0.4052, 0.3996, 0.3936, 0.3962, 0.4052, 0.4073, 0.3993, 0.4006]

# Convert lists to TensorFlow tensors
temp_tensor = tf.constant(temp)
disp_tensor = tf.constant(disp)

# Concatenate tensors along a new dimension (1D array for each)
conditions = tf.concat([temp_tensor[:, tf.newaxis], disp_tensor[:, tf.newaxis]], axis=1)

# Print the concatenated tensor and its shape
print(conditions)
print(conditions.shape)


tf.Tensor(
[[342.4      0.4031]
 [343.8      0.414 ]
 [342.9      0.4002]
 [341.       0.3952]
 [340.6      0.3912]
 [345.4      0.4088]
 [340.5      0.3963]
 [340.3      0.3956]
 [337.8      0.3933]
 [339.4      0.3902]
 [337.2      0.3886]
 [345.       0.4059]
 [343.5      0.4052]
 [339.2      0.3996]
 [340.5      0.3936]
 [337.5      0.3962]
 [340.4      0.4052]
 [342.8      0.4073]
 [338.9      0.3993]
 [341.       0.4006]], shape=(20, 2), dtype=float32)
(20, 2)


In [8]:
# Constants
C5, C6, C7, C8 = 1.0, 1.0, 1.0, 1.0
sigma = 0.2
kappa = 0.1

# Helper function to compute vicinal points and weights
def compute_vicinal_points_and_weights(y, eps):
    y_eps = y + eps
    weights = tf.exp(-tf.square(eps) / (2 * tf.square(sigma)))
    return y_eps, weights

# Indicator function
def indicator_function(y, y_tilde, kappa):
    return tf.cast(tf.abs(y - y_tilde) <= kappa, tf.float32)


In [9]:
from tensorflow.keras.layers import Input, Dense, Reshape, BatchNormalization, LeakyReLU, Conv2DTranspose, Concatenate, Activation
from tensorflow.keras.models import Model

def build_generator(latent_dim, condition_dim):
    noise = Input(shape=(latent_dim,))
    condition = Input(shape=(condition_dim,))

    # Combine noise and condition inputs
    x = Concatenate()([noise, condition])

    # Project and reshape
    x = Dense(128 * 16 * 16)(x)  # Start with a projection size that can be reshaped into 16x16x128
    x = Reshape((16, 16, 128))(x)  # Reshape according to the projected size
    x = BatchNormalization()(x)
    x = LeakyReLU(alpha=0.2)(x)

    # Upsampling layers
    x = Conv2DTranspose(128, (4, 4), strides=(2, 2), padding='same')(x)  # Upsample to 32x32
    x = BatchNormalization()(x)
    x = LeakyReLU(alpha=0.2)(x)

    x = Conv2DTranspose(64, (4, 4), strides=(2, 2), padding='same')(x)  # Upsample to 64x64
    x = BatchNormalization()(x)
    x = LeakyReLU(alpha=0.2)(x)

    x = Conv2DTranspose(32, (4, 4), strides=(2, 2), padding='same')(x)  # Upsample to 128x128
    x = BatchNormalization()(x)
    x = LeakyReLU(alpha=0.2)(x)

    x = Conv2DTranspose(1, (4, 4), strides=(2, 2), padding='same')(x)  # Upsample to 256x256
    img = Activation('tanh')(x)  # Use 'tanh' activation for final image output

    return Model([noise, condition], img)

# Example usage:
latent_dim = 100
condition_dim = 2
generator = build_generator(latent_dim, condition_dim)
generator.summary()


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 100)]        0           []                               
                                                                                                  
 input_2 (InputLayer)           [(None, 2)]          0           []                               
                                                                                                  
 concatenate (Concatenate)      (None, 102)          0           ['input_1[0][0]',                
                                                                  'input_2[0][0]']                
                                                                                                  
 dense (Dense)                  (None, 32768)        3375104     ['concatenate[0][0]']        

In [10]:
def build_discriminator(image_shape, condition_dim):
    img = Input(shape=image_shape)
    condition = Input(shape=(condition_dim,))

    x = Conv2D(64, (4, 4), strides=(2, 2), padding='same')(img)
    x = LeakyReLU(alpha=0.2)(x)
    x = Dropout(0.25)(x)

    x = Conv2D(128, (4, 4), strides=(2, 2), padding='same')(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = Dropout(0.25)(x)

    x = Flatten()(x)
    x = Concatenate()([x, condition])
    x = Dense(1, activation='sigmoid')(x)

    return Model([img, condition], x)


In [11]:
disc = build_discriminator((256,256,1),2)
disc.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_3 (InputLayer)           [(None, 256, 256, 1  0           []                               
                                )]                                                                
                                                                                                  
 conv2d (Conv2D)                (None, 128, 128, 64  1088        ['input_3[0][0]']                
                                )                                                                 
                                                                                                  
 leaky_re_lu_4 (LeakyReLU)      (None, 128, 128, 64  0           ['conv2d[0][0]']                 
                                )                                                           

In [12]:
def hvdl_loss(y_real, y_fake, D_real, D_fake, kappa, C5,C6):
    C5 = tf.cast(C5, tf.float32)
    C6 = tf.cast(C6, tf.float32)
    N_r = tf.shape(y_real)[0]
    N_g = tf.shape(y_fake)[0]
    N_r = tf.cast(N_r, tf.float32)
    N_g = tf.cast(N_g, tf.float32)

    # Ensure epsilon is a tensor with the correct type
    y_real_eps = tf.cast(tf.constant(1e-8), tf.float32)
    y_fake_eps = tf.cast(tf.constant(1e-8), tf.float32)

    def indicator_function(y, y_eps, kappa):
        return tf.cast(tf.reduce_mean(tf.abs(y - y_eps), axis=1) < kappa, tf.float32)

    term1 = tf.reduce_mean(indicator_function(y_real, y_real_eps, kappa) * tf.math.log(D_real + y_real_eps))
    term2 = tf.reduce_mean(indicator_function(y_fake, y_fake_eps, kappa) * tf.math.log(1 - D_fake + y_fake_eps))

    loss = -C5 / N_r * term1 - C6 / N_g * term2
    return loss



def svdl_loss(y_real, y_fake, D_real, D_fake, sigma, C7, C8):
    N_r = tf.shape(y_real)[0]
    N_g = tf.shape(y_fake)[0]
    N_r = tf.cast(N_r, tf.float32)
    N_g = tf.cast(N_g, tf.float32)
    
    C7 = tf.cast(C7, tf.float32)
    C8 = tf.cast(C8, tf.float32)
    
    eps_r = tf.random.normal(tf.shape(y_real), mean=0.0, stddev=sigma)
    eps_g = tf.random.normal(tf.shape(y_fake), mean=0.0, stddev=sigma)

    def compute_vicinal_points_and_weights(y, eps):
        y_eps = y + eps
        w = 1.0 / (tf.sqrt(2.0 * tf.constant(np.pi)) * sigma) * tf.exp(-tf.square(eps) / (2.0 * tf.square(sigma)))
        return y_eps, w

    y_real_eps, w_real = compute_vicinal_points_and_weights(y_real, eps_r)
    y_fake_eps, w_fake = compute_vicinal_points_and_weights(y_fake, eps_g)

    term1 = tf.reduce_mean(w_real * tf.math.log(D_real + 1e-8))
    term2 = tf.reduce_mean(w_fake * tf.math.log(1 - D_fake + 1e-8))

    loss = -C7 / N_r * term1 - C8 / N_g * term2
    return loss


In [13]:
def generator_loss(D, G, z, y, sigma):
    # Generate noise from a normal distribution
    epsilon = tf.random.normal(shape=tf.shape(y), mean=0.0, stddev=sigma)
    
    # Generate images from the generator
    generated_images = G([z, y], training=True)
    
    # Get the discriminator's output for the generated images
    D_output = D([generated_images, y + epsilon], training=True)
    
    # Calculate the loss
    loss = -tf.reduce_mean(tf.math.log(D_output + 1e-10))  # Adding a small constant to avoid log(0)
    
    return loss

In [14]:
def compare_images(noise, condition,target,epoch):
  generated = generator.predict([noise, condition])
  plt.figure(figsize=(15,5))

  images_list = [target[0], generated[0]]
  title = ['Real (ground truth)', 'Generated Image (fake)']

  for i in range(2):
    plt.subplot(1, 2, i + 1)
    plt.title(title[i])
    plt.imshow(images_list[i] * 0.5 + 0.5,cmap='gray')
    plt.axis('off')
  plt.suptitle(f'EPOCH {epoch}')
  plt.show()

In [15]:
def train(generator, discriminator, latent_dim, condition_dim, epochs, batch_size):
    # Optimizers
    d_optimizer = Adam(learning_rate=0.0002, beta_1=0.5)
    g_optimizer = Adam(learning_rate=0.0002, beta_1=0.5)

    NOISE = np.random.normal(0, 1, (1, latent_dim))
    
    CONDITION = X_train[0]
    CONDITION = tf.expand_dims(CONDITION,axis=0)
    
    REAL = y_train[0]
    REAL = tf.expand_dims(REAL,axis=0)
    print(CONDITION.shape)
    print(REAL.shape)
    
    for epoch in range(epochs):
        # Generate noise and conditions
        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        #conditions = np.random.uniform(-1, 1, (batch_size, condition_dim))

        # Generate fake images
        

        # Select a random batch of real images
        #idx = np.random.randint(0, y_train.shape[0], batch_size)
        
        ids = tf.random.shuffle(tf.range(X_train.shape[0]))[:batch_size]
        real_conditions = tf.gather(X_train, ids)
        gen_imgs = generator.predict([noise, real_conditions])
        
        real_imgs = tf.gather(y_train,ids)
        

        # Discriminator loss
        with tf.GradientTape() as tape:
            D_real = discriminator([real_imgs, real_conditions])
            D_fake = discriminator([gen_imgs, real_conditions])
            
            d_loss = hvdl_loss(real_conditions, conditions, D_real, D_fake, kappa,C5,C6) + svdl_loss(real_conditions, conditions, D_real, D_fake,sigma, C7, C8 )
            
        grads = tape.gradient(d_loss, discriminator.trainable_weights)
        d_optimizer.apply_gradients(zip(grads, discriminator.trainable_weights))

        # Generator loss
        with tf.GradientTape() as tape:
            gen_imgs = generator([noise, conditions])
            D_fake = discriminator([gen_imgs, conditions])
            g_loss = generator_loss(discriminator,generator, noise, rea, sigma)
            
        grads = tape.gradient(g_loss, generator.trainable_weights)
        g_optimizer.apply_gradients(zip(grads, generator.trainable_weights))

        if(epoch%5==0):
            compare_images(NOISE,CONDITION, REAL, 0)

        print(f'{epoch}/{epochs}, d_loss: {d_loss.numpy()}, g_loss: {g_loss.numpy()}')



In [16]:
x.shape

(20, 256, 256, 2)

In [17]:
# Parameters
latent_dim = 100
condition_dim = 2  # Two conditions: temp and disp
image_shape = (256, 256, 1)  # Assuming grayscale images

# Build and compile models
generator = build_generator(latent_dim, condition_dim)
discriminator = build_discriminator(image_shape, condition_dim)





In [None]:
X_train = conditions
y_train = y
# Training
train(generator, discriminator, latent_dim, condition_dim, epochs=500, batch_size=1)

In [38]:
X_train[6]

<tf.Tensor: shape=(2,), dtype=float32, numpy=array([340.5   ,   0.3963], dtype=float32)>

In [43]:
X_train.shape[0]

20

In [49]:
idx = np.random.randint(0, y_train.shape[0], 4)
real_imgs = y_train[idx]
print(idx)
real_conditions = X_train[idx]


[9 3 0 4]


TypeError: Only integers, slices (`:`), ellipsis (`...`), tf.newaxis (`None`) and scalar tf.int32/tf.int64 tensors are valid indices, got array([9, 3, 0, 4])

In [51]:
selected_indices = tf.random.shuffle(tf.range(X_train.shape[0]))[:4]
selected_samples = tf.gather(X_train, selected_indices)

# Print selected samples
print(selected_samples)

tf.Tensor(
[[340.6      0.3912]
 [338.9      0.3993]
 [343.8      0.414 ]
 [342.9      0.4002]], shape=(4, 2), dtype=float32)
