In [1]:
import numpy as np
import h5py
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import cv2
from tensorflow.keras.layers import Input,Dense,Reshape,Conv2D,Dropout,multiply,Dot,Concatenate,subtract,ZeroPadding2D
from tensorflow.keras.layers import BatchNormalization,LeakyReLU,Flatten
from tensorflow.keras.layers import Conv2DTranspose as Deconv2d
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

from keras import backend as K
import smtplib

from sklearn.utils import shuffle
import tensorflow as tf
import keras
from keras import layers
import matplotlib.pyplot as plt
import os
from tqdm import tqdm
import re
from tensorflow.keras.preprocessing.image import img_to_array
import random
from tensorflow.keras.models import load_model

In [2]:
# to get the files in proper order
def sorted_alphanumeric(data):
    import re
    convert = lambda text: int(text) if text.isdigit() else text.lower()
    alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)]
    return sorted(data, key=alphanum_key)

# defining the size of the image
import os
import cv2
import numpy as np
from tqdm import tqdm
from tensorflow.keras.preprocessing.image import img_to_array

SIZE = 128
MAX_IMAGES = 20000  # Giới hạn số lượng ảnh

color_img = []
path = r'/kaggle/input/augmented-dataset-cp/color'
files = os.listdir(path)
files = sorted_alphanumeric(files)[:MAX_IMAGES]  # Giới hạn số ảnh đọc vào

for i in tqdm(files):
    img = cv2.imread(os.path.join(path, i), 1)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (SIZE, SIZE))
    img = img.astype('float32') / 255.0
    color_img.append(img_to_array(img))

gray_img = []
path = r'/kaggle/input/augmented-dataset-cp/gray'
files = os.listdir(path)
files = sorted_alphanumeric(files)[:MAX_IMAGES]  # Giới hạn số ảnh đọc vào

for i in tqdm(files):
    img = cv2.imread(os.path.join(path, i), 1)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (SIZE, SIZE))
    img = img.astype('float32') / 255.0
    gray_img.append(img_to_array(img))


100%|██████████| 20000/20000 [02:20<00:00, 141.87it/s]
100%|██████████| 20000/20000 [02:17<00:00, 145.96it/s]


In [15]:
# Create TensorFlow datasets using from_generator
def image_generator(images):
    for img in images:
        yield img

color_dataset = tf.data.Dataset.from_generator(lambda: image_generator(color_img), output_signature=tf.TensorSpec(shape=(SIZE, SIZE, 3), dtype=tf.float32))
gray_dataset = tf.data.Dataset.from_generator(lambda: image_generator(gray_img), output_signature=tf.TensorSpec(shape=(SIZE, SIZE, 3), dtype=tf.float32))

In [3]:
def downsample(filters, size, apply_batchnorm=True):
    result = tf.keras.Sequential()
    result.add(tf.keras.layers.Conv2D(filters, size, strides=2, padding='same',
                                      kernel_initializer='he_normal', use_bias=not apply_batchnorm))
    if apply_batchnorm:
        result.add(tf.keras.layers.BatchNormalization())

    result.add(tf.keras.layers.LeakyReLU())
    return result


def upsample(filters, size, apply_dropout=False):

  result = tf.keras.Sequential()
  result.add(tf.keras.layers.Conv2DTranspose(filters, size, strides=2,padding='same',kernel_initializer='he_normal',use_bias=False))
  result.add(tf.keras.layers.BatchNormalization())

  if apply_dropout:
      result.add(tf.keras.layers.Dropout(0.5))

  result.add(tf.keras.layers.ReLU())
  return result

In [4]:
import tensorflow as tf

# Giả sử các hàm downsample và upsample đã được định nghĩa
# downsample(filters, size, apply_batchnorm=True)
# upsample(filters, size, apply_dropout=False)

