In [None]:
from google.colab import drive
drive.mount('/content/drive')

import os
import numpy as np
from PIL import Image
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Reshape, Flatten, Dropout, Concatenate
from tensorflow.keras.layers import BatchNormalization, Activation, ZeroPadding2D
from tensorflow.keras.layers import LeakyReLU, UpSampling2D, Conv2D
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
from tensorflow.keras.initializers import RandomNormal
from collections import Counter
from torch.utils.data import SubsetRandomSampler
import numpy as np


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def load_images(image_path, image_size=(256, 256)):
    pre_images = []
    post_images = []
    pre_paths = []
    post_paths = []
    skiping = []
    for filename in os.listdir(image_path):
        if 'pre_disaster' in filename:
            pre_full_path = os.path.join(image_path, filename)
            post_full_path = pre_full_path.replace('pre_disaster', 'post_disaster')

            # Check if both pre and post disaster files exist before opening
            if os.path.exists(pre_full_path) and os.path.exists(post_full_path):
                try:
                    image = Image.open(pre_full_path)
                    post_image = Image.open(post_full_path)

                    # Append the images and their paths only if both files are successfully opened
                    pre_images.append(np.array(image.resize(image_size)))
                    pre_paths.append(pre_full_path)
                    post_images.append(np.array(post_image.resize(image_size)))
                    post_paths.append(post_full_path)

                except IOError as e:
                    print(f"Error opening one of the images {pre_full_path} or {post_full_path}: {e}")
            else:
                # skiping.append(filename)
                # print(f"Skipping {filename} as the corresponding post disaster image does not exist.")
                os.remove(pre_full_path)  # This line removes the pre-disaster image file
                print(f"Removed {pre_full_path} due to missing corresponding post disaster image.")

    return np.array(pre_images), np.array(post_images), pre_paths, post_paths

image_path = '/content/drive/My Drive/XBD_train_Data/train/images'
pre_images, post_images, pre_paths, post_paths = load_images(image_path)
# print(skiping)

In [None]:
n_gen = len(post_images)


In [None]:
pre_images.shape

(1627, 256, 256, 3)

In [None]:
def build_generator():
    model = Sequential()
    model.add(Dense(128 * 64 * 64, activation="relu", input_dim=100, kernel_initializer=RandomNormal(0, 0.02)))
    model.add(Reshape((64, 64, 128)))
    model.add(UpSampling2D())
    model.add(Conv2D(128, kernel_size=4, padding="same", kernel_initializer=RandomNormal(0, 0.02)))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Activation("relu"))
    model.add(UpSampling2D())
    model.add(Conv2D(64, kernel_size=4, padding="same", kernel_initializer=RandomNormal(0, 0.02)))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Activation("relu"))
    model.add(Conv2D(3, kernel_size=4, padding="same", kernel_initializer=RandomNormal(0, 0.02)))
    model.add(Activation("tanh"))
    return model


In [None]:

def build_discriminator():
    model = Sequential()
    model.add(Conv2D(32, kernel_size=3, strides=2, input_shape=(256, 256, 3), padding="same", kernel_initializer=RandomNormal(0, 0.02)))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.25))
    model.add(Conv2D(64, kernel_size=3, strides=2, padding="same", kernel_initializer=RandomNormal(0, 0.02)))
    model.add(ZeroPadding2D(padding=((0,1),(0,1))))
    model.add(BatchNormalization(momentum=0.8))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.25))
    model.add(Conv2D(128, kernel_size=3, strides=2, padding="same", kernel_initializer=RandomNormal(0, 0.02)))
    model.add(BatchNormalization(momentum=0.8))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.25))
    model.add(Conv2D(256, kernel_size=3, strides=1, padding="same", kernel_initializer=RandomNormal(0, 0.02)))
    model.add(BatchNormalization(momentum=0.8))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.25))
    model.add(Flatten())
    model.add(Dense(1, activation='sigmoid'))
    return model


In [None]:
def build_gan(generator, discriminator):
    discriminator.compile(loss='binary_crossentropy', optimizer=Adam(0.0002, 0.5), metrics=['accuracy'])
    discriminator.trainable = False
    z = Input(shape=(100,))
    img = generator(z)
    validity = discriminator(img)
    combined = Model(z, validity)
    combined.compile(loss='binary_crossentropy', optimizer=Adam(0.0002, 0.5))
    return combined

generator = build_generator()
discriminator = build_discriminator()
gan = build_gan(generator, discriminator)


In [None]:
def save_images(epoch, generator, image_save_dir='/content/drive/My Drive/GAN_Images'):
    noise = np.random.normal(0, 1, (1, 100))  # Generate one image
    gen_img = generator.predict(noise)
    gen_img = 0.5 * gen_img + 0.5  # Rescale image from [-1, 1] to [0, 1]

    if not os.path.exists(image_save_dir):
        os.makedirs(image_save_dir)

    img_path = f"{image_save_dir}/epoch_{epoch}_image.png"  # Single image per epoch
    Image.fromarray((gen_img[0] * 255).astype(np.uint8)).save(img_path)



In [None]:
def train(generator, discriminator, combined, epochs, batch_size=128, save_interval=50):
    # Load and preprocess the dataset (placeholder function load_images needs to be defined or replaced)
    pre_images, post_images, pre_paths, post_paths = load_images(image_path)
    pre_images = (pre_images.astype(np.float32) - 127.5) / 127.5
    post_images = (post_images.astype(np.float32) - 127.5) / 127.5

    valid = np.ones((batch_size, 1))
    fake = np.zeros((batch_size, 1))

    for epoch in range(epochs):
        idx = np.random.randint(0, pre_images.shape[0], batch_size)
        imgs_pre = pre_images[idx]
        imgs_post = post_images[idx]

        noise = np.random.normal(0, 1, (batch_size, 100))
        gen_imgs = generator.predict(noise)

        d_loss_real = discriminator.train_on_batch(imgs_post, valid)
        d_loss_fake = discriminator.train_on_batch(gen_imgs, fake)
        d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

        g_loss = combined.train_on_batch(noise, valid)

        print(f"Epoch: {epoch} [D loss: {d_loss[0]}, acc.: {100*d_loss[1]}%] [G loss: {g_loss}]")

        if epoch % save_interval == 0 or epoch == epochs - 1:  # Save the last image of each epoch
            save_images(epoch, generator)



In [None]:
# Initialize and train the GAN
generator = build_generator()
discriminator = build_discriminator()
gan = build_gan(generator, discriminator)
train(generator, discriminator, gan, epochs=600, batch_size=32, save_interval=100)