In [1]:
# Mount Google Drive and make the project repository on Drive importable.
import sys

from google.colab import drive

drive.mount('/content/drive')

# Later cells import project modules (data_utils, channels, face_autoencoder);
# presumably they live in this folder -- confirm if the repository is moved.
PROJECT_DIR = '/content/drive/MyDrive/commit_test_folder/EECE491-01-Capstone-Design'

# Guard the append so re-running this cell does not stack duplicate entries
# on sys.path.
if PROJECT_DIR not in sys.path:
    sys.path.append(PROJECT_DIR)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# This cell prepares the Colab environment by copying and
# extracting the dataset from Google Drive to the fast local SSD.
#
# Each step is done with Python calls that raise on failure (instead of
# `!cp` / `!tar` / `!rm`, whose exit status is ignored), so a failed copy or
# extraction stops the cell at the step that actually went wrong.

import os
import shutil
import subprocess
import time

# --- Part 1: Colab Local Data Setup ---
print("Starting data setup...")
start_setup_time = time.time()

# --- Define paths ---
DRIVE_ARCHIVE_PATH = "/content/drive/MyDrive/datasets/cropped_celeba.tar"
LOCAL_ARCHIVE_PATH = "/content/cropped_celeba.tar"
EXTRACT_PATH = "/content/celeba_dataset"

# Directory expected to hold the images once the archive is unpacked
# (the archive's members are expected under content/cropped_celeba).
LOCAL_DATA_DIR = os.path.join(EXTRACT_PATH, "content", "cropped_celeba")

# --- Logic ---
# Only copy/untar if the local data directory doesn't already exist
if not os.path.exists(LOCAL_DATA_DIR):
    print(f"Copying {DRIVE_ARCHIVE_PATH} to local runtime...")
    if not os.path.exists(DRIVE_ARCHIVE_PATH):
        print(f"[FATAL ERROR] Source file not found: {DRIVE_ARCHIVE_PATH}")
        raise FileNotFoundError(f"Source file not found: {DRIVE_ARCHIVE_PATH}")

    try:
        # 1. Copy the single .tar file from Drive; raises OSError on failure.
        shutil.copy(DRIVE_ARCHIVE_PATH, LOCAL_ARCHIVE_PATH)
        print("Copy complete.")

        # 2. Extract the archive to the local SSD with the system tar.
        print(f"Untarring {LOCAL_ARCHIVE_PATH} to {EXTRACT_PATH}...")
        os.makedirs(EXTRACT_PATH, exist_ok=True)
        tar_result = subprocess.run(
            ["tar", "-xf", LOCAL_ARCHIVE_PATH, "-C", EXTRACT_PATH],
            capture_output=True,
            text=True,
        )
        if tar_result.returncode != 0:
            # Surface tar's own message; a bare exit code is hard to act on.
            raise RuntimeError(
                f"tar failed with exit code {tar_result.returncode}: "
                f"{tar_result.stderr.strip()}"
            )
        print("Untar complete.")
    finally:
        # 3. Clean up the local archive to save space, whether or not the
        #    steps above succeeded (a failed copy may leave a partial file).
        if os.path.exists(LOCAL_ARCHIVE_PATH):
            os.remove(LOCAL_ARCHIVE_PATH)
else:
    print(f"Data directory {LOCAL_DATA_DIR} already exists. Skipping copy/untar.")

print(f"Data setup finished in {time.time() - start_setup_time:.2f} seconds.")

# --- Sanity Check ---
# Crucial check to ensure data exists before proceeding
if not os.path.exists(LOCAL_DATA_DIR):
    print(f"\n[FATAL ERROR] The expected data directory does not exist: {LOCAL_DATA_DIR}")
    raise FileNotFoundError(f"Could not find data at {LOCAL_DATA_DIR}")
else:
    print(f"Successfully found data at: {LOCAL_DATA_DIR}")

Starting data setup...
Data directory /content/celeba_dataset/content/cropped_celeba already exists. Skipping copy/untar.
Data setup finished in 0.00 seconds.
Successfully found data at: /content/celeba_dataset/content/cropped_celeba


In [3]:
import torch
# Import the function from the data_utils.py file we created
from data_utils import get_dataloaders

# --- Configuration ---
# Use the local data path defined in the data-setup cell above
DATA_ROOT = LOCAL_DATA_DIR
BATCH_SIZE = 256
IMAGE_SIZE = 128
RANDOM_SEED = 42

