In [23]:
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import keras
import tensorflow as tf
from keras.layers import Conv2D, BatchNormalization, Dropout, Input, ZeroPadding2D, LeakyReLU, Flatten, Dense, UpSampling2D,Reshape,Activation
from keras.models import Model
from keras.datasets import cifar10
import numpy as np

In [24]:
def con_block(
    x,
    filters,
    activation,
    kernel_size=(3,3),
    strides=(1,1),
    padding="same",
    use_bias=True,
    use_bn=False,
    use_dropout=False,
    drop_value=0.5
):
    """Apply a Conv2D layer followed by optional BatchNorm, activation and Dropout.

    Args:
        x: Input tensor handed to the convolution.
        filters: Number of filters of the convolution.
        activation: Callable applied to the tensor after the (optional) batch
            normalization. Skipped when falsy.
        kernel_size: Kernel size forwarded to ``Conv2D``.
        strides: Strides forwarded to ``Conv2D``.
        padding: Padding mode forwarded to ``Conv2D``.
        use_bias: Whether the convolution uses a bias term.
        use_bn: When true, a ``BatchNormalization`` layer follows the convolution.
        use_dropout: When true, a ``Dropout`` layer is applied last.
        drop_value: Rate passed to ``Dropout``.

    Returns:
        The tensor produced by the last stage that was applied.
    """
    x = Conv2D(
        filters, kernel_size, strides=strides, padding=padding, use_bias=use_bias
    )(x)

    if use_bn:
        # The layer has to be instantiated first and then called on the tensor;
        # passing the tensor to the constructor does not normalize anything.
        x = BatchNormalization()(x)

    if activation:
        x = activation(x)

    if use_dropout:
        x = Dropout(drop_value)(x)

    return x

In [25]:
def build_discriminator_model(
    img_shape=(32,32,3),
    filters=64
):
    """Assemble the convolutional discriminator (critic) as a Keras ``Model``.

    Four stride-2 ``con_block`` stages with 5x5 kernels and LeakyReLU(0.2) are
    stacked, doubling the filter count at each stage, followed by
    Flatten -> Dropout(0.2) -> Dense(1).

    Args:
        img_shape: Shape of one input image.
        filters: Filter count of the first stage; later stages use 2x, 4x, 8x.

    Returns:
        A ``Model`` named "discriminator" mapping an image to a single score.
    """
    img_input = Input(img_shape)

    # (filter count, whether dropout is enabled) per stage. With the default
    # 32x32x3 input and filters=64 the feature maps are
    # 16x16x64 -> 8x8x128 -> 4x4x256 -> 2x2x512.
    stage_specs = (
        (filters, False),
        (2 * filters, True),
        (4 * filters, True),
        (8 * filters, False),
    )

    features = img_input
    for stage_filters, stage_dropout in stage_specs:
        features = con_block(
            x=features,
            filters=stage_filters,
            activation=LeakyReLU(0.2),
            kernel_size=(5,5),
            strides=(2,2),
            use_bias=True,
            use_bn=False,
            use_dropout=stage_dropout,
            drop_value=0.3
        )

    # Head: flatten (2048 values with the defaults), regularize, single output.
    flat = Flatten()(features)
    flat = Dropout(0.2)(flat)
    score = Dense(1)(flat)

    return Model(img_input, score, name="discriminator")

# Build the discriminator with its default arguments and print its layer summary.
# `d_model` is later handed to WGAN_GP as the discriminator.
d_model = build_discriminator_model()
d_model.summary()

