In [None]:
# !pip install opencv-python

In [None]:
import keras
import os
import imageio
import matplotlib.pyplot as plt
import numpy as np
import random
import cv2
import tensorflow as tf

In [None]:
# Side length, in pixels, of the square low-resolution patches fed to the generator.
input_res = 32
# Side length of the high-resolution target patches: exactly twice the input,
# which matches the single default-sized UpSampling2D layer in the generator.
output_res = input_res * 2


# Models

In [None]:
# Generator: turns an (input_res, input_res, 1) patch into a patch with twice
# the spatial size and a single channel.
generator_inputs = keras.Input(shape=(input_res, input_res, 1))

# The network is a straight chain, so its layers are listed in order and then
# applied one after another.
# (Variants tried by the author and left disabled: an extra 4-filter conv
# followed by tf.nn.depth_to_space(x, 2) instead of UpSampling2D, and a second
# 32-filter conv after the upsampling.)
generator_body = [
    # Feature extraction at the input resolution.
    keras.layers.Conv2D(32, 5, padding='same', activation='relu'),
    keras.layers.Dropout(0.2),
    keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
    keras.layers.Dropout(0.2),
    keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
    # Upsample with the layer's default size, then refine.
    keras.layers.UpSampling2D(),
    keras.layers.Dropout(0.2),
    keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
    # One output channel, squashed by a sigmoid.
    keras.layers.Conv2D(1, 3, activation="sigmoid", padding="same"),
]

generator_outputs = generator_inputs
for generator_layer in generator_body:
    generator_outputs = generator_layer(generator_outputs)

generator = keras.Model(generator_inputs, generator_outputs, name='generator')

generator.summary()

In [None]:
# Conditional discriminator: scores a high-resolution image (second input)
# together with a low-resolution image (first input) and outputs one sigmoid value.
discriminator_inp_1 = keras.Input(shape=(input_res, input_res, 1))
discriminator_inp_2 = keras.Input(shape=(output_res, output_res, 1))

#discriminator_inputs = keras.layers.Concatenate(axis=-1)([
#    discriminator_inp_1,
#    discriminator_inp_2
#])

# High-resolution branch: the stride-2 'same' convolution halves the spatial
# size, so the result lines up with the low-resolution input below.
x = keras.layers.Conv2D(16, 3, padding='same', activation='relu', strides=(1, 1))(discriminator_inp_2)
x = keras.layers.Dropout(0.2)(x)
x = keras.layers.Conv2D(16, 3, padding='same', activation='relu', strides=(2, 2))(x)
# Stack the raw low-resolution image onto the downsampled features (channel axis).
x = keras.layers.Concatenate(axis=-1)([
    discriminator_inp_1,
    x
])
# Three stride-2 'valid' convolutions shrink the map further
# (with input_res = 32: 32 -> 15 -> 7 -> 3 per side).
x = keras.layers.Conv2D(16, 3, padding='valid', activation='relu', strides=(2, 2))(x)
x = keras.layers.Dropout(0.2)(x)
x = keras.layers.Conv2D(32, 3, padding='valid', activation='relu', strides=(2, 2))(x)
x = keras.layers.Dropout(0.2)(x)
x = keras.layers.Conv2D(32, 3, padding='valid', activation='relu', strides=(2, 2))(x)
#x = keras.layers.Conv2D(16, 3, padding='valid', activation='relu', strides=(2, 2))(x)

# Classification head. Note that Dense(8) is given no activation argument.
x = keras.layers.Flatten()(x)
x = keras.layers.Dense(8)(x)
x = keras.layers.Dropout(0.2)(x)
discriminator_outputs = keras.layers.Dense(1, activation='sigmoid')(x)

# Input order is [low-resolution, high-resolution]; callers must pass data in that order.
discriminator = keras.Model(
    [
        discriminator_inp_1,
        discriminator_inp_2
    ],
    discriminator_outputs,
    name='discriminator'
)

discriminator.summary()

In [None]:
# Combined model: generator followed by the discriminator, used to train the
# generator against the discriminator's verdict.
gen_input = keras.Input(shape=(input_res, input_res, 1))
# Despite its name, `real_img` has the low-resolution shape and is wired to the
# discriminator's first (low-resolution) input.
real_img = keras.Input(shape=(input_res, input_res, 1))

gen_output = generator(gen_input)
disc_output = discriminator([real_img, gen_output])

# Two outputs, in this order: the discriminator score and the generated image.
# Losses / targets supplied at compile() and fit() time must follow the same order.
full_model = keras.Model(
    inputs=[
        gen_input,
        real_img,
    ],
    #outputs=disc_output
    outputs=[
        disc_output,
        gen_output
    ],
    #generator_inputs,
    #discriminator([generator_outputs, generator_outputs])
    name='full_model'
)

