In [None]:
import tensorflow as tf
import keras as k
from keras import layers as l
import glob
import os
import numpy as np
import matplotlib.pyplot as plt
import time
from IPython import display

In [None]:
generator_optimizer = tf.keras.optimizers.Adam(learning_rate=2e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=2e-4, beta_1=0.5)

Set up modified UNET

In [None]:
def encoder(layer_in, n_filters, batchnorm=True):
    # Initialize weights
    init = k.initializers.RandomNormal(0., 0.02)    # Add the downsampling
    d = l.Conv2D(n_filters, (4,4,), strides=(2,2), padding='same', kernel_initializer=init)(layer_in)
    if(batchnorm):
        d = l.BatchNormalization()(d, training=True)
    d = l.LeakyReLU(alpha=0.2)(d)
    return d

def decoder(layer_in, skip_in, n_filters, dropout=True):
    # Initialize weights
    init = k.initializers.RandomNormal(0., 0.02)    # Add the upsampling
    u = l.Conv2DTranspose(n_filters, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(layer_in)
    u = l.BatchNormalization()(u, training=True)
    if(dropout):
        u = l.Dropout(0.5)(u, training=True)
    # Merge
    u = l.Concatenate()([u, skip_in])
    u = l.Activation('relu')(u)
    return u

def Generator(image_shape=(256,256,3)):
    # Initialize weights
    init = k.initializers.RandomNormal(0., 0.02)
    # Input
    in_image = l.Input(shape=image_shape)

    # Encode
    e1 = encoder(in_image, 64, False)
    e2 = encoder(e1, 128)
    e3 = encoder(e2, 256)
    e4 = encoder(e3, 512)
    e5 = encoder(e4, 512)

    # Bottleneck
    b = l.Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(e5)
    b = l.Activation('relu')(b)

    # Decode
    d1 = decoder(b, e5, 512)
    d2 = decoder(d1, e4, 512)
    d3 = decoder(d2, e3, 256, dropout=False)
    d4 = decoder(d3, e2, 128, dropout=False)
    d5 = decoder(d4, e1, 64, dropout=False)

    # Output
    g = l.Conv2DTranspose(3, (4,4), strides=(2,2), padding = 'same', kernel_initializer=init)(d5)
    out_image = l.Activation('tanh')(g)

    # Define model
    model = k.Model(inputs=in_image, outputs = out_image)
    return model

loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)

LAMBDA = 100

def generator_loss(disc_fake_output, generator_output, target_output):
    gan_loss = loss_object(tf.ones_like(disc_fake_output), disc_fake_output)

    l1_loss = tf.reduce_mean(tf.abs(target_output - generator_output))

    total_gen_loss = gan_loss + (LAMBDA * l1_loss)
    return total_gen_loss, gan_loss, l1_loss

Setup descriminator

In [None]:
def Discriminator(image_shape):
    init = k.initializers.RandomNormal(0., 0.02)
    input_image = l.Input(shape=image_shape)
    target_image = l.Input(shape=image_shape)
    d = l.Concatenate()([input_image, target_image])
    
    d = l.SpectralNormalization(l.Conv2D(64, (4, 4), strides=2, padding='same', kernel_initializer=init))(d)
    d = l.LeakyReLU(alpha=0.2)(d)
    
    d = l.SpectralNormalization(l.Conv2D(128, (4, 4), strides=2, padding='same', kernel_initializer=init))(d)
    d = l.LayerNormalization()(d, training=True)
    d = l.LeakyReLU(alpha=0.2)(d)
    
    d = l.SpectralNormalization(l.Conv2D(256, (4, 4), strides=2, padding='same', kernel_initializer=init))(d)
    d = l.LayerNormalization()(d, training=True)
    d = l.LeakyReLU(alpha=0.2)(d)
    
    zero_pad1 = l.ZeroPadding2D()(d)
    d = l.Conv2D(512, 4, strides=1, kernel_initializer = init,
                 use_bias= False)(zero_pad1)
    
    batchnorm1 = l.BatchNormalization()(d)
    d = l.LeakyReLU(alpha=0.2)(batchnorm1)

    zero_pad2 = l.ZeroPadding2D()(d)
    
    d = l.SpectralNormalization(l.Conv2D(1, 4, strides=1, kernel_initializer=init))(zero_pad2)
    
    model = k.Model([input_image, target_image], d)
    return model

