In [7]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [8]:
import tensorflow as tf
import os
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import cv2
import random

In [9]:
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

Found GPU at: /device:GPU:0


In [10]:
!nvidia-smi

Thu Jun  6 10:29:28 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla T4                       Off | 00000000:00:04.0 Off |                    0 |
| N/A   53C    P0              29W /  70W |    103MiB / 15360MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

In [11]:
training_path = "/content/drive/MyDrive/Explo/train"

In [12]:
validation_path = "/content/drive/MyDrive/Explo/Test_images"

In [13]:
images = os.listdir(os.path.join(training_path, "images"))

In [14]:
x = []
y = []
count = 0
for f in images[:2000]:
  img = cv2.imread(os.path.join(training_path, "images", f))
  left = list(img[:, :256])
  right = list(img[:, 256:])
  x.append(right)
  y.append(left)
  count = count + 1
  if count % 100 == 0 :
    print(count)
x = np.array(x)
y = np.array(y)

100
200
300
400
500
600
700
800
900
1000
1100
1200
1300
1400
1500
1600
1700
1800
1900
2000


In [15]:
x.shape

(2000, 256, 256, 3)

In [16]:
y.shape

(2000, 256, 256, 3)

In [17]:
from keras.optimizers import Adam
from keras.initializers import RandomNormal
from keras.models import Model
from keras.layers import Input
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import Activation
from keras.layers import Concatenate
from keras.layers import Dropout
from keras.layers import BatchNormalization

In [18]:
image_shape = [256, 256, 3]

In [19]:
def define_discriminator(image_shape):

	in_src_image = Input(shape=image_shape)
	in_target_image = Input(shape=image_shape)
	merged = Concatenate()([in_src_image, in_target_image])


	d = Conv2D(64, (4,4), strides=(2,2), padding='same')(merged)
	d = LeakyReLU(alpha=0.2)(d)

	d = Conv2D(128, (4,4), strides=(2,2), padding='same')(d)
	d = BatchNormalization()(d)
	d = LeakyReLU(alpha=0.2)(d)

	d = Conv2D(256, (4,4), strides=(2,2), padding='same')(d)
	d = BatchNormalization()(d)
	d = LeakyReLU(alpha=0.2)(d)

	d = Conv2D(512, (4,4), strides=(2,2), padding='same')(d)
	d = BatchNormalization()(d)
	d = LeakyReLU(alpha=0.2)(d)

	d = Conv2D(512, (4,4), padding='same')(d)
	d = BatchNormalization()(d)
	d = LeakyReLU(alpha=0.2)(d)

	d = Conv2D(1, (4,4), padding='same')(d)
	patch_out = Activation('sigmoid')(d)

	model = Model([in_src_image, in_target_image], patch_out)


	opt = Adam(beta_1=0.5, learning_rate=0.0002)
	model.compile(loss='binary_crossentropy', optimizer=opt, loss_weights=[0.5])
	return model

In [20]:
temp_model = define_discriminator(image_shape)
temp_model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 256, 256, 3)]        0         []                            
                                                                                                  
 input_2 (InputLayer)        [(None, 256, 256, 3)]        0         []                            
                                                                                                  
 concatenate (Concatenate)   (None, 256, 256, 6)          0         ['input_1[0][0]',             
                                                                     'input_2[0][0]']             
                                                                                                  
 conv2d (Conv2D)             (None, 128, 128, 64)         6208      ['concatenate[0][0]']     

In [21]:
def define_encoder_block(layer_in, n_filters, batchnorm=True):

	g = Conv2D(n_filters, (4,4), strides=(2,2), padding='same')(layer_in)

	if batchnorm:
		g = BatchNormalization()(g, training=True)

	g = LeakyReLU(alpha=0.2)(g)
	return g

In [22]:
def decoder_block(layer_in, skip_in, n_filters, dropout=True):

	g = Conv2DTranspose(n_filters, (4,4), strides=(2,2), padding='same')(layer_in)

	g = BatchNormalization()(g, training=True)

	if dropout:
		g = Dropout(0.5)(g, training=True)

	g = Concatenate()([g, skip_in])

	g = Activation('relu')(g)
	return g