# --- 1. Get Dataloaders ---
# A single call builds the train / validation / test loaders.
train_loader, val_loader, test_loader = get_dataloaders(
    root_dir=DATA_ROOT,
    batch_size=BATCH_SIZE,
    image_size=IMAGE_SIZE,
    random_seed=RANDOM_SEED
)

# --- 2. Verification ---
# Pull one batch to confirm the loader actually works.
# Compare against None explicitly: truth-testing a DataLoader goes through
# len(), which conflates "no loader" with "empty loader".
# NOTE(review): presumably get_dataloaders returns None on failure -- confirm
# against data_utils.
if train_loader is not None:
    print("\nVerifying one batch from train_loader...")
    try:
        # Get one sample batch
        images, labels = next(iter(train_loader))
        print("  Batch loaded successfully.")
        print(f"  Image batch shape: {images.shape}")
        print("\nSetup complete. You are ready to start training.")
    except Exception as e:
        # Report the problem without aborting the cell (original behaviour).
        print(f"  [Error] Failed to load batch: {e}")
else:
    print("\nData loading failed. Please check previous cell output.")


Loading dataset from: /content/celeba_dataset/content/cropped_celeba
Searching for '*.jpg' files in: /content/celeba_dataset/content/cropped_celeba
Successfully found 199509 images.
Successfully loaded 199509 total images.
Splitting dataset into:
  Train: 159607 images
  Validation: 19950 images
  Test: 19952 images

DataLoaders created successfully.

Verifying one batch from train_loader...
  Batch loaded successfully.
  Image batch shape: torch.Size([256, 3, 128, 128])

Setup complete. You are ready to start training.


In [7]:
# (This is the main training/saving cell in training.ipynb)
# (This version includes the Validation loop and Best Model Saving)

import torch
import torch.nn as nn
import torch.optim as optim
import random  # (For random SNR)
import os

# --- 1. Import Modules ---
from channels import awgn_channel
from face_autoencoder import FaceAutoencoder

# (We assume train_loader and val_loader are loaded from the previous cell)

# -----------------------------------------------
# 2. Training Setup
# -----------------------------------------------
# Seed the RNGs so runs are more repeatable. The per-batch SNR below is drawn
# with Python's `random`; weight initialisation (and presumably the channel
# noise -- confirm in channels.py) uses torch's RNG.
# RANDOM_SEED comes from the dataloader cell, like train_loader / val_loader.
random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

model = FaceAutoencoder(latent_dim=256).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# (Training parameters)
num_epochs = 20
# Range (in dB) from which each training batch's channel SNR is drawn.
MIN_SNR_DB = 0.0
MAX_SNR_DB = 20.0

# (Save paths)
SAVE_DIR = "/content/drive/MyDrive/models"
# (We save the "best" model based on validation loss)
MODEL_PATH = os.path.join(SAVE_DIR, "face_autoencoder_BEST.pth")
os.makedirs(SAVE_DIR, exist_ok=True)

# Lowest validation loss seen so far; infinity so the first epoch always wins.
best_val_loss = float('inf')

# -----------------------------------------------
# 3. Training Loop (with Validation)
# -----------------------------------------------
print("Starting robust training (Random SNR)...")
# Fixed SNR points (dB) at which every validation batch is evaluated.
SNR_POINTS_FOR_VAL = [0.0, 5.0, 10.0, 15.0, 20.0]
NUM_VAL_POINTS = len(SNR_POINTS_FOR_VAL)