disc_ce_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True)

def discriminator_loss(real_output, fake_output):
    real_loss = disc_ce_loss(tf.ones_like(real_output), real_output)

    generated_loss = disc_ce_loss(tf.zeros_like(fake_output), fake_output)

    total_loss = real_loss + generated_loss
    return total_loss

Dataset helpers

In [None]:
def load_images(rgb_paths, ndvi_paths, image_shape=(256,256,3)):
    # Load and resize images
    rgb_images = [k.preprocessing.image.load_img(img, target_size=image_shape) for img in rgb_paths]
    ndvi_images = [k.preprocessing.image.load_img(img, target_size=image_shape) for img in ndvi_paths]

    # Convert to numpy arrays
    rgb_images = np.array([k.preprocessing.image.img_to_array(img) / 127.5 - 1 for img in rgb_images])
    ndvi_images = np.array([k.preprocessing.image.img_to_array(img) / 127.5 - 1 for img in ndvi_images])
    
    return rgb_images, ndvi_images

def get_dataset(rgb_dir, ndvi_dir, image_shape=(256,256,3)):
    # Load images
    rgb_images = sorted(glob.glob(os.path.join(rgb_dir, '*.jpg')))
    ndvi_images = sorted(glob.glob(os.path.join(ndvi_dir, '*.jpg')))
    # Check if the number of images is the same
    if len(rgb_images) != len(ndvi_images):
        raise ValueError("Number of RGB and NDVI images do not match.")
    
    rgb_images = np.array(rgb_images)
    ndvi_images = np.array(ndvi_images)

    return rgb_images, ndvi_images
    

def real_pairs(dataset, n_samples, patch_shape):
    # Unpack
    trainA, trainB = dataset

    # Pick random
    ix = np.random.randint(0,trainA.shape[0], n_samples)
    X1, X2 = trainA[ix], trainB[ix]

    X1, X2 = load_images(X1, X2)
    # Label 1 (Real)
    y = tf.ones((n_samples, patch_shape, patch_shape, 1))
    return [X1, X2], y

def fake_pairs(g_model, samples, patch_shape):
    # Generate fake
    X = g_model.predict(samples, batch_size=32)
    # Label 0 (Fake)
    y = tf.zeros((len(X), patch_shape, patch_shape, 1))
    return X, y


Performance


In [None]:
def performance(step, g_model, d_model, dataset, n_samples=3):
    # Select an input sample
    [X_realA, X_realB], _ = real_pairs(dataset, n_samples, 1)

    # Generate a fake input sample
    X_fakeB, _ = fake_pairs(g_model, X_realA, 1)

    # Scale pixel values
    X_realA = (X_realA + 1) / 2.0
    X_fakeB = (X_fakeB + 1) / 2.0
    X_realB = (X_realB + 1) / 2.0

    for i in range(len(X_fakeB)):
        img = X_fakeB[i]
        print(img.shape)
        fig, axes = plt.subplots(nrows=1,ncols=1, figsize = (3,3))
        for col in range(1):
            axes.imshow(img)
            axes.axis('off')

        plt.tight_layout()
        filename0 = f'generated_{step + i + 1:06d}.png'
        plt.savefig(filename0, bbox_inches='tight', pad_inches=0)        
        plt.close()

    fig, axes = plt.subplots(nrows=3, ncols=n_samples, figsize=(n_samples * 3, 9))

    row_labels = ['Input', 'Generated', 'Ground Truth']

    for row, images in enumerate([X_realA, X_fakeB, X_realB]):
        for col in range(n_samples):
            axes[row, col].imshow(images[col])
            axes[row, col].axis('off')

        # Add row labels (left side)
        axes[row, 0].text(-50, images[0].shape[0] // 2, row_labels[row], fontsize=12,
                          va='center', ha='right', color='white', bbox=dict(facecolor='black', alpha=0.7))

    plt.tight_layout()

    # Save plot
    filename1 = f'plot_{step + 1:06d}.png'
    plt.savefig(filename1)
    plt.close()

    # Save models
    filename2 = f'generator_model_{step + 1:06d}.keras'
    g_model.save(filename2)

    filename3 = f'discriminator_model_{step + 1:06d}.keras'
    d_model.save(filename3)

    print(f'>Saved: {filename1}, {filename2}, {filename3}')