In [23]:
def define_generator(image_shape=(256,256,3)):

	in_image = Input(shape=image_shape)
	e1 = define_encoder_block(in_image, 64, batchnorm=False)
	e2 = define_encoder_block(e1, 128)
	e3 = define_encoder_block(e2, 256)
	e4 = define_encoder_block(e3, 512)
	e5 = define_encoder_block(e4, 512)
	e6 = define_encoder_block(e5, 512)
	e7 = define_encoder_block(e6, 512)

	b = Conv2D(512, (4,4), strides=(2,2), padding='same')(e7)
	b = Activation('relu')(b)

	d1 = decoder_block(b, e7, 512)
	d2 = decoder_block(d1, e6, 512)
	d3 = decoder_block(d2, e5, 512)
	d4 = decoder_block(d3, e4, 512, dropout=False)
	d5 = decoder_block(d4, e3, 256, dropout=False)
	d6 = decoder_block(d5, e2, 128, dropout=False)
	d7 = decoder_block(d6, e1, 64, dropout=False)

	g = Conv2DTranspose(image_shape[2], (4,4), strides=(2,2), padding='same')(d7)
	out_image = Activation('tanh')(g)

	model = Model(in_image, out_image)
	return model

In [24]:
temp_model2 = define_generator()
temp_model2.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_3 (InputLayer)        [(None, 256, 256, 3)]        0         []                            
                                                                                                  
 conv2d_6 (Conv2D)           (None, 128, 128, 64)         3136      ['input_3[0][0]']             
                                                                                                  
 leaky_re_lu_5 (LeakyReLU)   (None, 128, 128, 64)         0         ['conv2d_6[0][0]']            
                                                                                                  
 conv2d_7 (Conv2D)           (None, 64, 64, 128)          131200    ['leaky_re_lu_5[0][0]']       
                                                                                            

In [25]:
def define_gan(g_model, d_model, image_shape):

	for layer in d_model.layers:
		if not isinstance(layer, BatchNormalization):
			layer.trainable = False

	in_src = Input(shape=image_shape)

	gen_out = g_model(in_src)

	dis_out = d_model([in_src, gen_out])

	model = Model(in_src, [dis_out, gen_out])

	opt = Adam(learning_rate=0.0002, beta_1=0.5)

	model.compile(loss=['binary_crossentropy', 'mae'],
               optimizer=opt, loss_weights=[1,100])
	return model

In [26]:
def generate_real_samples(dataset, n_samples, patch_shape):

	trainA, trainB = dataset

	ix = np.random.randint(0, trainA.shape[0], n_samples)

	X1, X2 = trainA[ix], trainB[ix]

	y = np.ones((n_samples, patch_shape, patch_shape, 1))
	return [X1, X2], y

In [27]:
def generate_fake_samples(g_model, samples, patch_shape):

	X = g_model.predict(samples)

	y = np.zeros((len(X), patch_shape, patch_shape, 1))
	return X, y

In [28]:
def summarize_performance(step, g_model, dataset, n_samples=3):

	[X_realA, X_realB], _ = generate_real_samples(dataset, n_samples, 1)

	X_fakeB, _ = generate_fake_samples(g_model, X_realA, 1)

	X_realA = (X_realA + 1) / 2.0
	X_realB = (X_realB + 1) / 2.0
	X_fakeB = (X_fakeB + 1) / 2.0

	for i in range(n_samples):
		plt.subplot(3, n_samples, 1 + i)
		plt.axis('off')
		plt.imshow(X_realA[i])

	for i in range(n_samples):
		plt.subplot(3, n_samples, 1 + n_samples + i)
		plt.axis('off')
		plt.imshow(X_fakeB[i])

	for i in range(n_samples):
		plt.subplot(3, n_samples, 1 + n_samples*2 + i)
		plt.axis('off')
		plt.imshow(X_realB[i])

	filename1 = 'plot_%06d.png' % (step+1)
	plt.savefig(os.path.join("/content/drive/MyDrive/Explo", filename1))
	plt.close()

	filename2 = 'model_%06d.h5' % (step+1)
	g_model.save(os.path.join("/content/drive/MyDrive/Explo", filename2))
	print('>Saved: %s and %s' % (filename1, filename2))