full_model.summary()

# Data

In [None]:
# Read every entry of 'cppoutput' whose file name contains 'image' and divide
# its pixel values by 255.0.
images = [
    imageio.imread('cppoutput/' + entry) / 255.0
    for entry in os.listdir('cppoutput')
    if 'image' in entry
]

# Randomise the order in place (no seed is set in the visible cells).
random.shuffle(images)

len(images)

In [None]:
# Eyeball one loaded image; index 123 is hard-coded, so at least 124 images must have been loaded.
plt.imshow(images[123])

In [None]:
# Cut every image into non-overlapping output_res x output_res tiles (targets,
# all_y) and pair each kept tile with a nearest-neighbour downscale to
# input_res x input_res (inputs, all_x).
all_y = []
all_x = []

for img in images:
    # Rows are counted along axis 0 and columns along axis 1, so non-square
    # images are tiled correctly as well.
    n_tile_rows = img.shape[0] // output_res
    n_tile_cols = img.shape[1] // output_res
    for i in range(n_tile_rows):
        for j in range(n_tile_cols):
            sub_img = img[
                i*output_res:(i+1)*output_res,
                j*output_res:(j+1)*output_res
            ]
            # Ignore empty or almost empty tiles (fewer than 20 distinct
            # values). Checked before resizing so rejected tiles cost nothing.
            if np.unique(sub_img).size < 20:
                continue
            sm_sub_img = cv2.resize(
                sub_img,
                dsize=(input_res, input_res),
                interpolation=cv2.INTER_NEAREST
            )
            all_y.append(sub_img)
            all_x.append(sm_sub_img)

# Stack the tiles and insert a new axis at position 3.
all_y = np.expand_dims(np.array(all_y), 3)
all_x = np.expand_dims(np.array(all_x), 3)

# Shapes recorded by the author on earlier runs:
# ((102400, 64, 64, 1), (102400, 32, 32, 1))
# ((64923, 64, 64, 1), (64923, 32, 32, 1))
all_y.shape, all_x.shape

In [None]:
# Spot-check the shape of one randomly chosen target patch. randrange excludes
# its upper bound, so the index is always valid; randint(0, len(all_y)) includes
# it and could index one past the end.
all_y[random.randrange(len(all_y))].shape

In [None]:
def plot_grid(imgs, shuffle=True, rows=6, columns=6):
    """Show up to ``rows * columns`` images from ``imgs`` in one 10x10 figure.

    Args:
        imgs: Indexable, sized collection of images that ``imshow`` accepts.
        shuffle: If True, every grid cell shows an independently drawn random
            image (repeats are possible). If False, cell ``k`` shows
            ``imgs[k]``, starting with ``imgs[0]``.
        rows: Number of grid rows (default 6).
        columns: Number of grid columns (default 6).

    With ``shuffle=False`` and fewer images than cells, the remaining cells are
    left empty. An empty ``imgs`` produces an empty figure in either mode.
    """
    fig = plt.figure(figsize=(10, 10))
    n_imgs = len(imgs)
    n_cells = rows * columns
    if not shuffle or n_imgs == 0:
        # Never index past the end of imgs, and never sample from nothing.
        n_cells = min(n_cells, n_imgs)
    for cell in range(n_cells):
        # Subplot positions are 1-based, image indices are 0-based.
        idx = random.randint(0, n_imgs - 1) if shuffle else cell
        ax = fig.add_subplot(rows, columns, cell + 1)
        ax.imshow(imgs[idx])
    plt.show()

# Grid of low-resolution input patches, taken in index order (no shuffling).
plot_grid(all_x, shuffle=False)

In [None]:
# Grid of high-resolution target patches, same ordering as the input grid for side-by-side comparison.
plot_grid(all_y, shuffle=False)

In [None]:
# Hold out the tail of the data for testing: each array is cut at 90% of its
# own length, the head becoming the training set.
n_train_y = int(len(all_y) * 0.9)
n_train_x = int(len(all_x) * 0.9)

train_y, test_y = all_y[:n_train_y], all_y[n_train_y:]
train_x, test_x = all_x[:n_train_x], all_x[n_train_x:]

train_y.shape, train_x.shape, test_y.shape, test_x.shape

# Training

In [None]:
# Compile the generator on its own with mean squared error as the loss, so it
# can be fitted without the discriminator.
generator.compile(
    optimizer="adam",
    loss="mse",
    #metrics=[keras.metrics.Accuracy()]
)


