In [1]:
import tensorflow as tf

# GPU 디바이스를 자동으로 인식하고 사용합니다.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)  # GPU 메모리 동적 할당 설정

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import numpy as np

size = 32

train_data = tf.keras.utils.image_dataset_from_directory(
    "C:\\Users\\crazy\\Downloads\\archive\\img_align_celeba\\picted",
    labels=None,
    color_mode="rgb",
    image_size=(size, size),
    batch_size=128,
    shuffle=True,
    seed=42,
    interpolation="bilinear"  # 수정된 부분
)

def preprocess(img):
    img = tf.cast(img, "float32") / 255.0
    return img

train = train_data.map(lambda x: preprocess(x))

Found 20 files belonging to 1 classes.


In [3]:
# for images in train:
#     plt.imshow(images[0].numpy().astype("uint8"))

In [4]:
keras = tf.keras
K = tf.keras.backend

class Sampling(keras.layers.Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = K.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon


#인코더 부분
encoder_input = keras.layers.Input(shape=(size, size, 3), name="encoder_input")

x = keras.layers.Conv2D(128, (16, 16), strides=2, padding="same")(encoder_input)
x = tf.keras.layers.BatchNormalization()(x)
x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)

x = keras.layers.Conv2D(128, (8, 8), strides=2, padding="same")(x)
x = tf.keras.layers.BatchNormalization()(x)
x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)

x = keras.layers.Conv2D(128, (4, 4), strides=2, padding="same")(x)
x = tf.keras.layers.BatchNormalization()(x)
x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)

x = keras.layers.Conv2D(128, (2, 2), strides=2, padding="same")(x)
x = tf.keras.layers.BatchNormalization()(x)
x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)

shape_before_flattening = K.int_shape(x)[1:]
x = keras.layers.Flatten()(x)

z_mean = keras.layers.Dense(1, name="z_mean")(x)
z_log_var = keras.layers.Dense(1, name="z_log_var")(x)

z = Sampling()([z_mean, z_log_var])
encoder = keras.models.Model(encoder_input, [z_mean, z_log_var, z], name="encoder")

#디코더 부분
decoder_input = keras.layers.Input(shape=(1,), name="decoder_input")
x = keras.layers.Dense(np.prod(shape_before_flattening))(decoder_input)
x = keras.layers.BatchNormalization()(x)
x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)
x = keras.layers.Reshape(shape_before_flattening)(x)

x = keras.layers.Conv2DTranspose(128, (2, 2), strides=2, padding="same")(x)
x = keras.layers.BatchNormalization()(x)
x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)

x = keras.layers.Conv2DTranspose(128, (4, 4), strides=2, padding="same")(x)
x = keras.layers.BatchNormalization()(x)
x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)

x = keras.layers.Conv2DTranspose(128, (8, 8), strides=2, padding="same")(x)
x = keras.layers.BatchNormalization()(x)
x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)

x = keras.layers.Conv2DTranspose(128, (16, 16), strides=2, padding="same")(x)
x = keras.layers.BatchNormalization()(x)
x = tf.keras.layers.LeakyReLU(alpha=0.2)(x)

decoder_output = keras.layers.Conv2DTranspose(1, (size, size), strides=1, padding="same")(x)

decoder = keras.models.Model(decoder_input, decoder_output)


class VAE(keras.models.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder

        #loss 값을 모니터링 하는 요소
        self.total_loss_tracker = keras.metrics.Mean(name='total_loss')
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]
    
    def call(self, input):
        z_mean, z_log_var, z = encoder(input)
        reconstruction = decoder(z)
        return z_mean, z_log_var, reconstruction
    
    def train_step(self, data):
        with tf.GradientTape() as tape:
            z_mean, z_log_var, reconstruction = self(data)
            reconstruction_loss = tf.reduce_mean( 500 * tf.losses.binary_crossentropy(data, reconstruction, axis=(1,2,3)))
            kl_loss = tf.reduce_mean(tf.reduce_sum( 0.6 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)), axis=1))
            total_loss = reconstruction_loss + kl_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {m.name: m.result() for m in self.metrics}

vae = VAE(encoder, decoder)
vae.build(input_shape=(None, size, size, 3))
vae.compile(optimizer="adam")
vae.summary()

encoder.summary()
decoder.summary()

Model: "vae"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 encoder (Functional)        [(None, 1),               1478146   
                              (None, 1),                         
                              (None, 1)]                         
                                                                 
 model (Functional)          (None, 32, 32, 1)         5707265   
                                                                 
Total params: 7,185,417
Trainable params: 7,182,339
Non-trainable params: 3,078
_________________________________________________________________
Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_input (InputLayer)     [(None, 32, 32, 3)]  0           []                               

In [6]:
#모델 새로 정의
vae = VAE(encoder, decoder)


#모델 훈련하기
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
vae.compile(optimizer=optimizer)
vae.fit(train, epochs=10, batch_size=10)

vae.predict([1])


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


ValueError: in user code:

    File "c:\Users\crazy\OneDrive\Desktop\Latent Diffusion\.conda\lib\site-packages\keras\engine\training.py", line 2041, in predict_function  *
        return step_function(self, iterator)
    File "c:\Users\crazy\OneDrive\Desktop\Latent Diffusion\.conda\lib\site-packages\keras\engine\training.py", line 2027, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "c:\Users\crazy\OneDrive\Desktop\Latent Diffusion\.conda\lib\site-packages\keras\engine\training.py", line 2015, in run_step  **
        outputs = model.predict_step(data)
    File "c:\Users\crazy\OneDrive\Desktop\Latent Diffusion\.conda\lib\site-packages\keras\engine\training.py", line 1983, in predict_step
        return self(x, training=False)
    File "c:\Users\crazy\OneDrive\Desktop\Latent Diffusion\.conda\lib\site-packages\keras\utils\traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "C:\Users\crazy\AppData\Local\Temp\__autograph_generated_filepnzw9gfv.py", line 10, in tf__call
        (z_mean, z_log_var, z) = ag__.converted_call(ag__.ld(encoder), (ag__.ld(input),), None, fscope)

    ValueError: Exception encountered when calling layer "vae_2" "                 f"(type VAE).
    
    in user code:
    
        File "C:\Users\crazy\AppData\Local\Temp\ipykernel_15828\4215938357.py", line 89, in call  *
            z_mean, z_log_var, z = encoder(input)
        File "c:\Users\crazy\OneDrive\Desktop\Latent Diffusion\.conda\lib\site-packages\keras\utils\traceback_utils.py", line 70, in error_handler  **
            raise e.with_traceback(filtered_tb) from None
        File "c:\Users\crazy\OneDrive\Desktop\Latent Diffusion\.conda\lib\site-packages\keras\engine\input_spec.py", line 250, in assert_input_compatibility
            raise ValueError(
    
        ValueError: Exception encountered when calling layer "encoder" "                 f"(type Functional).
        
        Input 0 of layer "conv2d" is incompatible with the layer: expected min_ndim=4, found ndim=1. Full shape received: (None,)
        
        Call arguments received by layer "encoder" "                 f"(type Functional):
          • inputs=tf.Tensor(shape=(None,), dtype=int32)
          • training=False
          • mask=None
    
    
    Call arguments received by layer "vae_2" "                 f"(type VAE):
      • input=tf.Tensor(shape=(None,), dtype=int32)