In [26]:
def upsample_block(
    x,
    filters,
    activation,
    kernel_size=(3,3),
    strides=(1,1),
    up_size=(2, 2),
    padding="same",
    use_bn=False,
    use_bias=True,
    use_dropout=False,
    drop_value=0.3
):
    """Upsample a tensor, convolve it, then optionally normalize/activate/drop.

    Args:
        x: Input tensor.
        filters: Number of filters of the convolution.
        activation: Callable applied after the (optional) batch normalization;
            skipped when falsy.
        kernel_size: Kernel size forwarded to ``Conv2D``.
        strides: Strides forwarded to ``Conv2D``.
        up_size: Upsampling factors forwarded to ``UpSampling2D``.
        padding: Padding mode forwarded to ``Conv2D``.
        use_bn: When true, insert ``BatchNormalization`` after the convolution.
        use_bias: Whether the convolution uses a bias term.
        use_dropout: When true, finish with ``Dropout(drop_value)``.
        drop_value: Rate passed to ``Dropout``.

    Returns:
        The tensor produced by the last stage of the block.
    """
    # Collect the stages in application order, then run the tensor through them.
    pipeline = [
        UpSampling2D(up_size),
        Conv2D(
            filters, kernel_size, strides=strides, padding=padding, use_bias=use_bias
        ),
    ]
    if use_bn:
        pipeline.append(BatchNormalization())
    if activation:
        pipeline.append(activation)
    if use_dropout:
        pipeline.append(Dropout(drop_value))

    out = x
    for stage in pipeline:
        out = stage(out)
    return out

In [27]:
def build_generator_model(
    noise_dim=100,
    filters=64,
):
    """Assemble the generator as a Keras ``Model`` mapping noise to an image.

    The noise vector is projected by a Dense layer, reshaped to a 2x2 feature
    map and passed through four ``upsample_block`` stages that each double the
    spatial size. The last stage has 3 filters and a tanh activation.

    Args:
        noise_dim: Length of the input noise vector.
        filters: Base filter count; the stages use 4x, 2x, 1x of it, then 3.

    Returns:
        A ``Model`` named "generator".
    """
    z_input = Input(shape=(noise_dim,))

    # Project and reshape the noise to 2x2x(8*filters); 2x2x512 by default.
    seed = Dense(2*2*filters*8)(z_input)
    feature_map = Reshape((2, 2, filters*8))(seed)

    # (filter count, activation class, activation argument) per stage. With
    # the defaults the maps are 4x4x256 -> 8x8x128 -> 16x16x64 -> 32x32x3.
    # NOTE(review): the output stage also runs batch normalization before
    # tanh, exactly like the hidden stages; confirm that this is intended.
    stage_specs = (
        (filters*4, LeakyReLU, 0.2),
        (filters*2, LeakyReLU, 0.2),
        (filters, LeakyReLU, 0.2),
        (3, Activation, "tanh"),
    )
    for stage_filters, activation_cls, activation_arg in stage_specs:
        feature_map = upsample_block(
            x=feature_map,
            filters=stage_filters,
            activation=activation_cls(activation_arg),
            kernel_size=(3,3),
            strides=(1,1),
            up_size=(2, 2),
            padding="same",
            use_bn=True,
            use_bias=False,
            use_dropout=False,
            drop_value=0.3
        )

    return Model(z_input, feature_map, name="generator")

# Build the generator with its default arguments and print its layer summary.
# `g_model` is later handed to WGAN_GP as the generator.
g_model = build_generator_model()
g_model.summary()
    