In [None]:
#history = generator.fit(
#    train_x[0:4216],
#    train_y[0:4216],
#    batch_size=32,
#    #validation_data=(test_x, test_y),
#    epochs=5,
#)

In [None]:
# Run the generator on (up to) the first 500 test inputs. The standalone
# generator.fit call in the preceding cell is commented out, so this uses
# whatever weights the generator currently holds.
pred_input = test_x[0:500]
#corr_output = test_y[100:110]
predicted = generator.predict(pred_input)

In [None]:
# Grid of generator outputs for the predictions computed above, in index order.
plot_grid(predicted, shuffle=False)

In [None]:
# Same call as the previous cell; it draws the same `predicted` array again.
plot_grid(predicted, shuffle=False)

In [None]:
#predicted_y = model.predict(train_x[0:4216])

In [None]:
#expected_y = train_y[0:4216]

In [None]:
#predicted_y.shape, expected_y.shape

In [None]:
#discriminator_x = np.concatenate([predicted_y, expected_y], 0)
#discriminator_x.shape

In [None]:
#discriminator_y = np.concatenate([np.zeros(4216), np.ones(4216)], 0).reshape(4216 * 2, 1)
#discriminator_y.shape

In [None]:
#full_model.compile()


In [None]:
#history = full_model.fit(
#    full_x,
#    full_y,
#    batch_size=32,
#    #validation_data=(test_x, test_y),
#    epochs=10,
#    shuffle=True,
#)

# The full alternating training loop

In [None]:
# Fixed preview set: up to the first 2000 test inputs, shuffled once along the
# sample axis (no seed is set in the visible cells).
preview_samples = list(test_x[0:2000])
random.shuffle(preview_samples)
pred_input = np.array(preview_samples)

In [None]:
# Random sample (plot_grid's default shuffle=True) of the preview inputs.
plot_grid(pred_input)

In [None]:
# generator.predict(np.array(pred_input)).shape

In [None]:
# Generator output for (up to) the first 2000 training inputs.
# NOTE(review): `predicted_y` is not read by any live code in the visible cells — confirm it is still needed.
predicted_y = generator.predict(train_x[0:2000])


In [None]:

# One-off construction of the two discriminator inputs from (up to) the first
# 2000 training pairs: generated images first, real targets second, with the
# low-resolution inputs repeated to match.
lowres_head = train_x[0:2000]

discriminator_x_1 = np.concatenate([lowres_head, lowres_head], axis=0)
discriminator_x_2 = np.concatenate(
    [generator.predict(train_x[0:2000]), train_y[0:2000]],
    axis=0,
)

discriminator_x_1.shape, discriminator_x_2.shape


In [None]:
# Empty array of shape (0, output_res, output_res, 1).
# NOTE(review): only commented-out code in the visible cells refers to this
# name — confirm whether it can be removed.
all_disc_input_x = np.array([]).reshape(0, output_res, output_res, 1)

# Prepopulate this thing, so we can train the discriminator with something...
#predicted_y = generator.predict(train_x[0:1000])
#all_disc_input_x = np.concatenate([all_disc_input_x, predicted_y], 0)


def save_prev_images(n, count=5):
    """Write generator outputs for the first preview inputs as PNG files.

    Runs ``generator`` on the first ``count`` entries of the module-level
    ``pred_input`` and writes each result to
    ``predictions/real_output_<index>_<n>.png`` (index padded to 3 digits,
    ``n`` to 5), scaled by 255 and cast to ``uint8``.

    Args:
        n: Round number embedded in the file names.
        count: How many preview images to write (default 5). Fewer files are
            written if ``pred_input`` holds fewer samples.
    """
    # Make sure the target directory exists so a fresh checkout does not fail
    # on the first write.
    os.makedirs('predictions', exist_ok=True)

    # Only the samples that are actually written are pushed through the model.
    predicted = generator.predict(pred_input[:count])
    for i, img in enumerate(predicted):
        imageio.imwrite(
            f'predictions/real_output_{str(i).rjust(3, "0")}_{str(n).rjust(5, "0")}.png',
            (img * 255).astype('uint8')
        )

# Number of alternating discriminator / full-model rounds run by the loop below.
total_n = 100
# Upper bound on how many training pairs each round slices off the front of train_x / train_y.
train_size = 10000
# Upper bound on how many test pairs each round slices off for validation.
val_size = 200


# Compile the discriminator for standalone training. The loss is mean squared
# error on its sigmoid output (binary cross-entropy is left commented out).
# NOTE(review): the only metric is mean absolute percentage error, computed
# against labels that are 0 or 1 — confirm this is the intended metric.
discriminator.compile(
    optimizer="adam",
    #loss="binary_crossentropy",
    loss='mse',
    #metrics=[keras.metrics.BinaryAccuracy(), keras.metrics.Accuracy()]
    metrics=['mean_absolute_percentage_error']
)