def Generator():
    inputs = tf.keras.layers.Input(shape=[128, 128, 3])

    # ✅ Downsampling (giảm 1 tầng và số filters để giảm tham số)
    down_stack = [
        downsample(64, 4, apply_batchnorm=False),  # (64, 64, 64)
        downsample(128, 4),  # (32, 32, 128)
        downsample(256, 4),  # (16, 16, 256)
        downsample(384, 4),  # (8, 8, 384)  ✅ Giảm filters từ 512 -> 384
        downsample(384, 4),  # (4, 4, 384)
        downsample(512, 4),  # (2, 2, 512)  ✅ Giữ 512 ở mức thấp nhất
    ]

    # ✅ Upsampling (giảm filters ở các tầng trên)
    up_stack = [
        upsample(512, 4, apply_dropout=True),  # (4, 4, 512)
        upsample(384, 4, apply_dropout=True),  # (8, 8, 384)  ✅ Giảm filters
        upsample(256, 4),  # (16, 16, 256)
        upsample(128, 4),  # (32, 32, 128)
        upsample(64, 4),   # (64, 64, 64)
        upsample(32, 4),   # (128, 128, 32)
    ]

    # ✅ Dùng SeparableConv2D thay cho Conv2D để giảm tham số
    last = tf.keras.layers.SeparableConv2D(3, kernel_size=3, strides=1, padding='same', activation='tanh')

    x = inputs
    skips = []

    # Downsampling với skip connections
    for down in down_stack:
        x = down(x)
        skips.append(x)

    skips = reversed(skips[:-1])  # Bỏ tầng cuối cùng

    # Upsampling với skip connections
    for up, skip in zip(up_stack[:-1], skips):
        x = up(x)
        x = tf.keras.layers.Concatenate()([x, skip])

    # Tầng upsample cuối cùng không có skip connection
    x = up_stack[-1](x)

    # Tầng đầu ra
    x = last(x)

    return tf.keras.Model(inputs=inputs, outputs=x)

# Kiểm tra mô hình
generator = Generator()
generator.summary()


In [5]:
import tensorflow as tf

def downsample(filters, size, apply_batchnorm=True):
    initializer = tf.random_normal_initializer(0., 0.02)
    
    result = tf.keras.Sequential()
    result.add(tf.keras.layers.Conv2D(filters, size, strides=2, padding='same',
                                      kernel_initializer=initializer, use_bias=False))
    
    if apply_batchnorm:
        result.add(tf.keras.layers.BatchNormalization())

    result.add(tf.keras.layers.LeakyReLU())

    return result

def Discriminator():
    initializer = tf.random_normal_initializer(0., 0.02)

    inp = tf.keras.layers.Input(shape=[128, 128, 3], name='input_image')
    tar = tf.keras.layers.Input(shape=[128, 128, 3], name='target_image')

    x = tf.keras.layers.Concatenate()([inp, tar])  # (bs, 128, 128, 6)

    down1 = downsample(64, 4, apply_batchnorm=False)(x)   # (bs, 64, 64, 64)
    down2 = downsample(128, 4)(down1)  # (bs, 32, 32, 128)
    down3 = downsample(256, 4)(down2)  # (bs, 16, 16, 256)
    down4 = downsample(512, 4)(down3)  # (bs, 8, 8, 512)
    down5 = downsample(512, 4)(down4)  # (bs, 4, 4, 512)
    down6 = downsample(512, 4)(down5)  # (bs, 2, 2, 512) ✅

    conv = tf.keras.layers.Conv2D(512, 4, strides=1, padding='same',
                                  kernel_initializer=initializer, use_bias=False)(down6)  # (bs, 2, 2, 512)
    batchnorm1 = tf.keras.layers.BatchNormalization()(conv)
    leaky_relu = tf.keras.layers.LeakyReLU()(batchnorm1)

    last = tf.keras.layers.Conv2D(1, 4, strides=1, padding='same',
                                  kernel_initializer=initializer)(leaky_relu)  # (bs, 2, 2, 1) ✅

    return tf.keras.Model(inputs=[inp, tar], outputs=last)

# Test model
discriminator = Discriminator()
discriminator.summary()


In [6]:
disc = Discriminator()
sample_input = tf.random.normal([1, 128, 128, 3])
sample_target = tf.random.normal([1, 128, 128, 3])
output = disc([sample_input, sample_target])
print(f"Discriminator output shape: {output.shape}")

Discriminator output shape: (1, 2, 2, 1)


In [7]:
genLoss=[]
discLoss=[]

In [8]:
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)
generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)  # Giữ nguyên
discriminator_optimizer = tf.keras.optimizers.Adam(7e-5, beta_1=0.5, weight_decay=1e-4)

LAMBDA = 150

