In [2]:
import os
import shutil

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.initializers import HeNormal
from tensorflow.keras import backend as K
from tensorflow.keras.preprocessing import image

from sklearn.model_selection import train_test_split

In [None]:
# Hyperparameters shared by the cells below.
IMAGE_SIZE = 448   # images are resized to IMAGE_SIZE x IMAGE_SIZE
BATCH_SIZE = 2
EPOCHS = 20
LATENT_DIM = 10

image_data_dir = 'DATA'

# Only rescaling and a mild brightness jitter are active; every geometric
# augmentation is explicitly switched off.
image_datagen = image.ImageDataGenerator(
    rescale=1.0 / 255,
    brightness_range=[0.9, 1.1],
    rotation_range=0,
    width_shift_range=0,
    height_shift_range=0,
    zoom_range=0,
    horizontal_flip=False,
    fill_mode='nearest',
)

# class_mode='input' yields (images, images) pairs, i.e. the target of each
# batch is the batch itself, which is what an autoencoder needs.
image_generator = image_datagen.flow_from_directory(
    directory=image_data_dir,
    target_size=(IMAGE_SIZE, IMAGE_SIZE),
    batch_size=BATCH_SIZE,
    class_mode='input',
)

In [None]:
class VAESampling(layers.Layer):
    """Reparameterization-trick layer for a VAE.

    Given `[z_mean, z_log_var]`, returns `z_mean + exp(0.5 * z_log_var) * eps`
    where `eps` is standard normal noise with the same shape as `z_mean`.
    """

    def call(self, inputs):
        """Sample a latent vector.

        Parameters:
            inputs: pair `(z_mean, z_log_var)` of tensors with identical shape.

        Returns:
            Tensor shaped like `z_mean` holding the sampled latent vectors.
        """
        z_mean, z_log_var = inputs
        # Bound the log-variance so exp() below cannot overflow or collapse to 0.
        z_log_var = tf.clip_by_value(z_log_var, -20, 2)
        # Take the noise shape from z_mean itself instead of a module-level
        # constant, so the layer works for whatever latent size the encoder uses.
        epsilon = tf.random.normal(shape=tf.shape(z_mean), dtype=z_mean.dtype)
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

def build_vae_encoder(input_shape, latent_dim):
    """Build the convolutional VAE encoder.

    Three Conv2D -> BatchNormalization -> MaxPooling2D stages (32, 64, 128
    filters) feed a 256-unit dense layer, from which the latent mean and
    log-variance heads are computed and a latent sample is drawn.

    Parameters:
        input_shape: shape of one input image, passed to `layers.Input`.
        latent_dim: number of units in each latent head.

    Returns:
        A `tf.keras.Model` named 'encoder' with outputs `[z_mean, z_log_var, z]`.
    """
    inputs = layers.Input(shape=input_shape)

    # Convolutional stages; each one halves the spatial resolution.
    features = inputs
    for filters in (32, 64, 128):
        features = layers.Conv2D(
            filters,
            (3, 3),
            activation='relu',
            padding='same',
            kernel_initializer=HeNormal(),
        )(features)
        features = layers.BatchNormalization()(features)
        features = layers.MaxPooling2D((2, 2))(features)

    features = layers.Flatten()(features)
    features = layers.Dense(256, activation='relu')(features)
    features = layers.BatchNormalization()(features)

    def latent_head(head_name):
        # Both heads share the same configuration and differ only by name.
        return layers.Dense(
            latent_dim,
            name=head_name,
            kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=0.1),
        )(features)

    z_mean = latent_head('z_mean')
    z_log_var = latent_head('z_log_var')

    z = VAESampling()([z_mean, z_log_var])

    return tf.keras.Model(inputs, [z_mean, z_log_var, z], name='encoder')

