In [4]:
import os
import numpy as np
import random
import cv2
from tensorflow.keras.utils import Sequence
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
import matplotlib.pyplot as plt
from tqdm import tqdm  # For progress bars


In [5]:
# --- Data Generator Class ---
class DataGenerator(Sequence):
    """
    Keras Data Generator for loading batches of .npz data efficiently.
    This avoids loading all data at once into memory and processes batches on the fly.
    """
    def __init__(self, directory, batch_size=32, file_sample_ratio=0.1, sample_ratio=0.1, img_size=(256, 256), shuffle=True):
        super().__init__()  # Call the superclass constructor
        self.directory = directory
        self.batch_size = batch_size
        self.file_sample_ratio = file_sample_ratio
        self.sample_ratio = sample_ratio
        self.img_size = img_size
        self.shuffle = shuffle
        self.npz_files = [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith('.npz')]
        
        # Sampling a subset of files (for large datasets)
        num_files = len(self.npz_files)
        self.sampled_files = random.sample(self.npz_files, int(self.file_sample_ratio * num_files))

        self.filenames = [os.path.basename(f).split('.')[0] for f in self.sampled_files]

        if self.shuffle:
            random.shuffle(self.sampled_files)

        self.on_epoch_end()

    def __len__(self):
        """Denotes the number of batches per epoch"""
        return int(np.floor(len(self.sampled_files) / self.batch_size))

    def __getitem__(self, index):
        """Generate one batch of data"""
        batch_files = self.sampled_files[index * self.batch_size:(index + 1) * self.batch_size]

        images, bboxes, landmarks_2d, landmarks_3d = [], [], [], []
        for npz_file in batch_files:
            data = np.load(npz_file)
            color_images = data['colorImages']
            bboxes_data = data['boundingBox']
            landmarks2D_data = data['landmarks2D']
            landmarks3D_data = data['landmarks3D']

            # Sampling a fraction of the frames
            num_frames = color_images.shape[-1]
            sampled_indices = random.sample(range(num_frames), int(self.sample_ratio * num_frames))

            for idx in sampled_indices:
                img = color_images[..., idx]
                img = cv2.resize(img, self.img_size)  # Resize to the expected input size
                img = img / 255.0  # Normalize the image

                images.append(img)
                bboxes.append(bboxes_data[..., idx])
                landmarks_2d.append(landmarks2D_data[..., idx])
                landmarks_3d.append(landmarks3D_data[..., idx])

        # Convert to numpy arrays
        images = np.array(images)
        bboxes = np.array(bboxes)
        landmarks_2d = np.array(landmarks_2d)
        landmarks_3d = np.array(landmarks_3d)

        return images, images  # For autoencoder, input and output are the same

    def on_epoch_end(self):
        """Shuffle the dataset at the end of each epoch if required"""
        if self.shuffle:
            random.shuffle(self.sampled_files)



In [6]:

# --- Preprocess Function ---
def preprocess_images(images):
    """
    Resize and normalize the images for input into the model.
    """
    processed_images = []
    for img_set in images:
        for img in img_set.transpose(2, 0, 1):  # Loop over each frame
            img = cv2.resize(img, (256, 256))  # Resize images to 256x256
            img = img / 255.0  # Normalize pixel values to [0, 1]
            processed_images.append(img)
    return np.array(processed_images)



In [7]:

# --- Autoencoder Model ---
def build_autoencoder():
    """
    Build a Convolutional Autoencoder for image reconstruction.
    The model structure includes convolutional layers followed by max pooling and upsampling layers.
    """
    input_img = keras.Input(shape=(256, 256, 3))  # Assuming input images are 256x256 RGB

    # Encoder
    x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(input_img)
    x = layers.MaxPooling2D((2, 2), padding='same')(x)
    x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
    x = layers.MaxPooling2D((2, 2), padding='same')(x)
    x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(x)
    encoded = layers.MaxPooling2D((2, 2), padding='same')(x)

    # Decoder
    x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(encoded)
    x = layers.UpSampling2D((2, 2))(x)
    x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
    x = layers.UpSampling2D((2, 2))(x)
    x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = layers.UpSampling2D((2, 2))(x)
    decoded = layers.Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)  # Output is a 3-channel RGB image

    # Autoencoder model
    autoencoder = keras.Model(input_img, decoded)
    autoencoder.compile(optimizer='adam', loss='mse')
    autoencoder.summary()

    return autoencoder



