In [None]:
import numpy as np
import cv2
import matplotlib.pyplot as plt

image_paths = [
'/home/Dataset/UTRNet_data/Scene9/RGB_image/2013-10-03.tiff',
'/home/Dataset/UTRNet_data/Scene9/RGB_image/2014-08-19.tiff',
'/home/Dataset/UTRNet_data/Scene9/RGB_image/2015-05-18.tiff',
'/home/Dataset/UTRNet_data/Scene9/RGB_image/2017-07-10.tiff',
'/home/Dataset/UTRNet_data/Scene9/RGB_image/2017-09-28.tiff',
'/home/Dataset/UTRNet_data/Scene9/RGB_image/2018-10-01.tiff',
'/home/Dataset/UTRNet_data/Scene9/RGB_image/2019-09-18.tiff',
'/home/Dataset/UTRNet_data/Scene9/RGB_image/2020-09-20.tiff',
'/home/Dataset/UTRNet_data/Scene9/RGB_image/2021-05-02.tiff',
'/home/Dataset/UTRNet_data/Scene9/RGB_image/2021-06-19.tiff'
]

#loading the images and stacking them sequentially
def load_images(image_paths):
    images = []
    for path in image_paths:
        image = cv2.imread(path)
        images.append(image)

    stacked_images = np.stack(images, axis=-1)
    return stacked_images

# This function applies Fast Fourier Transformation and low-frequency masking
def apply_frequency_mask(image, radius=50):
    original_dft = np.fft.fftshift(np.fft.fft2(image, axes=(0, 1)))
    magnitude_spectrum = np.abs(original_dft)

    rows, cols = image.shape[:2]
    crow, ccol = rows // 2, cols // 2

    low_freq_mask = np.zeros((rows, cols), np.uint8)
    cv2.circle(low_freq_mask, (ccol, crow), radius, 1, thickness=-1)

    low_freq_magnitude = magnitude_spectrum * low_freq_mask

    # Reconstruct the image using inverse FFT
    reconstructed_image = reconstruct_image(original_dft, low_freq_magnitude)

    return np.clip(reconstructed_image, 0, 255).astype(np.uint8)

# This function reconstruct the image using inverse FFT
def reconstruct_image(original_dft, masked_magnitude):
    phase = np.angle(original_dft)
    combined_spectrum = masked_magnitude * np.exp(1j * phase)
    inverse_shift = np.fft.ifftshift(combined_spectrum)
    reconstructed_image = np.real(np.fft.ifft2(inverse_shift))
    return reconstructed_image

# This function combines FFT and Masking for all image sequence
def apply_mask_sequence(image_sequence, radius=50):
    low_freq_sequence = []

    for i in range(image_sequence.shape[-1]):
        image = image_sequence[..., i]
        if image.ndim == 3:
            low_freq_img = []
            for ch in range(image.shape[2]):
                lf_img = apply_frequency_mask(image[:, :, ch], radius)
                low_freq_img.append(lf_img)

            low_freq_img = np.stack(low_freq_img, axis=-1)
        else:
            low_freq_img = apply_frequency_mask(image, radius)

        low_freq_sequence.append(low_freq_img)

    low_freq_sequence = np.stack(low_freq_sequence, axis=-1)

    return low_freq_sequence

stacked_images = load_images(image_paths)

low_freq_sequence = apply_mask_sequence(stacked_images)

# Normalize images to [0, 1]
low_freq_sequence = low_freq_sequence / 255.
low_freq_sequence.shape

In [None]:
masked_reconstructed_sequence_reshaped = low_freq_sequence.reshape((1, 400, 400, 30))

masked_reconstructed_sequence_reshaped.shape

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, losses, optimizers
from tensorflow.keras.optimizers.schedules import ExponentialDecay
import cv2
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, cohen_kappa_score

def create_cnn_decoder():
    model = models.Sequential([
        layers.Input(shape=(None, None, 30)),
        layers.Conv2D(64, (3, 3), padding='same'),
        layers.BatchNormalization(),
        layers.ReLU(),
        layers.Dropout(0.3),
        layers.Conv2D(128, (3, 3), padding='same'),
        layers.BatchNormalization(),
        layers.ReLU(),
        layers.Dropout(0.3),
        layers.Conv2D(64, (3, 3), padding='same'),
        layers.BatchNormalization(),
        layers.ReLU(),
        layers.Conv2D(1, (3, 3), padding='same', activation='sigmoid')
    ])
    return model

