In [71]:
import sys
sys.path.append("..")

import os
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import importlib
import utils
importlib.reload(utils);
from utils import *
from termcolor import colored
from tensorflow.keras import layers, Model, Input

In [72]:
# On-disk locations of the partition table and of the image directory.
image_dir = '../data/img_align_celeba/img_align_celeba/'
csv_path = '../data/list_eval_partition.csv'

# Load the partition info and split it into train/val/test file lists.
df = load_partition_csv(csv_path)
train_files, val_files, test_files = build_file_lists(df, image_dir)

# All three splits share the same image size and batch size.
_loader_kwargs = dict(img_size=(64, 64), batch_size=64)

# Only the training split is shuffled; it is also cut down to its first
# 20 batches by `.take(20)`.
train_ds = make_image_dataset(train_files, shuffle=True, **_loader_kwargs).take(20)
val_ds = make_image_dataset(val_files, **_loader_kwargs)
test_ds = make_image_dataset(test_files, **_loader_kwargs)


## Definicja modelu


In [73]:
class Sampling(layers.Layer):
    """Reparameterization-trick layer: samples z from (z_mean, z_log_var).

    Given the mean and log-variance of a diagonal Gaussian, returns
    ``z_mean + exp(0.5 * z_log_var) * epsilon`` where ``epsilon`` is standard
    normal noise, i.e. the latent vector encoding an input image.
    """
    def call(self, inputs):
        """Draw one latent sample per row of ``z_mean``.

        Args:
            inputs: pair ``(z_mean, z_log_var)`` of tensors with identical
                shape.

        Returns:
            Tensor with the same shape as ``z_mean``.
        """
        z_mean, z_log_var = inputs
        # Derive the noise shape from the incoming tensor instead of
        # hard-coding (64, 100): a batch that is not exactly 64 rows (e.g. a
        # short final batch) or a different latent size would otherwise fail
        # to broadcast.
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

In [74]:
# 1) Encoder input and latent size
latent_dim = 100  # dimensionality of the latent space
print(colored("Encoder:","red"))

# 64x64 RGB images, as declared by the input shape below
encoder_inputs = Input(shape=(64, 64, 3), name="encoder_input")

# 2) Convolutional layers
# downsampling is done with strided Conv2D (strides=2) instead of MaxPooling
x = layers.Conv2D(32, kernel_size=3, strides=2, padding="same", activation="relu")(encoder_inputs)
x = layers.BatchNormalization()(x)
x = layers.Conv2D(64, kernel_size=3, strides=2, padding="same", activation="relu")(x)
x = layers.BatchNormalization()(x)
x = layers.Conv2D(128, kernel_size=3, strides=2, padding="same", activation="relu")(x)
x = layers.BatchNormalization()(x)
x = layers.Flatten()(x)
x = layers.Dense(64, activation="relu")(x)
x = layers.Dense(128, activation="relu")(x)
# 3) mean, log-variance and sampling
z_mean    = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
# 4) Encoder model with three outputs: z_mean, z_log_var and the sampled z
encoder = Model(encoder_inputs, [z_mean,z_log_var,z], name="encoder")
# Smoke test: run the untrained encoder on a single batch and check the shape
z_predict = encoder.predict(train_ds.take(1))
print(colored("Test prediction shape of 1 batch:","blue"),z_predict[0].shape)

print(colored("Decoder:","red"))
decoder_input = Input(shape=(latent_dim,), name="z_input")

# 2) Project & reshape to a small 8x8x128 feature map
x = layers.Dense(8 * 8 * 128, activation="relu")(decoder_input)
x = layers.Reshape((8, 8, 128))(x)

# 3) Upsampling blocks via transposed conv
#    Each stride-2 layer doubles H/W: 8 -> 16 -> 32 -> 64
x = layers.Conv2DTranspose(64, kernel_size=3, strides=2, padding="same", activation="relu")(x)
x = layers.Conv2DTranspose(64,  kernel_size=3, strides=2, padding="same", activation="relu")(x)
x = layers.Conv2DTranspose(32,  kernel_size=3, strides=2, padding="same", activation="relu")(x)

# 4) Final layer: restore to 3 channels; sigmoid keeps outputs in [0, 1]
decoder_output = layers.Conv2DTranspose(
        3, kernel_size=3, strides=1, padding="same", activation="sigmoid", name="decoder_output")(x)