"""
Slightly refactored version of performance() meant to allow you to more easily generate and save outputs for validation
"""
def validation(g_model, dataset, n_samples=1):
    # Select an input sample
    trainA, trainB = dataset
    print(trainA)
    realA = []
    realB = []
    fakeB = []

    for i in range(len(trainA)):

        images = load_images([trainA[i]],[trainB[i]])
        realA.append((images[0]+1)/2)
        realB.append((images[1]+1)/2)
        fakeB.append((g_model.predict(images[0], batch_size=32)+1)/2)

    for j in range(len(realA)):
        X_fakeB = fakeB[j]
        X_realA = realA[j]
        X_realB = realB[j]
        for i in range(len(X_fakeB)):
            img = X_fakeB[i]
            print(img.shape)
            fig, axes = plt.subplots(nrows=1,ncols=1, figsize = (3,3))
            for col in range(1):
                axes.imshow(img)
                axes.axis('off')

            plt.tight_layout()
            filename0 = f'zgenerated_{j + 1:06d}.png'
            plt.savefig(filename0, bbox_inches='tight', pad_inches=0)        
            plt.close()
        print(f'>Saved')

In [None]:
def add_noise(images, noise_factor=0.05):
    noise = noise_factor * tf.random.normal(shape=images.shape)
    return images + noise

"""
Proceedure for each batch/image training step
"""
def train_step(input_image, target, gen_output, step, g_model, d_model):
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        gen_output = g_model(input_image, training = True)

        im = tf.convert_to_tensor(input_image)
        disc_real_output = d_model([input_image, target], training=True)
        disc_fake_output = d_model([im, gen_output], training=True)

        gen_total_loss, gen_gan_loss, gen_l1_loss = generator_loss(disc_fake_output, gen_output, target)
        disc_loss = discriminator_loss(disc_real_output, disc_fake_output)
        real_loss = disc_ce_loss(tf.ones_like(disc_real_output), disc_real_output)
        generated_loss = disc_ce_loss(tf.zeros_like(disc_fake_output), disc_fake_output)
    print(f">{step}, d[Total:{disc_loss:.2f} Real: {real_loss:.2f} Fake:{generated_loss:.2f} ] \tg[Total: {gen_total_loss:.2f} Gan: {gen_gan_loss:.2f} L1:{gen_l1_loss:.2f}]")
    generator_gradients = gen_tape.gradient(gen_total_loss,
                                          g_model.trainable_variables)
    discriminator_gradients = disc_tape.gradient(disc_loss,
                                               d_model.trainable_variables)
    
    generator_optimizer.apply_gradients(zip(generator_gradients,
                                          g_model.trainable_variables))
    
    discriminator_optimizer.apply_gradients(zip(discriminator_gradients,
                                              d_model.trainable_variables))
    
"""
Main training loop
"""
def fit(gen, disc, dataset, steps):
    n_patch = disc.output_shape[1]
    start = time.time()

    # Unpack data
    trainA, trainB = dataset
    for step in range(steps):
        [X_realA, X_realB], _ = real_pairs([trainA, trainB], 1, n_patch)
        X_fakeB, _ = fake_pairs(gen, X_realA, n_patch)

        X_realA = add_noise(X_realA, 0.02)
        X_realB = add_noise(X_realB, 0.02)
        X_fakeB = add_noise(X_fakeB, 0.02)
        
        if (step) % 1000 == 0:
            display.clear_output(wait=True)                    
            if step != 0:
                print(f'Time taken for 1000 steps: {time.time()-start:.2f} sec\n')
                print(f"Step: {step//1000}k")
                start = time.time()
        if (step) % 1000 == 0:
            performance(step, gen, disc, [trainA, trainB])
        if (step) % 10_000 == 0:
            gen.save("generator_model.keras")
            disc.save("discriminator_model.keras") 
        train_step(X_realA, X_realB,X_fakeB, step, gen, disc)    

Load previously saved models and dataset

In [None]:
dataset = get_dataset('dataset/train/inputs/RGB', 'dataset/train/inputs/NDVI', image_shape=(256,256,3))
generator = k.models.load_model('generator_model.keras')
discriminator = k.models.load_model('discriminator_model.keras')
generator_optimizer = tf.keras.optimizers.Adam(learning_rate=2e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=2e-4, beta_1=0.5)


Train the models

In [None]:
fit(generator, discriminator, dataset, 100_000)