# Define discriminator function for Adversarial Training
def create_discriminator():
    model = models.Sequential([
        layers.Input(shape=(None, None, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding='same'),
        layers.LeakyReLU(alpha=0.2),
        layers.Dropout(0.3),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding='same'),
        layers.LeakyReLU(alpha=0.2),
        layers.Dropout(0.5),
        layers.GlobalAveragePooling2D(),
        layers.Dense(1, activation='sigmoid')
    ])
    return model

# Instantiate models
decoder = create_cnn_decoder()
discriminator = create_discriminator()

# Define an exponential decay scheduler for decoder and discriminator
#decoder_optimizer = optimizers.Adam(learning_rate=0.0001)
#discriminator_optimizer = optimizers.Adam(learning_rate=0.0001)
initial_lr = 0.001
decay_steps = 1000
decay_rate = 0.96

lr_schedule = ExponentialDecay(
    initial_learning_rate=initial_lr,
    decay_steps=decay_steps,
    decay_rate=decay_rate,
    staircase=True
)

decoder_optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

# Training setup
epochs = 500

In [None]:
#Loading and processing the ground truth image
def load_ground_truth(filepath):
    ground_truth = cv2.imread(filepath, cv2.IMREAD_COLOR)
    print(f"Ground truth shape: {ground_truth.shape}")

    ground_truth_rgb = cv2.cvtColor(ground_truth, cv2.COLOR_BGR2RGB)

    change_lower = np.array([150, 0, 0])
    change_upper = np.array([255, 100, 100])

    unchange_lower = np.array([150, 150, 0])
    unchange_upper = np.array([255, 255, 100])

    change_mask = cv2.inRange(ground_truth_rgb, blue_lower, blue_upper)
    unchange_mask = cv2.inRange(ground_truth_rgb, green_lower, green_upper)

    ground_truth_binary = np.zeros_like(change_mask, dtype=np.uint8)
    ground_truth_binary[change_mask > 0] = 1  # Set changed areas to 1
    ground_truth_binary[unchange_mask > 0] = 0  # Set unchanged areas to 0

    return ground_truth_binary

ground_truth_binary = load_ground_truth('/home/Dataset/UTRNet_data/Scene9/Ground_Truth.tiff')

ground_truth_binary_resized = cv2.resize(ground_truth_binary,
                                         (masked_reconstructed_sequence_reshaped.shape[2],
                                          masked_reconstructed_sequence_reshaped.shape[1]))

# expand the ground truth dim
ground_truth_binary_resized = np.expand_dims(ground_truth_binary_resized, axis=(0, -1))  # Shape: (1, height, width, 1)

# binary change map
binary_change_map = np.expand_dims(ground_truth_binary, axis=(0, -1))  # Shape: (1, height, width, 1)


In [None]:
#Loss Functions with Adversarial Training
# Calculate class weights based on the pixel distribution for Binary Cross-Entropy
total_pixels = np.prod(ground_truth_binary.shape)
change_count = np.sum(ground_truth_binary == 1)
unchanged_count = total_pixels - change_count

weight_for_changed = (1 / change_count) * (total_pixels / 2.0)
weight_for_unchanged = (1 / unchanged_count) * (total_pixels / 2.0)

#class_weights = {0: weight_for_unchanged, 1: weight_for_changed}

# Focal Loss
def focal_loss(y_true, y_pred, alpha=0.25, gamma=2.0):
    bce = losses.BinaryCrossentropy()(y_true, y_pred)
    bce_exp = tf.exp(-bce)
    focal_loss = alpha * (1 - bce_exp) ** gamma * bce
    return focal_loss

# Weighted Binary Cross-Entropy
def weighted_binary_crossentropy(y_true, y_pred, weight_for_unchanged, weight_for_changed):
    weights = tf.where(tf.equal(y_true, 1), weight_for_changed, weight_for_unchanged)
    return losses.BinaryCrossentropy()(y_true, y_pred, sample_weight=weights)

# Contrastive Loss Function
def contrastive_loss(y_true, y_pred, margin=1.0):
    y_pred = tf.expand_dims(y_pred, axis=-1) if len(y_pred.shape) == 3 else y_pred
    squared_pred = tf.reduce_sum(tf.square(y_pred - y_true), axis=-1)
    squared_pred = tf.expand_dims(squared_pred, axis=-1)
    loss = tf.reduce_mean(y_true * squared_pred + (1 - y_true) * tf.maximum(margin - squared_pred, 0))
    return loss