# Each epoch: train with a random channel SNR per batch, then validate at the
# fixed SNR points and checkpoint the model when the validation loss improves.
for epoch in range(num_epochs):

    # --- (A) Training Phase ---
    model.train() # Set model to TRAINING mode
    total_train_loss = 0

    for images, _ in train_loader:
        images = images.to(device)

        # 1. Encode
        latent_vector = model.encode(images)

        # 2. Channel (Apply random SNR for robust training)
        current_snr_db = random.uniform(MIN_SNR_DB, MAX_SNR_DB)
        noisy_vector = awgn_channel(latent_vector, snr_db=current_snr_db)

        # 3. Decode
        reconstructed_images = model.decode(noisy_vector)

        # 4. Loss (reconstruction error against the clean input)
        loss = criterion(reconstructed_images, images)

        # 5. Backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_train_loss += loss.item()

    # Mean of the per-batch losses.
    avg_train_loss = total_train_loss / len(train_loader)

    # --- (B) Validation Phase (Robustness Evaluation) ---
    model.eval() # Set model to EVALUATION mode (Important)
    total_combined_loss = 0

    # No gradients needed for validation (Saves memory/computation)
    with torch.no_grad():
        for val_images, _ in val_loader:
            val_images = val_images.to(device)

            # 1. Encode once per batch: the latent does not depend on the SNR,
            #    so re-encoding for every SNR point only repeats the same work.
            #    Relies on awgn_channel not modifying its input, which the
            #    visualization cell also assumes -- confirm in channels.py.
            latent_vector = model.encode(val_images)

            # Loop through fixed SNR points to calculate average loss
            for fixed_snr_db in SNR_POINTS_FOR_VAL:

                # 2. Channel (Apply fixed SNR for consistent evaluation)
                noisy_vector = awgn_channel(latent_vector, snr_db=fixed_snr_db)

                # 3. Decode
                reconstructed_images = model.decode(noisy_vector)

                # 4. Loss
                val_loss = criterion(reconstructed_images, val_images)
                total_combined_loss += val_loss.item()

    # Calculate average validation loss across all batches and all SNR points
    avg_val_loss = total_combined_loss / (len(val_loader) * NUM_VAL_POINTS)

    # --- (C) Log and Save Best Model ---
    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.6f}, Val Loss: {avg_val_loss:.6f}")

    # Save only if this epoch's Val Loss is the best one seen so far
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        print(f"  -> New best validation loss! Saving model to {MODEL_PATH}")
        # Save the state_dict of the single model
        torch.save(model.state_dict(), MODEL_PATH)

print("--- Training finished. ---")
print(f"Best validation loss achieved: {best_val_loss:.6f}")
print(f"Best model saved to {MODEL_PATH}")

Using device: cuda
Starting robust training (Random SNR)...
Epoch [1/20], Train Loss: 0.090234, Val Loss: 0.042487
  -> New best validation loss! Saving model to /content/drive/MyDrive/models/face_autoencoder_BEST.pth
Epoch [2/20], Train Loss: 0.039017, Val Loss: 0.034887
  -> New best validation loss! Saving model to /content/drive/MyDrive/models/face_autoencoder_BEST.pth
Epoch [3/20], Train Loss: 0.030226, Val Loss: 0.028640
  -> New best validation loss! Saving model to /content/drive/MyDrive/models/face_autoencoder_BEST.pth
Epoch [4/20], Train Loss: 0.028206, Val Loss: 0.026721
  -> New best validation loss! Saving model to /content/drive/MyDrive/models/face_autoencoder_BEST.pth
Epoch [5/20], Train Loss: 0.024846, Val Loss: 0.024281
  -> New best validation loss! Saving model to /content/drive/MyDrive/models/face_autoencoder_BEST.pth
Epoch [6/20], Train Loss: 0.023079, Val Loss: 0.023559
  -> New best validation loss! Saving model to /content/drive/MyDrive/models/face_autoencoder_B

In [10]:
import torch
import torch.nn as nn
import numpy as np
import os
import random
from PIL import Image

# --- 1. Module and Dataloader Setup (Assuming these are defined earlier) ---
from channels import awgn_channel
from face_autoencoder import FaceAutoencoder

# (Assume val_loader, device, MODEL_PATH, etc., are loaded)

# -----------------------------------------------
# 2. Configuration for Visualization
# -----------------------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
SAVE_DIR = "/content/drive/MyDrive/models"
MODEL_PATH = os.path.join(SAVE_DIR, "face_autoencoder_BEST.pth")
VISUALIZE_SNR_DB = 5.0   # channel SNR (dB) used for the noisy reconstruction
NUM_IMAGES_TO_SHOW = 8   # number of validation images to export
OUTPUT_FOLDER = "./reconstructions"  # new output folder
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

# -----------------------------------------------
# 3. Load the Best Model
# -----------------------------------------------
print(f"Loading best model from: {MODEL_PATH}")
model = FaceAutoencoder(latent_dim=256).to(device)
try:
    model.load_state_dict(torch.load(MODEL_PATH, map_location=device))