In [28]:
class WGAN_GP(Model):
    """Wasserstein GAN with gradient penalty, trained via a custom ``train_step``.

    Each training step updates the discriminator ``d_steps`` times and then
    updates the generator once.
    """

    def __init__(
        self,
        discriminator,
        generator,
        noise_dim,
        discriminator_extra_steps=3,
        gp_weight=10.0
    ):
        """Store the two sub-models and the training hyperparameters.

        Args:
            discriminator: Model scoring a batch of images.
            generator: Model mapping a batch of noise vectors to images.
            noise_dim: Length of each noise vector fed to the generator.
            discriminator_extra_steps: Number of discriminator updates per
                training step. Must be at least 1, because ``train_step``
                reports the discriminator loss of the last such update.
            gp_weight: Multiplier of the gradient penalty in the
                discriminator loss.
        """
        super().__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.noise_dim = noise_dim
        self.d_steps = discriminator_extra_steps
        self.gp_weight = gp_weight

    def compile(self, d_optimizer, g_optimizer):
        """Compile the wrapper and remember one optimizer per sub-model."""
        super().compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer

    def gradient_penalty(self, batch_size, real_images, fake_images):
        """Compute the gradient penalty on images interpolated between real and fake.

        Args:
            batch_size: Number of images in the batch.
            real_images: Batch of real images.
            fake_images: Batch of generated images, same shape as ``real_images``.

        Returns:
            Scalar tensor: the batch mean of ``(||grad|| - 1) ** 2``, where the
            gradient is that of the discriminator output with respect to the
            interpolated images.
        """
        # One interpolation coefficient per image, broadcast over the other axes.
        alpha = tf.random.uniform([batch_size, 1, 1, 1], 0.0, 1.0)
        diff = fake_images - real_images
        interpolated = real_images + alpha * diff

        with tf.GradientTape() as gp_tape:
            # `interpolated` is a plain tensor, so it must be watched explicitly.
            gp_tape.watch(interpolated)
            pred = self.discriminator(interpolated, training=True)

        # Gradient of the discriminator output with respect to the interpolated images.
        grads = gp_tape.gradient(pred, [interpolated])[0]

        # Per-image L2 norm of the gradient (reduced over every non-batch axis).
        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))
        gp = tf.reduce_mean((norm - 1.0) ** 2)
        return gp

    # Backward-compatible alias: the method was originally published under
    # this misspelled name, so existing callers keep working.
    gradient_penaty = gradient_penalty

    def train_step(self, real_images):
        """Run one training step: ``d_steps`` discriminator updates, one generator update.

        Args:
            real_images: Batch of real images, or a tuple whose first element
                is that batch.

        Returns:
            Dict with "d_loss" (loss of the last discriminator update) and
            "g_loss" (generator loss).
        """
        if isinstance(real_images, tuple):
            real_images = real_images[0]

        batch_size = tf.shape(real_images)[0]

        # Update the discriminator d_steps times.
        for _ in range(self.d_steps):
            # Fresh batch of noise for every discriminator update.
            noise = tf.random.normal(
                shape=(batch_size, self.noise_dim)
            )
            with tf.GradientTape() as tape:
                # Generate fake images from the noise.
                fake_img = self.generator(noise, training=True)
                # Score the fake and the real images with the discriminator.
                score_img_fake = self.discriminator(fake_img, training=True)
                score_img_real = self.discriminator(real_images, training=True)
                # Batch-average scores.
                loss_fake = tf.reduce_mean(score_img_fake)
                loss_real = tf.reduce_mean(score_img_real)
                # Gradient penalty on the interpolated images.
                gp = self.gradient_penalty(batch_size, real_images, fake_img)
                # Discriminator loss: score gap plus the weighted penalty.
                d_loss = loss_fake - loss_real + gp * self.gp_weight

            # Gradient of the discriminator loss w.r.t. the discriminator weights.
            d_gradient = tape.gradient(d_loss, self.discriminator.trainable_variables)
            # Apply the update with the discriminator optimizer.
            self.d_optimizer.apply_gradients(
                zip(d_gradient, self.discriminator.trainable_variables)
            )

        # Update the generator once.
        noise = tf.random.normal(
            shape=(batch_size, self.noise_dim)
        )
        with tf.GradientTape() as tape:
            # Generate fake images and score them with the discriminator.
            fake_img = self.generator(noise, training=True)
            score_fake_img = self.discriminator(fake_img, training=True)
            # Generator loss: negated mean score of the fake images.
            g_loss = - tf.reduce_mean(score_fake_img)
        # Gradient of the generator loss w.r.t. the generator weights.
        g_gradient = tape.gradient(g_loss, self.generator.trainable_variables)
        # Apply the update with the generator optimizer.
        self.g_optimizer.apply_gradients(
            zip(g_gradient, self.generator.trainable_variables)
        )
        return {"d_loss": d_loss, "g_loss": g_loss}