def build_vae_decoder(latent_dim, original_shape):
    """Build the VAE decoder that maps a latent vector back to an image.

    The latent vector is expanded by two dense layers into a feature map of
    size (H // 8, W // 8, 128), then passed through three
    Conv2DTranspose -> BatchNormalization -> UpSampling2D stages (128, 64, 32
    filters), each doubling the resolution. A final sigmoid Conv2DTranspose
    produces a 3-channel output.

    Parameters:
        latent_dim: length of the latent input vector.
        original_shape: target image shape; only its first two entries
            (height, width) are read.

    Returns:
        A `tf.keras.Model` named 'decoder'.
    """
    latent_inputs = layers.Input(shape=(latent_dim,))

    # Spatial size of the feature map before the three 2x upsampling stages.
    grid_height = original_shape[0] // 8
    grid_width = original_shape[1] // 8

    hidden = layers.Dense(256, activation='relu')(latent_inputs)
    hidden = layers.BatchNormalization()(hidden)

    hidden = layers.Dense(grid_height * grid_width * 128, activation='relu')(hidden)
    hidden = layers.BatchNormalization()(hidden)
    hidden = layers.Reshape((grid_height, grid_width, 128))(hidden)

    for filters in (128, 64, 32):
        hidden = layers.Conv2DTranspose(
            filters,
            (3, 3),
            activation='relu',
            padding='same',
            kernel_initializer=HeNormal(),
        )(hidden)
        hidden = layers.BatchNormalization()(hidden)
        hidden = layers.UpSampling2D((2, 2))(hidden)

    # Sigmoid keeps the output in [0, 1].
    outputs = layers.Conv2DTranspose(3, (3, 3), activation='sigmoid', padding='same')(hidden)

    return tf.keras.Model(latent_inputs, outputs, name='decoder')

class VAE(tf.keras.Model):
    """Variational autoencoder trained with a custom `train_step`.

    The loss is a scaled pixel reconstruction error plus a KL-divergence term
    weighted by 0.001.

    Parameters:
        encoder: model mapping images to `[z_mean, z_log_var, z]`.
        decoder: model mapping a latent vector `z` back to an image.
        latent_dim: size of the latent vector (stored on the instance).
        **kwargs: forwarded to `tf.keras.Model`.
    """

    def __init__(self, encoder, decoder, latent_dim, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.latent_dim = latent_dim
        # Running means of each loss component, reported during fit().
        self.total_loss_tracker = tf.keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = tf.keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = tf.keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        """Loss trackers exposed as model metrics so Keras can reset them between epochs."""
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker
        ]

    def train_step(self, data):
        """Run one optimization step on a batch.

        Parameters:
            data: a batch of images, or a tuple whose first element is the
                batch of images (any further elements are ignored).

        Returns:
            Dict with the running means of "loss", "reconstruction_loss" and
            "kl_loss".
        """
        # fit() may hand over an (inputs, targets) tuple; only the inputs are used.
        if isinstance(data, tuple):
            data = data[0]

        with tf.GradientTape() as tape:
            # NOTE(review): the sub-models are called without `training=True`;
            # confirm whether BatchNormalization is meant to run in inference
            # mode during training.
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)

            # Squared error averaged over the channel axis -> one value per pixel.
            per_pixel_mse = tf.reduce_mean(tf.square(data - reconstruction), axis=-1)
            # Sum over the two spatial axes, average over the batch, then scale
            # by the module-level IMAGE_SIZE.
            reconstruction_loss = tf.reduce_mean(
                tf.reduce_sum(per_pixel_mse, axis=[1, 2])
            ) * IMAGE_SIZE

            # Same bounds as the sampling layer, so exp() stays finite.
            z_log_var = tf.clip_by_value(z_log_var, -20, 2)
            kl_loss = -0.5 * tf.reduce_mean(
                tf.reduce_sum(
                    1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var),
                    axis=1
                )
            )

            total_loss = reconstruction_loss + 0.001 * kl_loss

        grads = tape.gradient(total_loss, self.trainable_weights)
        # Clip the global gradient norm to 1.0 before applying the update.
        grads, _ = tf.clip_by_global_norm(grads, 1.0)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result()
        }

# Assemble the encoder/decoder pair for square RGB inputs and wrap them in the VAE.
input_shape = (IMAGE_SIZE, IMAGE_SIZE, 3)
vae_encoder = build_vae_encoder(input_shape, LATENT_DIM)
vae_decoder = build_vae_decoder(LATENT_DIM, input_shape)

vae = VAE(vae_encoder, vae_decoder, LATENT_DIM)
vae.compile(optimizer=Adam(learning_rate=0.0001))

# The generator already yields batches of BATCH_SIZE images, so `batch_size`
# must not be passed to fit() as well.
history = vae.fit(
    image_generator,
    epochs=EPOCHS,
    verbose=1
)