except FileNotFoundError:
    print(f"[ERROR] Model file not found at {MODEL_PATH}. Exiting.")
    # Re-raise rather than calling exit(): in a notebook, exit() asks the
    # kernel to shut down (discarding the loaded data) instead of simply
    # stopping this cell.
    raise
model.eval()
print("Model loaded successfully and set to eval mode.")

# -----------------------------------------------
# 4. Get a Batch of Validation Images
# -----------------------------------------------
try:
    data_iter = iter(val_loader)
    images, _ = next(data_iter)
    images = images.to(device)
    # Keep only the first few images of the batch for visualization.
    sample_images = images[:NUM_IMAGES_TO_SHOW]
except NameError:
    print("[ERROR] val_loader is not defined. Please run the Dataloader setup cell first. Exiting.")
    # Re-raise rather than calling exit(): in a notebook, exit() asks the
    # kernel to shut down instead of simply stopping this cell.
    raise
except StopIteration:
    print("[ERROR] val_loader is empty or finished. Try rerunning the Dataloader cell. Exiting.")
    raise

# -----------------------------------------------
# 5. Process Images through the Autoencoder
# -----------------------------------------------
with torch.no_grad():
    # Clean latent representation of the sample images.
    latent_vector_original = model.encode(sample_images)
    # Same latent after passing through the AWGN channel at the chosen SNR.
    noisy_latent_vector = awgn_channel(latent_vector_original, snr_db=VISUALIZE_SNR_DB)
    # Reconstructions with and without channel noise, for side-by-side comparison.
    reconstructed_noisy = model.decode(noisy_latent_vector)
    reconstructed_pristine = model.decode(latent_vector_original)

# --- Image Saving Function using PIL ---
def save_image_to_pil(tensor, filename):
    """Convert one image tensor to an 8-bit image and save it in OUTPUT_FOLDER.

    Args:
        tensor: A single image tensor laid out as (C, H, W). Values are mapped
            linearly from [-1, 1] to [0, 255]; anything outside that range is
            clipped. (Assumes the data pipeline / decoder produce values in
            [-1, 1] -- TODO confirm against data_utils and the model.)
        filename: File name (not a full path) to write inside OUTPUT_FOLDER;
            the extension selects the image format.
    """
    # detach() so a tensor that still carries autograd history can be
    # converted; then move to CPU and convert to a NumPy array.
    img_np = tensor.detach().cpu().numpy()

    # Tensor layout (C, H, W) -> PIL layout (H, W, C)
    img_np = np.transpose(img_np, (1, 2, 0))

    # Map [-1, 1] to pixel values [0, 255]:
    # (x / 2 + 0.5) * 255 = (x + 1) * 127.5
    img_np = (img_np * 127.5) + 127.5
    # Round before the uint8 cast: a plain cast truncates, so a value such as
    # 127.99999 would be stored as 127 and slightly darken the image.
    img_np = np.clip(np.rint(img_np), 0, 255).astype(np.uint8)

    # Build the PIL image and write it to disk
    img_pil = Image.fromarray(img_np)
    img_pil.save(os.path.join(OUTPUT_FOLDER, filename))

# Never index past the end of the batch if it holds fewer images than requested.
num_to_save = min(NUM_IMAGES_TO_SHOW, sample_images.shape[0])
print(f"\nSaving {num_to_save} image triples to '{OUTPUT_FOLDER}/'...")

# --- 6. Save the Results ---
# Parallel lists: each image set is paired with the file-name prefix it is saved under.
image_list = [sample_images, reconstructed_noisy, reconstructed_pristine]
prefix_list = ["Original", f"Recon_Noisy_{VISUALIZE_SNR_DB}dB", "Recon_Pristine"]

# For every sample index, write the original / noisy / pristine triple.
for i in range(num_to_save):
    for image_set, prefix in zip(image_list, prefix_list):
        save_image_to_pil(image_set[i], f"{i:02d}_{prefix}.png")

print(f"Saving complete. Please check the '{OUTPUT_FOLDER}' folder for image files.")

# (Optional: If in Colab, you can zip the folder for easy download)
# !zip -r reconstructions.zip {OUTPUT_FOLDER}
# from google.colab import files
# files.download('reconstructions.zip')

Loading best model from: /content/drive/MyDrive/models/face_autoencoder_BEST.pth
Model loaded successfully and set to eval mode.

Saving 8 image triples to './reconstructions/'...
Saving complete. Please check the './reconstructions' folder for image files.