# Combined Losses
def combined_loss(y_true, y_pred, real_output, fake_output, weight_for_unchanged, weight_for_changed):
    # Contrastive loss
    con_loss = contrastive_loss(y_true, y_pred)

    # Weighted Binary Cross-Entropy Loss
    weighted_bce_loss = weighted_binary_crossentropy(y_true, y_pred, weight_for_unchanged, weight_for_changed)

    # Focal Loss
    focal_loss_value = focal_loss(y_true, y_pred)

    # Adversarial Loss (Binary Cross-Entropy for real/fake discriminator predictions)
    adv_loss = losses.BinaryCrossentropy()(tf.ones_like(fake_output), fake_output)

    # Adjust loss weights
    contrastive_loss_weight = 1.0
    weighted_bce_loss_weight = 0.5
    focal_loss_weight = 0.5
    adversarial_loss_weight = 0.5

    # Generator/Decoder Loss: Combine all
    gen_loss = (contrastive_loss_weight * con_loss) + \
               (weighted_bce_loss_weight * weighted_bce_loss) + \
               (focal_loss_weight * focal_loss_value) + \
               (adversarial_loss_weight * adv_loss)

    return gen_loss


In [None]:
# Training
for epoch in range(epochs):
    # Training Discriminator
    with tf.GradientTape() as disc_tape:
        fake_change_map = decoder(masked_reconstructed_sequence_reshaped, training=True)
        real_output = discriminator(binary_change_map, training=True)
        fake_output = discriminator(fake_change_map, training=True)

        # Discriminator loss
        disc_loss = losses.BinaryCrossentropy()(tf.ones_like(real_output), real_output) + \
                    losses.BinaryCrossentropy()(tf.zeros_like(fake_output), fake_output)

    grads = disc_tape.gradient(disc_loss, discriminator.trainable_variables)
    discriminator_optimizer.apply_gradients(zip(grads, discriminator.trainable_variables))

    # Training Decoder
    with tf.GradientTape() as gen_tape:
        fake_change_map = decoder(masked_reconstructed_sequence_reshaped, training=True)
        fake_output = discriminator(fake_change_map, training=False)

        gen_loss = combined_loss(binary_change_map, fake_change_map, real_output, fake_output,
                                 weight_for_unchanged, weight_for_changed)

    grads = gen_tape.gradient(gen_loss, decoder.trainable_variables)
    decoder_optimizer.apply_gradients(zip(grads, decoder.trainable_variables))

    # Log training progress
    print(f"Epoch [{epoch+1}/{epochs}], Disc Loss: {disc_loss.numpy():.4f}, Gen Loss: {gen_loss.numpy():.4f}")

# Compare predicted map to ground truth
predicted_change_map = decoder.predict(masked_reconstructed_sequence_reshaped)[0].squeeze()

predicted_change_map_resized = cv2.resize(predicted_change_map, (ground_truth_binary.shape[1], ground_truth_binary.shape[0]))
predicted_change_map_binary = (predicted_change_map_resized > 0.5).astype(np.uint8)


# Visualize results
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.imshow(predicted_change_map_binary, cmap='gray')
plt.title('Predicted Binary Change Map')
plt.axis('off')

plt.subplot(1, 2, 2)
plt.imshow(ground_truth_binary, cmap='gray')
plt.title('Ground Truth Binary Change Map')
plt.axis('off')

plt.show()


In [None]:
# Function to calculate IOU)
def calculate_iou(y_true, y_pred):
    intersection = np.logical_and(y_true, y_pred).sum()
    union = np.logical_or(y_true, y_pred).sum()
    iou = intersection / union if union > 0 else 0
    return iou

# Calculate evaluation metrics using the binary predicted change map
accuracy = accuracy_score(ground_truth_binary.flatten(), predicted_change_map_binary.flatten())
precision = precision_score(ground_truth_binary.flatten(), predicted_change_map_binary.flatten())
recall = recall_score(ground_truth_binary.flatten(), predicted_change_map_binary.flatten())
f1 = f1_score(ground_truth_binary.flatten(), predicted_change_map_binary.flatten())
kappa = cohen_kappa_score(ground_truth_binary.flatten(), predicted_change_map_binary.flatten())
iou = calculate_iou(ground_truth_binary, predicted_change_map_binary)

# Print metrics
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"Kappa: {kappa:.4f}")
print(f"IoU: {iou:.4f}")
