In [1]:
import tensorflow as tf
from tensorflow.keras import layers
import matplotlib.pyplot as plt
import numpy as np
import os
import time

In [2]:
class SelfAttention(layers.Layer):
    def __init__(self, channels):
        super(SelfAttention, self).__init__()
        self.channels = channels  # 입력 채널 수를 저장

        self.theta = layers.Conv2D(channels // 8, kernel_size=1, padding="same")  # theta 변환 레이어, channel을 1/8로 줄임
        self.phi = layers.Conv2D(channels // 8, kernel_size=1, padding="same")    # phi 변환 레이어, channel을 1/8로 줄임
        self.g = layers.Conv2D(channels // 2, kernel_size=1, padding="same")      # g 변환 레이어, channel을 1/2로 줄임
        self.o = layers.Conv2D(channels, kernel_size=1, padding="same")           # 출력 변환 레이어, channel을 원래대로 복원
        self.gamma = tf.Variable(0.0, trainable=True)                             # Self-Attention 결과의 residual 가중치

    def call(self, x):
        batch_size, height, width, channels = tf.shape(x)[0], tf.shape(x)[1], tf.shape(x)[2], tf.shape(x)[3]  # 입력 x의 shape 정보 추출

        theta = self.theta(x)  # theta 변환, channel을 1/8로 줄인 특성 맵 (batch_size, height, width, channels // 8)
        phi = self.phi(x)      # phi 변환, channel을 1/8로 줄인 특성 맵 (batch_size, height, width, channels // 8)
        g = self.g(x)          # g 변환, channel을 1/2로 줄인 특성 맵 (batch_size, height, width, channels // 2)

        theta = tf.reshape(theta, [batch_size, -1, channels // 8])  # theta를 (batch_size, height * width, channels // 8)로 재배열
        phi = tf.reshape(phi, [batch_size, -1, channels // 8])      # phi를 (batch_size, height * width, channels // 8)로 재배열
        g = tf.reshape(g, [batch_size, -1, channels // 2])          # g를 (batch_size, height * width, channels // 2)로 재배열

        attention = tf.nn.softmax(tf.matmul(theta, phi, transpose_b=True))  # theta와 phi의 행렬 곱 후 softmax로 정규화하여 attention map 생성
        o = tf.matmul(attention, g)  # attention map과 g를 곱하여 Self-Attention 결과 계산 (batch_size, height * width, channels // 2)
        o = tf.reshape(o, [batch_size, height, width, channels // 2])  # Self-Attention 결과를 (batch_size, height, width, channels // 2)로 재구성

        o = self.o(o)  # channel을 원래 channel 수로 복원하기 위해 Conv2D 레이어 적용
        return x + self.gamma * o  # 입력 x에 gamma 가중치를 적용한 Self-Attention 결과를 더하여 residual 연결

In [3]:
class SpectralNorm(layers.Layer):  # Spectral Normalization을 적용하기 위한 래퍼 클래스
    def __init__(self, layer):     # 생성자, 정규화를 적용할 레이어를 인자로 받음
        super(SpectralNorm, self).__init__()
        self.layer = layer         # SpectralNorm이 적용될 레이어를 저장

    def build(self, input_shape):  # 레이어를 빌드하고 가중치 초기화를 위한 build 메서드
        if not self.layer.built:   # 레이어가 빌드되지 않은 경우에만 빌드 수행
            self.layer.build(input_shape)  # 지정된 input_shape에 맞춰 레이어 빌드

    def call(self, inputs):  # 가중치 정규화를 적용하고, 레이어의 출력을 반환하는 call 메서드
        weight = self.layer.kernel  # 레이어의 가중치 행렬(kernel)을 가져옴
        sigma = tf.linalg.norm(tf.reshape(weight, [-1, weight.shape[-1]]), ord=2)  # 가중치 행렬의 스펙트럼 노름 계산
        self.layer.kernel.assign(weight / sigma)  # 가중치를 스펙트럼 노름으로 정규화
        return self.layer(inputs)  # 정규화된 가중치로 레이어를 호출하여 결과 반환

In [4]:
def generator(latent_dim):  # Generator 네트워크 정의 함수, latent_dim은 잠재 공간의 차원
    inputs = layers.Input(shape=(latent_dim,))  # 잠재 벡터를 입력으로 받는 Input 레이어 생성
    x = layers.Dense(4 * 4 * 256, use_bias=False)(inputs)  # Dense 레이어로 입력을 4x4x256 크기로 변환 (4x4 크기로 시작)
    x = layers.BatchNormalization()(x)  # Batch Normalization 적용, 네트워크의 학습 안정성 향상
    x = layers.ReLU()(x)  # ReLU 활성화 함수 적용, 비선형성 추가
    x = layers.Reshape((4, 4, 256))(x)  # 4x4x256 텐서 형태로 변환하여 2D Conv 레이어와 호환되도록 함

    x = layers.Conv2DTranspose(128, kernel_size=4, strides=2, padding="same")(x)  # 4x4x256 → 8x8x128로 확장
    x = layers.BatchNormalization()(x)  # Batch Normalization 적용하여 안정적인 학습을 지원
    x = layers.ReLU()(x)  # ReLU 활성화 함수로 비선형성 추가

    x = layers.Conv2DTranspose(64, kernel_size=4, strides=2, padding="same")(x)  # 8x8x128 → 16x16x64로 확장
    x = layers.BatchNormalization()(x)  # Batch Normalization 적용
    x = layers.ReLU()(x)  # ReLU 활성화 함수 적용

    x = SelfAttention(64)(x)  # Self-Attention 레이어 추가, 16x16x64 특징 맵의 전역적 상관관계 학습

    x = layers.Conv2DTranspose(32, kernel_size=4, strides=2, padding="same")(x)  # 16x16x64 → 32x32x32로 확장
    x = layers.BatchNormalization()(x)  # Batch Normalization 적용
    x = layers.ReLU()(x)  # ReLU 활성화 함수 적용

    outputs = layers.Conv2DTranspose(3, kernel_size=4, strides=1, padding="same", activation="tanh")(x)  # 32x32x32 → 32x32x3 출력 생성, tanh 활성화 적용
    model = tf.keras.Model(inputs, outputs)  # Generator 모델 생성
    return model  # 완성된 Generator 모델 반환

In [5]:
def discriminator():  # Discriminator 네트워크 정의 함수, 32x32x3 이미지를 입력으로 받음
    inputs = layers.Input(shape=(32, 32, 3))  # 32x32 크기의 RGB 이미지 입력
    x = SpectralNorm(layers.Conv2D(32, kernel_size=4, strides=2, padding="same"))(inputs)  # 32x32x3 → 16x16x32, Spectral Normalization 적용된 Conv2D
    x = layers.LeakyReLU(0.2)(x)  # LeakyReLU 활성화 함수 적용 (alpha=0.2) 

    x = SpectralNorm(layers.Conv2D(64, kernel_size=4, strides=2, padding="same"))(x)  # 16x16x32 → 8x8x64, 채널 수 증가
    x = layers.LeakyReLU(0.2)(x)  # LeakyReLU 활성화 함수 적용

    x = SpectralNorm(layers.Conv2D(128, kernel_size=4, strides=2, padding="same"))(x)  # 8x8x64 → 4x4x128, 채널 수 증가
    x = layers.LeakyReLU(0.2)(x)  # LeakyReLU 활성화 함수 적용

    x = SelfAttention(128)(x)  # Self-Attention 레이어 추가, 4x4x128 특징 맵의 전역적 상관관계 학습

    x = SpectralNorm(layers.Conv2D(256, kernel_size=4, strides=2, padding="same"))(x)  # 4x4x128 → 2x2x256, 채널 수 증가
    x = layers.LeakyReLU(0.2)(x)  # LeakyReLU 활성화 함수 적용

    x = layers.Flatten()(x)  # 텐서를 1차원으로 변환하여 Dense 레이어에 입력할 수 있도록 준비
    outputs = SpectralNorm(layers.Dense(1))(x)  # 최종 출력 레이어, Spectral Normalization 적용된 Dense 레이어, 스칼라 값 출력
    model = tf.keras.Model(inputs, outputs)  # Discriminator 모델 생성
    return model  # 완성된 Discriminator 모델 반환

In [6]:
# Discriminator Loss 함수 (이론적 정의에 따른 구현)
def discriminator_loss(real_output, fake_output):  # Discriminator가 진짜와 가짜 이미지를 구별하기 위한 손실 함수
    real_loss = tf.reduce_mean(tf.minimum(0.0, -1 + real_output))  # 진짜 이미지에 대한 손실, 점수를 1에 가깝게 만들도록 유도
    fake_loss = tf.reduce_mean(tf.minimum(0.0, -1 - fake_output))  # 가짜 이미지에 대한 손실, 점수를 -1에 가깝게 만들도록 유도
    return -(real_loss + fake_loss)  # 두 손실 값을 합하여 최종 Discriminator 손실 계산

# Generator Loss 함수 (이론적 정의와 일치)
def generator_loss(fake_output):  # Generator가 생성한 이미지가 Discriminator로부터 높은 점수를 받도록 유도하는 손실 함수
    return -tf.reduce_mean(fake_output)  # Discriminator의 출력을 최대화하여 진짜처럼 보이도록 유도

# Gradient Penalty를 위한 함수
def gradient_penalty(discriminator, real_images, fake_images):  # Gradient Penalty를 계산하여 Discriminator의 안정성을 높임
    real_images = tf.cast(real_images, tf.float32)  # real_images를 float32로 변환
    fake_images = tf.cast(fake_images, tf.float32)  # fake_images를 float32로 변환
    
    batch_size = tf.shape(real_images)[0]  # 배치 크기를 가져옴
    epsilon = tf.random.uniform([batch_size, 1, 1, 1], 0.0, 1.0, dtype=tf.float32)  # [0, 1] 사이의 무작위 값을 생성하여 배치 크기에 맞게 만듦
    interpolated = epsilon * real_images + (1 - epsilon) * fake_images  # real_images와 fake_images 사이에서 보간한 텐서 생성

    with tf.GradientTape() as gp_tape:  # Gradient Penalty 계산을 위한 테이프 설정
        gp_tape.watch(interpolated)  # 보간된 이미지에 대한 기울기 계산
        interpolated_output = discriminator(interpolated, training=True)  # Discriminator로 보간 이미지를 전달하여 출력 계산

    grads = gp_tape.gradient(interpolated_output, [interpolated])[0]  # 보간된 이미지에 대한 기울기(Gradient) 계산
    grads_norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))  # 기울기의 L2 노름 계산 (Gradient Norm)
    gp = tf.reduce_mean((grads_norm - 1.0) ** 2)  # Gradient Penalty 계산, 노름이 1에서 멀어질수록 큰 값
    return gp  # 최종 Gradient Penalty 값 반환

In [7]:
# Training function with TTUR 적용
def train_sagan(generator, discriminator, dataset, latent_dim, epochs, save_interval):
    # TTUR 적용: Generator와 Discriminator의 학습률을 다르게 설정
    generator_optimizer = tf.keras.optimizers.Adam(5e-5, beta_1=0.0, beta_2=0.9)  # Generator 학습률을 낮게 설정
    discriminator_optimizer = tf.keras.optimizers.Adam(1e-4, beta_1=0.0, beta_2=0.9)  # Discriminator 학습률을 높게 설정

    gen_losses = []  # 각 에포크의 Generator 손실을 저장하는 리스트
    disc_losses = []  # 각 에포크의 Discriminator 손실을 저장하는 리스트

    if not os.path.exists("generated_images"):  # 생성된 이미지 저장 폴더가 없으면 생성
        os.makedirs("generated_images")

    for epoch in range(epochs):  # 지정된 에포크 수만큼 학습
        start_time = time.time()  # 에포크 시작 시간 기록

        epoch_gen_loss = 0  # 해당 에포크 동안의 Generator 손실 누적 변수
        epoch_disc_loss = 0  # 해당 에포크 동안의 Discriminator 손실 누적 변수
        for real_images in dataset:  # 데이터셋에서 실제 이미지를 가져옴
            noise = tf.random.normal([real_images.shape[0], latent_dim])  # 배치 크기와 잠재 공간 크기에 맞는 랜덤 노이즈 생성

            # Train Discriminator
            with tf.GradientTape() as disc_tape:  # Discriminator의 손실에 대한 그래디언트를 기록하는 테이프
                fake_images = generator(noise, training=True)  # Generator로 가짜 이미지 생성
                real_output = discriminator(real_images, training=True)  # Discriminator로 실제 이미지 평가
                fake_output = discriminator(fake_images, training=True)  # Discriminator로 가짜 이미지 평가
                # Discriminator 손실 계산 (Hinge Loss + Gradient Penalty)
                d_loss = discriminator_loss(real_output, fake_output) + 10.0 * gradient_penalty(discriminator, real_images, fake_images)
            # Discriminator의 가중치 업데이트
            gradients_of_discriminator = disc_tape.gradient(d_loss, discriminator.trainable_variables)
            discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

            # Train Generator
            with tf.GradientTape() as gen_tape:  # Generator의 손실에 대한 그래디언트를 기록하는 테이프
                fake_images = generator(noise, training=True)  # Generator로 새로운 가짜 이미지 생성
                fake_output = discriminator(fake_images, training=True)  # 생성된 가짜 이미지를 Discriminator로 평가
                g_loss = generator_loss(fake_output)  # Generator 손실 계산 (Discriminator로부터 높은 점수를 받도록 유도)
            # Generator의 가중치 업데이트
            gradients_of_generator = gen_tape.gradient(g_loss, generator.trainable_variables)
            generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))

            # 에포크 동안의 손실 누적
            epoch_gen_loss += g_loss.numpy()
            epoch_disc_loss += d_loss.numpy()

        # 에포크 손실 및 경과 시간 계산
        epoch_gen_loss /= len(dataset)  # 해당 에포크 동안의 평균 Generator 손실
        epoch_disc_loss /= len(dataset)  # 해당 에포크 동안의 평균 Discriminator 손실
        end_time = time.time()  # 에포크 종료 시간 기록
        epoch_time = end_time - start_time  # 해당 에포크에 소요된 시간

        gen_losses.append(epoch_gen_loss)  # Generator 손실을 리스트에 추가
        disc_losses.append(epoch_disc_loss)  # Discriminator 손실을 리스트에 추가

        # 현재 에포크 정보 출력
        print(f"Epoch {epoch + 1}/{epochs}, Gen Loss: {epoch_gen_loss:.4f}, Disc Loss: {epoch_disc_loss:.4f}, Time: {epoch_time:.2f}s")

        # 지정된 간격마다 Generator가 생성한 이미지를 저장
        if (epoch + 1) % save_interval == 0:
            generated_images = generator(noise, training=False)  # 학습 모드가 아닌 상태에서 이미지를 생성
            save_images(generated_images, epoch + 1)  # 이미지를 저장하는 함수 호출

    return gen_losses, disc_losses  # 모든 에포크의 Generator 및 Discriminator 손실을 반환

In [8]:
def save_images(images, epoch):
    images = (images + 1) * 127.5  # Rescale images to [0, 255]
    images = images.numpy().astype(np.uint8)

    fig, axs = plt.subplots(4, 4, figsize=(4, 4))
    for i in range(4):
        for j in range(4):
            axs[i, j].imshow(images[i * 4 + j])
            axs[i, j].axis("off")
    plt.savefig(f"generated_images/generated_epoch_{epoch}.png")
    plt.close()

In [9]:
batch_size = 128  
latent_dim = 100
epochs = 10000
save_interval = 500

In [10]:
(train_images, _), (_, _) = tf.keras.datasets.cifar10.load_data()
train_images = (train_images - 127.5) / 127.5
train_dataset = tf.data.Dataset.from_tensor_slices(train_images).shuffle(50000).batch(batch_size)

In [None]:
generator = generator(latent_dim)
discriminator = discriminator()
gen_losses, disc_losses = train_sagan(generator, discriminator, train_dataset, latent_dim, epochs, save_interval)

Epoch 1/10000, Gen Loss: -0.0822, Disc Loss: 11.2280, Time: 92.17s
Epoch 2/10000, Gen Loss: -0.1096, Disc Loss: 9.8146, Time: 142.00s
Epoch 3/10000, Gen Loss: 0.1890, Disc Loss: 11.4965, Time: 142.02s
Epoch 4/10000, Gen Loss: -0.0920, Disc Loss: 11.0946, Time: 86.64s
Epoch 5/10000, Gen Loss: -0.2449, Disc Loss: 10.0541, Time: 142.02s
Epoch 6/10000, Gen Loss: -0.2932, Disc Loss: 9.0792, Time: 142.01s
Epoch 7/10000, Gen Loss: -0.2428, Disc Loss: 8.0306, Time: 142.03s
Epoch 8/10000, Gen Loss: 0.0023, Disc Loss: 6.7817, Time: 141.99s
Epoch 9/10000, Gen Loss: 0.0590, Disc Loss: 5.7261, Time: 85.40s
Epoch 10/10000, Gen Loss: 0.6176, Disc Loss: 3.4923, Time: 142.03s
Epoch 11/10000, Gen Loss: 0.7012, Disc Loss: 3.8008, Time: 84.97s
Epoch 12/10000, Gen Loss: 0.6798, Disc Loss: 3.3757, Time: 85.08s
Epoch 13/10000, Gen Loss: 0.7390, Disc Loss: 3.0527, Time: 142.01s
Epoch 14/10000, Gen Loss: 0.7819, Disc Loss: 2.7866, Time: 85.41s
Epoch 15/10000, Gen Loss: 0.7571, Disc Loss: 2.7507, Time: 142.01s


In [None]:
# Plot Generator and Discriminator Losses
plt.figure()
plt.plot(gen_losses, label="Generator Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.title("Generator Loss over Epochs")
plt.legend()
plt.show()

In [None]:
plt.figure()
plt.plot(disc_losses, label="Discriminator Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.title("Discriminator Loss over Epochs")
plt.legend()
plt.show()