In [29]:
class GANMonitor(keras.callbacks.Callback):
    """Callback that saves a few generated images as PNG files after each epoch."""

    def __init__(self, num_img=6, latent_dim=128):
        """Configure the monitor.

        Args:
            num_img: Number of images generated and saved per epoch.
            latent_dim: Length of the noise vectors fed to the generator; it
                has to match the input size of the monitored generator.
        """
        # Let the base Callback set up its own state before adding ours.
        super().__init__()
        self.num_img = num_img
        self.latent_dim = latent_dim

    def on_epoch_end(self, epoch, logs=None):
        """Generate ``num_img`` images and write them to the working directory.

        Files are named ``generated_img_{i}_{epoch}.png`` where ``i`` is the
        image index and ``epoch`` the epoch index passed in by Keras.
        """
        random_latent_vectors = tf.random.normal(shape=(self.num_img, self.latent_dim))
        generated_images = self.model.generator(random_latent_vectors)
        # Scale by 127.5 and shift by 127.5, i.e. map [-1, 1] onto [0, 255].
        generated_images = (generated_images * 127.5) + 127.5

        for i in range(self.num_img):
            img = generated_images[i].numpy()
            img = keras.utils.array_to_img(img)
            img.save("generated_img_{i}_{epoch}.png".format(i=i, epoch=epoch))

In [30]:
def get_data(data_name):
    """Load the training images of a dataset and rescale them to [-1, 1].

    Args:
        data_name: Object exposing ``load_data()`` that returns
            ``(train_images, train_labels), (test_images, test_labels)``.

    Returns:
        The training images as a float32 array computed as
        ``2 * (pixels / 255) - 1``.
    """
    # Only the training images are used; labels and the test split are dropped.
    (train_images, _train_labels), (_test_images, _test_labels) = data_name.load_data()
    pixels = train_images.astype(np.float32)
    unit_range = pixels / 255.0
    return 2 * unit_range - 1.0

In [None]:
# Generator and discriminator use separate Adam optimizers with the same settings.
adam_settings = dict(learning_rate=0.0002, beta_1=0.5, beta_2=0.9)
generator_optimizer = keras.optimizers.Adam(**adam_settings)
discriminator_optimizer = keras.optimizers.Adam(**adam_settings)

# Training configuration: number of epochs, noise vector length, batch size.
epochs = 20
noise_dim = 100
BATCH_SIZE=128

# Instantiate the custom `GANMonitor` Keras callback.
cbk = GANMonitor(num_img=3, latent_dim=noise_dim)

# Wrap the previously built discriminator and generator in the WGAN-GP model.
wgan = WGAN_GP(
    discriminator=d_model,
    generator=g_model,
    noise_dim=noise_dim,
    discriminator_extra_steps=3,
    gp_weight=10.0
)

# Compile the wrapper with one optimizer per sub-model.
wgan.compile(d_optimizer=discriminator_optimizer, g_optimizer=generator_optimizer)

# Load the CIFAR-10 training images and start training.
X_train = get_data(cifar10)
wgan.fit(X_train, batch_size=BATCH_SIZE, epochs=epochs, callbacks=[cbk])

Epoch 1/20


I0000 00:00:1729767729.911441   86556 service.cc:146] XLA service 0x7f2c54016820 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1729767729.911752   86556 service.cc:154]   StreamExecutor device (0): NVIDIA GeForce RTX 4060 Laptop GPU, Compute Capability 8.9
2024-10-24 11:02:10.238192: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:268] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.
2024-10-24 11:02:11.093665: I external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:531] Loaded cuDNN version 8907