Found 30 images belonging to 1 classes.


Epoch 1/5


ValueError: Tried to convert 'x' to a tensor and failed. Error: A KerasTensor cannot be used as input to a TensorFlow function. A KerasTensor is a symbolic placeholder for a shape and dtype, used when constructing Keras Functional models or Keras Functions. You can only use it as input to a Keras layer or a Keras operation (from the namespaces `keras.layers` and `keras.operations`). You are likely doing something like:

```
x = Input(...)
...
tf_fn(x)  # Invalid.
```

What you should do instead is wrap `tf_fn` in a layer:

```
class MyLayer(Layer):
    def call(self, x):
        return tf_fn(x)

x = MyLayer()(x)
```


In [None]:
def predict_and_visualize(vae, input_images, num_samples=5):
    """Plot input images (top row) above their VAE reconstructions (bottom row).

    Parameters:
        vae: model exposing `encoder` (returning z_mean, z_log_var, z) and
            `decoder` attributes.
        input_images: batch of images, sliceable along its first axis.
        num_samples: maximum number of images to show. If the batch holds
            fewer images, only those are shown.
    """
    input_subset = input_images[:num_samples]
    # The slice can be shorter than requested (e.g. a batch of 2 with the
    # default of 5), so size the grid from what is actually available.
    shown = len(input_subset)
    if shown == 0:
        return

    # Only the sampled latent vector is needed for decoding.
    _, _, z = vae.encoder(input_subset)

    # Convert to a NumPy array before handing the images to matplotlib.
    reconstructed_images = np.asarray(vae.decoder(z))

    plt.figure(figsize=(12, 4))
    for i in range(shown):
        plt.subplot(2, shown, i + 1)
        plt.imshow(input_subset[i])
        plt.axis('off')
        if i == 0:
            plt.title('Original Images')

        plt.subplot(2, shown, shown + i + 1)
        plt.imshow(reconstructed_images[i])
        plt.axis('off')
        if i == 0:
            plt.title('Reconstructed Images')

    plt.tight_layout()
    plt.show()

# Pull one batch from the generator; the second element of the yielded pair is discarded.
images, _ = next(image_generator)

# Show the first two images of that batch next to their reconstructions.
predict_and_visualize(vae, images, num_samples=2)

def visualize_multiple_batches(vae, generator, numbatches=3, num_samples=2):
    """Draw several batches from `generator` and plot reconstructions for each.

    Parameters:
        vae: model passed through to `predict_and_visualize`.
        generator: iterator yielding `(images, targets)` pairs.
        numbatches: number of batches to pull and visualize.
        num_samples: maximum number of images shown per batch.
    """
    for _ in range(numbatches):
        images = next(generator)[0]
        # predict_and_visualize already calls plt.show() for its figure.
        predict_and_visualize(vae, images, num_samples=num_samples)

# Visualize reconstructions for several consecutive generator batches (3 by default).
visualize_multiple_batches(vae, image_generator)

In [None]:
import tensorflow as tf
from tensorflow.keras.preprocessing import image
import numpy as np
import matplotlib.pyplot as plt

def reconstruct_image_from_path(vae, image_path):
    """
    Load an image from a file path and display it next to its VAE reconstruction.

    Parameters:
        vae: trained VAE model
        image_path: path to the image file (str)
    """
    # Load at the model's input size, normalize by 255, and add a batch axis.
    loaded = image.load_img(image_path, target_size=(IMAGE_SIZE, IMAGE_SIZE))
    batch = np.expand_dims(image.img_to_array(loaded) / 255.0, axis=0)

    # Encode, then decode the sampled latent vector (mean and log-variance are unused here).
    _, _, latent = vae.encoder(batch)
    decoded = vae.decoder(latent)

    # Original on the left, reconstruction on the right.
    panels = (
        (batch[0], 'Original Image'),
        (decoded[0], 'Reconstructed Image'),
    )

    plt.figure(figsize=(10, 4))
    for position, (picture, heading) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.imshow(picture)
        plt.title(heading)
        plt.axis('off')

    plt.tight_layout()
    plt.show()

# Placeholder path: point this at a real image file before running the cell.
image_path = "path/to/your/image.jpg"
reconstruct_image_from_path(vae, image_path)