decoder = Model(decoder_input, decoder_output, name="conv_decoder")
# Smoke test for the decoder. The noise takes its shape from the encoder
# output rather than a hard-coded batch size, so it stays valid if the batch
# size or latent_dim changes.
eps = tf.random.normal(shape=z_predict[0].shape)
# Reparameterization: z = z_mean + eps * exp(z_log_var / 2)
predict = decoder.predict(z_predict[0] + eps*tf.math.exp(z_predict[1]/2.))
print(colored("Test prediction shape of 1 batch:","blue"),predict.shape)

[31mEncoder:[0m
[34mTest prediction shape of 1 batch:[0m (64, 100)
[31mDecoder:[0m
[34mTest prediction shape of 1 batch:[0m (64, 64, 64, 3)


In [75]:
class VAE(tf.keras.Model):
    """Variational autoencoder combining an encoder and a decoder.

    Args:
        encoder: model mapping a batch of images to ``(z_mean, z_log_var, z)``.
        decoder: model mapping latent vectors ``z`` to reconstructed images.
        beta: weight applied to the reconstruction term of the total loss.
        **kwargs: forwarded to ``tf.keras.Model``.
    """

    def __init__(self, encoder, decoder,beta=500, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.beta = beta

    def call(self, inputs, training=None):
        """Encode ``inputs``, then decode the sampled latent vector.

        Args:
            inputs: batch of images accepted by the encoder.
            training: forwarded to encoder and decoder so that mode-dependent
                layers (e.g. BatchNormalization) behave correctly.

        Returns:
            Tuple ``(z_mean, z_log_var, reconstructed)``.
        """
        z_mean, z_log_var, z = self.encoder(inputs, training=training)
        reconstructed = self.decoder(z, training=training)
        return z_mean,z_log_var,reconstructed

    def _vae_losses(self, data, training):
        """Run a forward pass and return the dict of loss terms."""
        z_mean, z_log_var, reconstructed = self(data, training=training)

        # KL divergence to the standard normal, averaged over both the batch
        # and the latent dimensions.
        kl_loss = -0.5 * tf.reduce_mean(
            z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1
        )
        # Binary cross-entropy averaged over axes (1, 2, 3) per sample, then
        # averaged over the batch.
        reconstruction_loss = tf.reduce_mean(
            tf.keras.losses.binary_crossentropy(data, reconstructed,axis=(1,2,3))
        )
        weighted_reconstruction = self.beta * reconstruction_loss
        return {
            "loss": kl_loss + weighted_reconstruction,
            "kl_loss": kl_loss,
            # Reported already multiplied by beta.
            "reconstruction_loss": weighted_reconstruction,
        }

    def train_step(self, data):
        """One optimization step; returns the loss terms for logging."""
        with tf.GradientTape() as tape:
            # training=True must be passed explicitly: without it the
            # BatchNormalization layers of the encoder run in inference mode
            # and never update their moving statistics.
            losses = self._vae_losses(data, training=True)
        grads = tape.gradient(losses["loss"], self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return losses

    def test_step(self, data):
        """Evaluation step used by ``fit(validation_data=...)``/``evaluate``.

        The model is compiled without a loss, so the default evaluation step
        would not report the VAE objective; compute the same terms as in
        training, in inference mode and without weight updates.
        """
        return self._vae_losses(data, training=False)

In [None]:
# Assemble the VAE from the encoder/decoder built above (default beta).
vae = VAE(encoder, decoder)

# Only an optimizer is compiled; the loss is computed inside VAE.train_step.
vae.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
)

# Train for 10 epochs, evaluating on the validation split after each epoch.
history = vae.fit(
    train_ds,
    validation_data=val_ds,
    epochs=10,
    verbose=1,
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x265152c6e30>

In [None]:
# Compare one training image with its reconstruction.
example = next(iter(train_ds))

# vae(...) returns (z_mean, z_log_var, reconstructed); keep the first
# reconstructed image of the batch.
output = vae(example)[2][0]

fig, axes = plt.subplots(1, 2)
panels = ((example[0], "Original"), (output, "Reconstruction"))
for ax, (image, title) in zip(axes, panels):
    pil_img = tf.keras.preprocessing.image.array_to_img(image.numpy())
    ax.imshow(pil_img)
    # Label each panel so the figure is readable on its own; pixel-index
    # ticks carry no information for an image.
    ax.set_title(title)
    ax.axis("off")
plt.show()

NameError: name 'train_ds' is not defined