def generator_loss(disc_generated_output, gen_output, target):
  gan_loss = loss_object(tf.ones_like(disc_generated_output), disc_generated_output)

  # mean absolute error
  l1_loss = tf.reduce_mean(tf.abs(target - gen_output))

  total_gen_loss = gan_loss + (LAMBDA * l1_loss)
  genLoss.append(total_gen_loss)

  return total_gen_loss, gan_loss, l1_loss

def discriminator_loss(disc_real_output, disc_generated_output):
  real_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)

  generated_loss = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)

  total_disc_loss = real_loss + generated_loss
  discLoss.append(total_disc_loss)

  return total_disc_loss

In [9]:
def train_step(input_image, target, epoch):
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        gen_output = generator(input_image, training=True)
        
        # Đảm bảo gen_output có shape giống target
        gen_output = tf.image.resize(gen_output, (128, 128))
        
        disc_real_output = discriminator([input_image, target], training=True)
        disc_generated_output = discriminator([input_image, gen_output], training=True)
        
        gen_total_loss, gen_gan_loss, gen_l1_loss = generator_loss(disc_generated_output, gen_output, target)
        disc_loss = discriminator_loss(disc_real_output, disc_generated_output)

    generator_gradients = gen_tape.gradient(gen_total_loss, generator.trainable_variables)
    discriminator_gradients = disc_tape.gradient(disc_loss, discriminator.trainable_variables)
    
    generator_optimizer.apply_gradients(zip(generator_gradients, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(discriminator_gradients, discriminator.trainable_variables))

    return gen_total_loss, gen_gan_loss, gen_l1_loss, disc_loss  # ✅ Trả về loss



In [10]:
import time

def fit(train_ds, epochs):
    history = {'gen_loss': [], 'gen_gan_loss': [], 'gen_l1_loss': [], 'disc_loss': []}

    for epoch in range(epochs):
        start = time.time()
        total_gen_loss, total_gan_loss, total_l1_loss, total_disc_loss = 0, 0, 0, 0
        num_batches = 0

        print("Epoch:", epoch+1)
        for n, (input_image, target) in train_ds.enumerate():
            gen_loss, gan_loss, l1_loss, disc_loss = train_step(input_image, target, epoch)

            total_gen_loss += gen_loss.numpy()
            total_gan_loss += gan_loss.numpy()
            total_l1_loss += l1_loss.numpy()
            total_disc_loss += disc_loss.numpy()
            num_batches += 1

        avg_gen_loss = total_gen_loss / num_batches
        avg_gan_loss = total_gan_loss / num_batches
        avg_l1_loss = total_l1_loss / num_batches
        avg_disc_loss = total_disc_loss / num_batches

        history['gen_loss'].append(avg_gen_loss)
        history['gen_gan_loss'].append(avg_gan_loss)
        history['gen_l1_loss'].append(avg_l1_loss)
        history['disc_loss'].append(avg_disc_loss)

        print(f"Epoch {epoch+1}: Gen Loss: {avg_gen_loss:.4f}, GAN Loss: {avg_gan_loss:.4f}, L1 Loss: {avg_l1_loss:.4f}, Disc Loss: {avg_disc_loss:.4f}")
        print(f"Time taken for epoch {epoch+1} is {time.time()-start:.2f} sec\n")

        # Lưu trọng số mỗi 5 epoch
        if (epoch + 1) % 5 == 0:
            generator.save(f'generator_epoch_{epoch+1}.weights.h5')
            discriminator.save(f'discriminator_epoch_{epoch+1}.weights.h5')
            print(f"Đã lưu trọng số tại Epoch {epoch+1}")

    return history


In [16]:
BATCH_SIZE = 16
train_dataset = tf.data.Dataset.zip((
    gray_dataset.batch(BATCH_SIZE),
    color_dataset.batch(BATCH_SIZE)
))#.shuffle(1000)  # Shuffle để tránh overfitting

In [17]:
for input_image, target in train_dataset.take(1):  # Lấy 1 batch đầu tiên
    print("Input shape:", input_image.shape)
    print("Target shape:", target.shape)

Input shape: (16, 128, 128, 3)
Target shape: (16, 128, 128, 3)


In [18]:
hist = fit(train_dataset, epochs=20)
hist  


Epoch: 1


KeyboardInterrupt: 