2024-10-24 11:02:29.471134: E external/local_xla/xla/service/slow_operation_alarm.cc:65] Trying algorithm eng34{k2=2,k4=0,k5=0,k6=0,k7=0,k19=0} for conv (f32[256,128,4,4]{3,2,1,0}, u8[0]{0}) custom-call(f32[256,512,6,6]{3,2,1,0}, f32[128,512,3,3]{3,2,1,0}), window={size=3x3 rhs_reversal=1x1}, dim_labels=bf01_oi01->bf01, custom_call_target="__cudnn$convForward", backend_config={"opera

[1m389/391[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 109ms/step - d_loss: -4.4066 - g_loss: 13.1733

2024-10-24 11:03:19.570443: E external/local_xla/xla/service/slow_operation_alarm.cc:65] Trying algorithm eng28{k2=1,k3=0} for conv (f32[80,64,16,16]{3,2,1,0}, u8[0]{0}) custom-call(f32[80,3,35,35]{3,2,1,0}, f32[64,3,5,5]{3,2,1,0}), window={size=5x5 stride=2x2}, dim_labels=bf01_oi01->bf01, custom_call_target="__cudnn$convForward", backend_config={"operation_queue_id":"0","wait_on_operation_queues":[],"cudnn_conv_backend_config":{"conv_result_scale":1,"activation_mode":"kNone","side_input_scale":0,"leakyrelu_alpha":0},"force_earliest_schedule":false} is taking a while...
2024-10-24 11:03:19.571047: E external/local_xla/xla/service/slow_operation_alarm.cc:133] The operation took 13.013270643s
Trying algorithm eng28{k2=1,k3=0} for conv (f32[80,64,16,16]{3,2,1,0}, u8[0]{0}) custom-call(f32[80,3,35,35]{3,2,1,0}, f32[64,3,5,5]{3,2,1,0}), window={size=5x5 stride=2x2}, dim_labels=bf01_oi01->bf01, custom_call_target="__cudnn$convForward", backend_config={"operation_queue_id":"0","wait_on_operat

[1m391/391[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 122ms/step - d_loss: -4.4128 - g_loss: 13.1461

W0000 00:00:1729767800.282871   84241 gpu_timer.cc:114] Skipping the delay kernel, measurement accuracy will be reduced
W0000 00:00:1729767800.309137   84241 gpu_timer.cc:114] Skipping the delay kernel, measurement accuracy will be reduced
W0000 00:00:1729767800.310406   84241 gpu_timer.cc:114] Skipping the delay kernel, measurement accuracy will be reduced
W0000 00:00:1729767800.311713   84241 gpu_timer.cc:114] Skipping the delay kernel, measurement accuracy will be reduced
W0000 00:00:1729767800.313108   84241 gpu_timer.cc:114] Skipping the delay kernel, measurement accuracy will be reduced
W0000 00:00:1729767800.314227   84241 gpu_timer.cc:114] Skipping the delay kernel, measurement accuracy will be reduced
W0000 00:00:1729767800.315218   84241 gpu_timer.cc:114] Skipping the delay kernel, measurement accuracy will be reduced
W0000 00:00:1729767800.316068   84241 gpu_timer.cc:114] Skipping the delay kernel, measurement accuracy will be reduced
W0000 00:00:1729767800.316891   84241 gp

[1m391/391[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m77s[0m 123ms/step - d_loss: -4.4164 - g_loss: 13.1094
Epoch 2/20


W0000 00:00:1729767800.484723   84241 gpu_timer.cc:114] Skipping the delay kernel, measurement accuracy will be reduced
W0000 00:00:1729767800.485656   84241 gpu_timer.cc:114] Skipping the delay kernel, measurement accuracy will be reduced
W0000 00:00:1729767800.486643   84241 gpu_timer.cc:114] Skipping the delay kernel, measurement accuracy will be reduced
W0000 00:00:1729767800.488849   84241 gpu_timer.cc:114] Skipping the delay kernel, measurement accuracy will be reduced
W0000 00:00:1729767800.503795   84241 gpu_timer.cc:114] Skipping the delay kernel, measurement accuracy will be reduced
W0000 00:00:1729767800.504567   84241 gpu_timer.cc:114] Skipping the delay kernel, measurement accuracy will be reduced
W0000 00:00:1729767800.505411   84241 gpu_timer.cc:114] Skipping the delay kernel, measurement accuracy will be reduced
W0000 00:00:1729767800.506157   84241 gpu_timer.cc:114] Skipping the delay kernel, measurement accuracy will be reduced
W0000 00:00:1729767800.506940   84241 gp

[1m264/391[0m [32m━━━━━━━━━━━━━[0m[37m━━━━━━━[0m [1m21s[0m 167ms/step - d_loss: -2.7359 - g_loss: 6.1938

In [None]:
from IPython.display import Image, display

# Show the three images that GANMonitor saved at the last epoch (index 19).
for image_path in (
    "generated_img_0_19.png",
    "generated_img_1_19.png",
    "generated_img_2_19.png",
):
    display(Image(image_path))