In [8]:

# --- Visualization Function ---
def display_comparison(original, reconstructed, names, n=5):
    """
    Display a side-by-side comparison of original and reconstructed images,
    including labels for the person's name.
    """
    plt.figure(figsize=(15, 5))
    for i in range(n):
        # Original images
        ax = plt.subplot(2, n, i + 1)
        plt.imshow(original[i])
        plt.title(f"Original: {names[i]}")
        plt.axis("off")

        # Reconstructed images
        ax = plt.subplot(2, n, i + 1 + n)
        plt.imshow(reconstructed[i])
        plt.title("Reconstructed")
        plt.axis("off")
    plt.show()


In [15]:

# --- Save Comparisons Function ---
def save_comparisons(original, enhanced, names, folder="../output/t1"):
    """
    Save side-by-side comparisons of original and enhanced images to the specified folder,
    using the person's name as the filename.
    """
    os.makedirs(folder, exist_ok=True)
    for i in range(len(original)):
        original_img = (original[i] * 255).astype(np.uint8)
        enhanced_img = (enhanced[i] * 255).astype(np.uint8)
        combined = np.hstack((original_img, enhanced_img))  # Side by side comparison
        cv2.imwrite(os.path.join(folder, f"{names[i]}_comparison.keras.png"), combined)


In [16]:

# # connect drive
# from google.colab import drive
# drive.mount('/content/drive')


In [17]:

# --- Main Execution ---
# Example usage with a directory
dataset_dir = '../dataset'  # Change to your dataset path
train_generator = DataGenerator(directory=dataset_dir, batch_size=16, file_sample_ratio=0.05, sample_ratio=0.05, img_size=(256, 256), shuffle=True)


In [18]:

# Build the autoencoder model
autoencoder = build_autoencoder()


In [19]:

# Test generator to ensure it's working correctly
test_images, _ = train_generator[0]  # Fetch the first batch
print(f"Loaded {test_images.shape[0]} images from the generator.")


Loaded 111 images from the generator.


In [20]:

# Callbacks for saving the best model and early stopping
model_checkpoint = ModelCheckpoint('best_model.keras', save_best_only=True, monitor='val_loss', mode='min')
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# Train the model using the generator with progress bar
for epoch in tqdm(range(10), desc="Training Progress"):  # Start with fewer epochs
    autoencoder.fit(
        train_generator,
        validation_data=train_generator,
        epochs=1,
        callbacks=[model_checkpoint, early_stopping]
    )


Training Progress:   0%|          | 0/10 [00:00<?, ?it/s]

[1m1/6[0m [32m━━━[0m[37m━━━━━━━━━━━━━━━━━[0m [1m4:07[0m 50s/step - loss: 0.0877

In [None]:

# Save the model after training
autoencoder.save("../models/t1/wo_final_model.keras")


In [None]:

# Generate enhanced images from the validation set
original_images, _ = train_generator[0]  # Fetch the first batch of original images
reconstructed_images = autoencoder.predict(original_images[:5])  # Predicting on first 5 samples

# Prepare names based on the dataset
names = train_generator.filenames[:5]  # Use the first 5 filenames for display or evaluation

# Display a comparison between original and enhanced images
display_comparison(original_images[:5], reconstructed_images, names, n=5)


In [None]:
# Save comparisons for evaluation using person's name as filename
save_comparisons(original_images, reconstructed_images, names)