# Compile the combined model. The loss list follows full_model's output order:
# 'mse' applies to the discriminator score and 'mae' to the generated image,
# weighted 10 : 1.
# NOTE(review): `keras.optimizers.adam_v2` is a version-specific module path —
# confirm it exists in the installed Keras release.
full_model.compile(
    optimizer=keras.optimizers.adam_v2.Adam(learning_rate=0.00002),
    #optimizer='adam',
    #loss="binary_crossentropy",
    loss=['mse', 'mae'],
    #loss_weights=[1, 100],
    #loss_weights=[1, 10],
    loss_weights=[10, 1],
    #loss_weights=[100, 1],
    #metrics=[keras.metrics.BinaryAccuracy(), keras.metrics.Accuracy()]
    metrics=['mean_absolute_percentage_error']
)

# Alternating training. Every round:
#   1. saves preview images from the current generator,
#   2. fits the discriminator on (low-res, generated) vs (low-res, real) pairs,
#   3. fits the combined model so the generator is pushed towards outputs the
#      discriminator scores as real (plus an L1 term against the true target).

# Everything that does not depend on the current generator weights is built
# once, outside the loop.
full_x = train_x[0:train_size]
full_target_y = train_y[0:train_size]
test_full_x = test_x[0:val_size]
test_target_y = test_y[0:val_size]

# Slicing silently clamps to the array length, so label counts are taken from
# the slices themselves rather than from train_size / val_size.
n_train = len(full_x)
n_val = len(test_full_x)

# Low-resolution conditioning input, repeated twice: the first half is paired
# with generated images, the second half with real ones.
discriminator_x_1 = np.concatenate([full_x, full_x], axis=0)
test_discriminator_x_1 = np.concatenate([test_full_x, test_full_x], axis=0)

# Discriminator labels: 0 for generated images, 1 for real ones.
discriminator_y = np.concatenate(
    [np.zeros(n_train), np.ones(n_train)],
    0
).reshape(n_train * 2, 1)
test_discriminator_y = np.concatenate(
    [np.zeros(n_val), np.ones(n_val)],
    0
).reshape(n_val * 2, 1)

# Adversarial target for the generator: the discriminator should answer 1.
full_y = np.ones((n_train, 1))
test_full_y = np.ones((n_val, 1))

for n in range(0, total_n):
    print('Startping epoch', n)
    # Save a preview image
    save_prev_images(n)

    # Set discriminator trainable
    # NOTE(review): both models are compiled once before this loop; confirm on
    # the installed Keras version that toggling `trainable` here has the
    # intended effect without recompiling.
    discriminator.trainable = True

    # Train discriminator on fresh generator output.
    print('Training discriminator')
    discriminator_x_2 = np.concatenate([
        generator.predict(full_x),
        full_target_y,
    ], axis=0)
    test_discriminator_x_2 = np.concatenate([
        generator.predict(test_full_x),
        test_target_y,
    ], axis=0)

    history = discriminator.fit(
        [discriminator_x_1, discriminator_x_2],
        discriminator_y,
        # Keras documents validation_data as a tuple (x_val, y_val).
        validation_data=(
            [test_discriminator_x_1, test_discriminator_x_2],
            test_discriminator_y,
        ),
        batch_size=32,
        epochs=2,
        shuffle=True,
    )

    # Set discriminator NOT trainable
    discriminator.trainable = False

    # Train the full model with discriminator attached. Both model inputs get
    # the same low-resolution batch; targets follow the output order
    # [discriminator score, generated image].
    print('Training the full model')
    history = full_model.fit(
        [full_x, full_x],
        [full_y, full_target_y],
        validation_data=(
            [test_full_x, test_full_x],
            [test_full_y, test_target_y],
        ),
        batch_size=16,
        epochs=2,
        shuffle=True,
    )

    print('Full training epoch done')


# Final preview after the last round.
save_prev_images(total_n)


In [None]:
# plot_grid((pred_input), shuffle=False)

In [None]:
# Low-resolution test inputs, in index order.
plot_grid((test_x), shuffle=False)

In [None]:
# Generator outputs for the test inputs, in the same order as the grid above.
# Note: this runs the generator over all of test_x even though plot_grid only
# draws a single grid's worth of images.
plot_grid(generator.predict(test_x), shuffle=False)

In [None]:
# High-resolution ground-truth targets for the same test samples, for comparison with the generator output.
plot_grid(test_y, shuffle=False)