In [29]:
def train(d_model, g_model, gan_model, dataset, n_epochs=100, n_batch=1):

	n_patch = d_model.output_shape[1]

	trainA, trainB = dataset

	bat_per_epo = int(len(trainA) / n_batch)

	n_steps = bat_per_epo * n_epochs

	for i in range(n_steps):

		[X_realA, X_realB], y_real = generate_real_samples(dataset, n_batch, n_patch)

		X_fakeB, y_fake = generate_fake_samples(g_model, X_realA, n_patch)

		d_loss1 = d_model.train_on_batch([X_realA, X_realB], y_real)

		d_loss2 = d_model.train_on_batch([X_realA, X_fakeB], y_fake)

		g_loss, _, _ = gan_model.train_on_batch(X_realA, [y_real, X_realB])

		print('>%d, d1[%.3f] d2[%.3f] g[%.3f]' % (i+1, d_loss1, d_loss2, g_loss))

		if (i+1) % (bat_per_epo * 10) == 0:
			summarize_performance(i, g_model, dataset)

In [30]:
d_model = define_discriminator(image_shape)
g_model = define_generator(image_shape)

In [31]:
gan_model = define_gan(g_model, d_model, image_shape)

In [32]:
gan_model.summary()

Model: "model_4"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_7 (InputLayer)        [(None, 256, 256, 3)]        0         []                            
                                                                                                  
 model_3 (Functional)        (None, 256, 256, 3)          5442931   ['input_7[0][0]']             
                                                          5                                       
                                                                                                  
 model_2 (Functional)        (None, 16, 16, 1)            6968257   ['input_7[0][0]',             
                                                                     'model_3[0][0]']             
                                                                                            

In [33]:
data = [x, y]

In [34]:
def preprocess_data(data):

	X1, X2 = data[0], data[1]

	X1 = (X1 - 127.5) / 127.5
	X2 = (X2 - 127.5) / 127.5
	return [X1, X2]

In [35]:
dataset = preprocess_data(data)

In [None]:
train(d_model, g_model, gan_model, dataset, n_epochs=10, n_batch=1)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
>10718, d1[0.190] d2[0.223] g[9.460]
>10719, d1[0.226] d2[0.501] g[11.802]
>10720, d1[0.096] d2[0.165] g[9.625]
>10721, d1[0.471] d2[0.104] g[11.306]
>10722, d1[0.156] d2[0.782] g[7.105]
>10723, d1[0.107] d2[0.087] g[8.356]
>10724, d1[0.745] d2[0.111] g[7.316]
>10725, d1[0.049] d2[0.129] g[9.061]
>10726, d1[0.253] d2[0.371] g[7.886]
>10727, d1[0.204] d2[0.093] g[10.150]
>10728, d1[0.220] d2[0.192] g[7.647]
>10729, d1[0.055] d2[0.180] g[10.420]
>10730, d1[0.049] d2[0.102] g[9.200]
>10731, d1[0.162] d2[0.173] g[7.266]
>10732, d1[0.277] d2[0.161] g[8.901]
>10733, d1[0.042] d2[0.214] g[9.715]
>10734, d1[0.238] d2[0.647] g[8.406]
>10735, d1[0.088] d2[0.098] g[13.830]
>10736, d1[0.129] d2[0.156] g[8.101]
>10737, d1[0.110] d2[0.098] g[9.826]
>10738, d1[0.275] d2[0.162] g[8.316]
>10739, d1[0.062] d2[0.198] g[8.140]
>10740, d1[0.046] d2[0.311] g[8.642]
>10741, d1[0.135] d2[0.143] g[8.727]
>10742, d1[0.174] d2[0.125] g[7.798]
>1074

In [None]:
loaded_model = tf.keras.models.load_model(os.path.join("/content/drive/MyDrive/Explo", "model_020000.h5"))

In [None]:
loaded_model.summary()

In [None]:
loaded_model.compile()

In [None]:
test_path = os.path.join(validation_path, "1_regular")

In [None]:
test_imgs = os.listdir(test_path)

In [None]:
test_images = []
for f in test_imgs[:100]:
  i = cv2.imread(os.path.join(test_path, f))
  test_images.append(i)
test_images = np.array(test_images)
test_images = (test_images - 127.5)/127.5

In [None]:
gen_images = loaded_model.predict(test_images)

In [None]:
gen_images = (gen_images + 1)/2.0
test_images = (test_images + 1)/2.0

In [None]:
plt.figure(figsize = (14, 6))
iterate = random.choices(range(100), k=5)
count = 0
for i in iterate:
  plt.subplot(2, 5, count + 1)
  plt.axis("off")
  plt.imshow(test_images[i])

  plt.subplot(2, 5, count + 6)
  plt.axis("off")
  plt.imshow(gen_images[i])
  count = count + 1