## Creating sad folder


In [None]:
from deepface import DeepFace
import os
import shutil
from collections import defaultdict

# Path to the folder containing images
folder_path = r""

# Path to the new folder for sad images
output_folder_sad = os.path.join(folder_path, "sad1")

# Path to the new folder for non-sad images (EXTRAS)
output_folder_extras = r""

# Create the output folders if they don't exist
if not os.path.exists(output_folder_sad):
    os.makedirs(output_folder_sad)

if not os.path.exists(output_folder_extras):
    os.makedirs(output_folder_extras)

# Dictionary to store emotion counts
emotion_count = defaultdict(int)

# Iterate through all images in the folder
for filename in os.listdir(folder_path):
    # Construct the full file path
    file_path = os.path.join(folder_path, filename)
    
    # Check if the file is an image (you can add more extensions if needed)
    if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
        try:
            # Analyze the image for emotions
            result = DeepFace.analyze(img_path=file_path, actions=['emotion'], enforce_detection=False)
            
            # Get the dominant emotion
            dominant_emotion = result[0]['dominant_emotion']
            
            # Update the emotion count
            emotion_count[dominant_emotion] += 1
            
            # If the dominant emotion is "sad", copy the image to the sad folder
            if dominant_emotion == "sad":
                output_path = os.path.join(output_folder_sad, filename)
                shutil.copy(file_path, output_path)
                print(f"File: {filename} | Dominant Emotion: {dominant_emotion} | Copied to {output_folder_sad}")
            
            # If the dominant emotion is not "sad", move the image to the EXTRAS folder
            else:
                output_path = os.path.join(output_folder_extras, filename)
                shutil.copy(file_path, output_path)
                print(f"File: {filename} | Dominant Emotion: {dominant_emotion} | Copied to {output_folder_extras}")
        except Exception as e:
            print(f"Error processing {filename}: {e}")

# Print the final emotion counts
print("\nEmotion counts in the folder:")
for emotion, count in emotion_count.items():
    print(f"{emotion}: {count}")

In [None]:
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, models
from torchvision.utils import save_image
from PIL import Image
import os
import wandb
import cv2
import dlib
import numpy as np
import time
from torch.cuda.amp import GradScaler, autocast
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm
 
 
# Define paths
shape_predictor_path = r""
happy_folder = r''
sad_folder = r''
aligned_happy = r''
aligned_sad = r''
 
# Create preprocessed directories if they don't exist
os.makedirs(aligned_happy, exist_ok=True)
os.makedirs(aligned_sad, exist_ok=True)
 
# Initialize dlib's face detector and landmark predictor
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor(shape_predictor_path)
 
# Block 2: Face Alignment Functions
 
def align_and_save_faces(source_folder, target_folder):
    """Preprocess and save aligned faces to target folder"""
    if len(os.listdir(target_folder)) > 0:
        return  # Skip if already processed
 
    for img_name in os.listdir(source_folder):
        img_path = os.path.join(source_folder, img_name)
        image = Image.open(img_path).convert('RGB')
        aligned = align_face(image)
        if aligned:
            aligned.save(os.path.join(target_folder, img_name))
 
def align_face(image):
    """Align face using facial landmarks"""
    try:
        image_np = np.array(image)
        gray = cv2.cvtColor(image_np, cv2.COLOR_RGB2GRAY)
        faces = detector(gray, 1)
        if not faces:
            return None
        landmarks = predictor(gray, faces[0])
        landmarks = np.array([[p.x, p.y] for p in landmarks.parts()])
        desired_left_eye = (0.35, 0.35)
        desired_face_width = 256
        desired_face_height = 256
        left_eye_center = landmarks[36:42].mean(axis=0)
        right_eye_center = landmarks[42:48].mean(axis=0)
        dY = right_eye_center[1] - left_eye_center[1]
        dX = right_eye_center[0] - left_eye_center[0]
        angle = np.degrees(np.arctan2(dY, dX))
        dist = np.sqrt((dX ** 2) + (dY ** 2))
        desired_dist = (1.0 - 2 * desired_left_eye[0]) * desired_face_width
        scale = desired_dist / dist
        eyes_center = ((left_eye_center[0] + right_eye_center[0]) // 2,
                       (left_eye_center[1] + right_eye_center[1]) // 2)
        M = cv2.getRotationMatrix2D(eyes_center, angle, scale)
        tX = desired_face_width * 0.5
        tY = desired_face_height * desired_left_eye[1]
        M[0, 2] += (tX - eyes_center[0])
        M[1, 2] += (tY - eyes_center[1])
        aligned_face = cv2.warpAffine(image_np, M, (desired_face_width, desired_face_height), flags=cv2.INTER_CUBIC)
        return Image.fromarray(aligned_face)
    except Exception as e:
        print(f"Error processing image: {e}")
        return None
 
# Preprocess datasets once before training
print("Preprocessing datasets...")
#align_and_save_faces(happy_folder, aligned_happy)
align_and_save_faces(sad_folder, aligned_sad)
 


In [None]:
import random
import shutil
import os

# Paths
happy_checking_folder = r''
sad_checking_folder = r''

# Count the number of images in each folder
happy_checking_images = [f for f in os.listdir(happy_checking_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]
sad_checking_images = [f for f in os.listdir(sad_checking_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]



In [2]:
len(happy_checking_images)

156014

In [3]:
len(sad_checking_images)

41032

In [None]:

import os
import shutil
import random
from tqdm import tqdm

source_happy = r''
source_sad = r''
verified_happy = r''
verified_sad = r''
extras_folder = r''
target_count = 10000  # New target count for both folders


def copy_all_images(source_folder, target_folder):
    """Copy all images from source to target folder"""
    os.makedirs(target_folder, exist_ok=True)
    images = [f for f in os.listdir(source_folder) 
             if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
    
    for img in tqdm(images, desc=f"Copying {os.path.basename(source_folder)}"):
        src = os.path.join(source_folder, img)
        dst = os.path.join(target_folder, img)
        shutil.copy2(src, dst)

def adjust_folder(folder, target_count, extras_folder):
    """Adjust folder to have exactly target_count images"""
    images = os.listdir(folder)
    current_count = len(images)
    
    if current_count > target_count:
        # Downsample - move extras
        keep_images = random.sample(images, target_count)
        for img in images:
            if img not in keep_images:
                shutil.move(os.path.join(folder, img), 
                           os.path.join(extras_folder, img))
    elif current_count < target_count:
        # Upsample - create duplicates
        needed = target_count - current_count
        for i in tqdm(range(needed), desc=f"Upsampling {os.path.basename(folder)}"):
            img = random.choice(images)
            base, ext = os.path.splitext(img)
            new_name = f"{base}_dup{i}{ext}"
            shutil.copy(os.path.join(folder, img),
                       os.path.join(folder, new_name))

# ===== MAIN PROCESS =====
try:
    print("=== COPYING IMAGES ===")
    copy_all_images(source_happy, verified_happy)
    copy_all_images(source_sad, verified_sad)
    
    print("\n=== ADJUSTING FOLDERS TO 10,000 IMAGES ===")
    os.makedirs(extras_folder, exist_ok=True)
    
    # Adjust both folders to have exactly target_count images
    adjust_folder(verified_happy, target_count, extras_folder)
    adjust_folder(verified_sad, target_count, extras_folder)
    
    # Final counts
    happy_count = len(os.listdir(verified_happy))
    sad_count = len(os.listdir(verified_sad))
    
    print(f"\nFinal counts:")
    print(f"Happy: {happy_count} images")
    print(f"Sad: {sad_count} images")
    print(f"Extras moved to: {extras_folder}")

except Exception as e:
    print(f"\n!!! ERROR: {str(e)}")







=== COPYING IMAGES ===


Copying happyinitial: 100%|██████████| 155599/155599 [02:45<00:00, 942.23it/s] 
Copying sadinitial: 100%|██████████| 41032/41032 [00:35<00:00, 1152.68it/s]



=== ADJUSTING FOLDERS TO 10,000 IMAGES ===

Final counts:
Happy: 10000 images
Sad: 10000 images
Extras moved to: C:\Users\Asus\Desktop\ATML\Project\LetsTry\preprocessed\extras


In [None]:
import wandb

import torch
import torch.nn as nn
import torch.nn.init as init
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, models
from torchvision.utils import save_image
from PIL import Image
import os
import numpy as np
import time
from torch.cuda.amp import GradScaler, autocast
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm

# Define sweep configuration
sweep_config = {
    'method': 'bayes',  # Bayesian optimization
    'metric': {
        'name': 'G_loss',
        'goal': 'minimize'   
    },
    'parameters': {
        'batch_size': {
            'values': [16, 32, 64]
        },
        'lr_g': {
            'min': 1e-5,
            'max': 2e-4,
            'distribution': 'uniform'
        },
        'lr_d': {
            'min': 1e-5,
            'max': 2e-4,
            'distribution': 'uniform'
        },
        'lambda_cycle': {
            'min': 5.0,
            'max': 15.0,
            'distribution': 'uniform'
        },
        'lambda_identity': {
            'min': 0.5,
            'max': 5.0,
            'distribution': 'uniform'
        },
        'num_residual_blocks': {
            'values': [6, 9, 12]
        }
    }
}

# Initialize sweep
sweep_id = wandb.sweep(sweep_config, project="sad-happy-translation")

Create sweep with ID: c9ox8yvd
Sweep URL: https://wandb.ai/utkarshrawat04-nmims/sad-happy-translation/sweeps/c9ox8yvd


In [None]:
# ... (keep all your original imports here)

# Define the sweep configuration
sweep_config = {
    'method': 'random',
    'metric': {
        'name': 'G_loss',
        'goal': 'minimize'
    },
    'parameters': {
        'lr_G': {
            'min': 1e-6,
            'max': 2e-4
        },
        'lr_D': {
            'min': 1e-6,
            'max': 2e-4
        },
        'beta1': {
            'min': 0.4,
            'max': 0.9
        },
        'beta2': {
            'min': 0.95,
            'max': 0.9999
        },
        'lambda_cycle': {
            'min': 3.0,
            'max': 10.0
        },
        'lambda_identity': {
            'min': 0.5,
            'max': 5.0
        },
        'lambda_perceptual': {
            'min': 0.1,
            'max': 2.0
        }
    }
}

# Initialize sweep
sweep_id = wandb.sweep(sweep_config, project="face-expression-translation")

# Keep your dataset and dataloader setup unchanged
verified_happy = r''
verified_sad = r''


def train():
    # Initialize wandb run
    wandb.init()
    
    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # Initialize models
    G_happyTosad = Generator().to(device)
    G_sadTohappy = Generator().to(device)
    D_happy = Discriminator().to(device)
    D_sad = Discriminator().to(device)

    # Get hyperparameters from wandb.config
    config = wandb.config
    lambda_cycle = config.lambda_cycle
    lambda_identity = config.lambda_identity
    lambda_perceptual = config.lambda_perceptual

    # Initialize optimizers with sweep parameters
    G_optimizer = optim.Adam(
        list(G_happyTosad.parameters()) + list(G_sadTohappy.parameters()),
        lr=config.lr_G,
        betas=(config.beta1, config.beta2)
    )
    D_optimizer = optim.Adam(
        list(D_happy.parameters()) + list(D_sad.parameters()),
        lr=config.lr_D,
        betas=(config.beta1, config.beta2)
    )

    # Keep loss functions and VGG setup unchanged
    criterion_gan = nn.MSELoss()
    criterion_cycle = nn.L1Loss()
    criterion_identity = nn.L1Loss()
    vgg = models.vgg19(pretrained=True).features.to(device).eval()
    scaler = GradScaler()
    
    # Training loop with hyperparameters
    num_epochs = 100
    for epoch in range(num_epochs):
        # ... (keep your existing training loop code here)
        
        # Modify the loss calculation to use sweep parameters
        with autocast():
            # ... (existing code)
            
            # Total generator loss
            G_loss = (
                loss_gan_happyTosad + loss_gan_sadTohappy +
                lambda_cycle * (loss_cycle_happy + loss_cycle_sad) +
                lambda_identity * (loss_identity_sad + loss_identity_happy) +
                lambda_perceptual * (loss_perceptual_sad + loss_perceptual_happy)
            )

        # Log additional hyperparameters
        wandb.log({
            **{k: v for k, v in config.items()},
            "G_loss": G_loss.item(),
            "D_loss": D_loss.item(),
            "epoch": epoch
        })


    torch.save(G_happyTosad.state_dict(), f'G_happyTosad_{wandb.run.id}.pth')
    torch.save(G_sadTohappy.state_dict(), f'G_sadTohappy_{wandb.run.id}.pth')

# Run the sweep
wandb.agent(sweep_id, function=train, count=50)  # Adjust count as needed



In [None]:
# Initialize wandb
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, models
from torchvision.utils import save_image
from PIL import Image
import os
import wandb
import cv2
import dlib
import numpy as np
import time
from torch.cuda.amp import GradScaler, autocast
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm

wandb.init(project="face-expression-translation")

verified_happy = r''
verified_sad = r''

# Block 3: Dataset and DataLoader
 
class PreprocessedDataset(Dataset):
    """Dataset that loads preprocessed images"""
    def __init__(self, folder, transform=None):
        self.image_paths = [os.path.join(folder, f) for f in os.listdir(folder)]
        self.transform = transform
 
    def __len__(self):
        return len(self.image_paths)
 
    def __getitem__(self, idx):
        image = Image.open(self.image_paths[idx]).convert('RGB')
        if self.transform:
            image = np.array(image)  # Convert PIL image to numpy array
            image = self.transform(image=image)['image']
        return image

# Define transformations with Albumentations
transform = A.Compose([
    A.Resize(256, 256),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.2),
    A.Rotate(limit=15, p=0.3),
    A.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ToTensorV2(),
])
 
# Create datasets and dataloaders
print("Creating dataloaders...")
happy_dataset = PreprocessedDataset(verified_happy, transform)
sad_dataset = PreprocessedDataset(verified_sad, transform)
 
happy_dataloader = DataLoader(
    happy_dataset,
    batch_size=16,
    shuffle=True,
    pin_memory=True
)
 
sad_dataloader = DataLoader(
    sad_dataset,
    batch_size=16,
    shuffle=True,
    pin_memory=True
)
 
# Block 4: Generator and Discriminator Models (Modified for PatchGAN)

class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        self.down1 = self.conv_block(3, 64, normalize=False)
        self.down2 = self.conv_block(64, 128)
        self.down3 = self.conv_block(128, 256)
        self.down4 = self.conv_block(256, 512)
        self.down5 = self.conv_block(512, 512)
        self.up1 = self.deconv_block(512, 512)
        self.up2 = self.deconv_block(1024, 256)
        self.up3 = self.deconv_block(512, 128)
        self.up4 = self.deconv_block(256, 64)
        self.up5 = self.deconv_block(128, 3, normalize=False, activation=nn.Tanh())
        self.apply(self._init_weights)
 
    def conv_block(self, in_channels, out_channels, normalize=True, activation=nn.LeakyReLU(0.2)):
        layers = [nn.Conv2d(in_channels, out_channels, kernel_size=4, stride=2, padding=1)]
        if normalize:
            layers.append(nn.InstanceNorm2d(out_channels))
        layers.append(activation)
        return nn.Sequential(*layers)
 
    def deconv_block(self, in_channels, out_channels, normalize=True, activation=nn.ReLU()):
        layers = [nn.ConvTranspose2d(in_channels, out_channels, kernel_size=4, stride=2, padding=1)]
        if normalize:
            layers.append(nn.InstanceNorm2d(out_channels))
        layers.append(activation)
        return nn.Sequential(*layers)
 
    def _init_weights(self, m):
        if isinstance(m, nn.Conv2d) or isinstance(m, nn.ConvTranspose2d):
            init.normal_(m.weight, 0.0, 0.02)
            if m.bias is not None:
                init.constant_(m.bias, 0.0)
 
    def forward(self, x):
        d1 = self.down1(x)
        d2 = self.down2(d1)
        d3 = self.down3(d2)
        d4 = self.down4(d3)
        d5 = self.down5(d4)
        u1 = self.up1(d5)
        u2 = self.up2(torch.cat([u1, d4], dim=1))
        u3 = self.up3(torch.cat([u2, d3], dim=1))
        u4 = self.up4(torch.cat([u3, d2], dim=1))
        u5 = self.up5(torch.cat([u4, d1], dim=1))
        return u5

class Discriminator(nn.Module):
    """PatchGAN discriminator"""
    def __init__(self):
        super(Discriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, True),
            
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.InstanceNorm2d(128),
            nn.LeakyReLU(0.2, True),
            
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.InstanceNorm2d(256),
            nn.LeakyReLU(0.2, True),
            
            nn.Conv2d(256, 512, kernel_size=4, stride=1, padding=1),
            nn.InstanceNorm2d(512),
            nn.LeakyReLU(0.2, True),
            
            nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=1)  # Outputs 30x30 patch predictions
        )
        self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, nn.Conv2d):
            init.normal_(m.weight, 0.0, 0.02)
            if m.bias is not None:
                init.constant_(m.bias, 0.0)

    def forward(self, x):
        return self.model(x)  # Output shape: (batch_size, 1, 30, 30)

# Block 5: Training Setup (No changes needed)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
G_happyTosad = Generator().to(device)
G_sadTohappy = Generator().to(device)
D_happy = Discriminator().to(device)
D_sad = Discriminator().to(device)
 
G_optimizer = optim.Adam(list(G_happyTosad.parameters()) + list(G_sadTohappy.parameters()), lr=0.00005, betas=(0.5, 0.999))
D_optimizer = optim.Adam(list(D_happy.parameters()) + list(D_sad.parameters()), lr=0.00005, betas=(0.5, 0.999))
 
criterion_gan = nn.MSELoss()
criterion_cycle = nn.L1Loss()
criterion_identity = nn.L1Loss()
 
vgg = models.vgg19(pretrained=True).features.to(device).eval()
for param in vgg.parameters():
    param.requires_grad = False
 
def perceptual_loss(generated, real, vgg_model):
    def extract_features(x, vgg_model):
        features = []
        for layer_num, layer in enumerate(vgg_model):
            x = layer(x)
            if layer_num in [2, 7, 12, 21, 30]:
                features.append(x)
        return features
    generated_features = extract_features(generated, vgg_model)
    real_features = extract_features(real, vgg_model)
    loss = 0
    for gen_feature, real_feature in zip(generated_features, real_features):
        loss += F.l1_loss(gen_feature, real_feature)
    return loss
 
scaler = GradScaler()
num_epochs = 100
lambda_cycle = 5.0
lambda_identity = 2.0
lambda_perceptual = 0.5

# Block 6: Training Loop (No changes needed)
for epoch in range(num_epochs):
    epoch_start = time.time()
    print(f"\nStarting Epoch {epoch+1}/{num_epochs}")
 
    with tqdm(total=len(happy_dataloader), desc=f"Epoch {epoch+1}/{num_epochs}", unit="batch") as pbar:
        for i, (happy_images, sad_images) in enumerate(zip(happy_dataloader, sad_dataloader)):
            iter_start = time.time()
            happy_images = happy_images.to(device, non_blocking=True)
            sad_images = sad_images.to(device, non_blocking=True)
 
            # Generator training
            G_happyTosad.zero_grad()
            G_sadTohappy.zero_grad()
    
            with autocast():
                # Identity losses
                identity_sad = G_happyTosad(sad_images)
                loss_identity_sad = criterion_identity(identity_sad, sad_images)
                identity_happy = G_sadTohappy(happy_images)
                loss_identity_happy = criterion_identity(identity_happy, happy_images)
    
                # GAN losses
                fake_sad = G_happyTosad(happy_images)
                fake_sad_pred = D_sad(fake_sad)
                loss_gan_happyTosad = criterion_gan(fake_sad_pred, torch.ones_like(fake_sad_pred))
    
                fake_happy = G_sadTohappy(sad_images)
                fake_happy_pred = D_happy(fake_happy)
                loss_gan_sadTohappy = criterion_gan(fake_happy_pred, torch.ones_like(fake_happy_pred))
    
                # Cycle losses
                reconstructed_happy = G_sadTohappy(fake_sad)
                loss_cycle_happy = criterion_cycle(reconstructed_happy, happy_images)
                reconstructed_sad = G_happyTosad(fake_happy)
                loss_cycle_sad = criterion_cycle(reconstructed_sad, sad_images)
    
                # Perceptual losses
                loss_perceptual_sad = perceptual_loss(fake_sad, sad_images, vgg)
                loss_perceptual_happy = perceptual_loss(fake_happy, happy_images, vgg)
    
                # Total generator loss
                G_loss = (
                    loss_gan_happyTosad + loss_gan_sadTohappy +
                    lambda_cycle * (loss_cycle_happy + loss_cycle_sad) +
                    lambda_identity * (loss_identity_sad + loss_identity_happy) +
                    lambda_perceptual * (loss_perceptual_sad + loss_perceptual_happy)
                )
    
            scaler.scale(G_loss).backward()
            scaler.step(G_optimizer)
            scaler.update()
    
            # Discriminator training
            D_happy.zero_grad()
            D_sad.zero_grad()
    
            with autocast():
                # Real images
                real_happy_pred = D_happy(happy_images)
                loss_real_happy = criterion_gan(real_happy_pred, torch.ones_like(real_happy_pred))
                real_sad_pred = D_sad(sad_images)
                loss_real_sad = criterion_gan(real_sad_pred, torch.ones_like(real_sad_pred))
    
                # Fake images
                fake_happy_pred = D_happy(fake_happy.detach())
                loss_fake_happy = criterion_gan(fake_happy_pred, torch.zeros_like(fake_happy_pred))
                fake_sad_pred = D_sad(fake_sad.detach())
                loss_fake_sad = criterion_gan(fake_sad_pred, torch.zeros_like(fake_sad_pred))
    
                D_loss = (loss_real_happy + loss_fake_happy + loss_real_sad + loss_fake_sad) / 2
    
            scaler.scale(D_loss).backward()
            scaler.step(D_optimizer)
            scaler.update()
    
            # Logging
            if i % 20 == 0:
                iter_time = time.time() - iter_start
                pbar.set_postfix({
                    "Time": f"{iter_time:.2f}s",
                    "G_loss": f"{G_loss.item():.4f}",
                    "D_loss": f"{D_loss.item():.4f}"
                })
            pbar.update(1)
 
        # Epoch logging
        epoch_time = time.time() - epoch_start
        print(f"Epoch {epoch+1} completed in {epoch_time//60:.0f}m {epoch_time%60:.0f}s")
        wandb.log({
            "G_loss": G_loss.item(),
            "D_loss": D_loss.item(),
            "epoch": epoch,
            "step": i
        })
        
        # Save sample images
        with torch.no_grad():
            fake_sad = G_happyTosad(happy_images)
            fake_happy = G_sadTohappy(sad_images)
            save_image(fake_sad, f'fake_sad_epoch_{epoch+1}.png', normalize=True)
            save_image(fake_happy, f'fake_happy_epoch_{epoch+1}.png', normalize=True)
            wandb.log({
                "Fake sad": [wandb.Image(fake_sad[0], caption=f"Epoch {epoch+1}")],
                "Fake happy": [wandb.Image(fake_happy[0], caption=f"Epoch {epoch+1}")]
            })
 
# Block 7: Model Saving and Wandb Finish
torch.save(G_happyTosad.state_dict(), 'G_happyTosad.pth')
torch.save(G_sadTohappy.state_dict(), 'G_sadTohappy.pth')
torch.save(D_happy.state_dict(), 'D_happy.pth')
torch.save(D_sad.state_dict(), 'D_sad.pth')
 
wandb.finish()






In [None]:
import torch
from torchvision.utils import save_image
from PIL import Image
import os
import numpy as np
import albumentations as A
from albumentations.pytorch import ToTensorV2
import torch.nn as nn
import torch.nn.init as init
import dlib
import cv2
import matplotlib.pyplot as plt

# Define the Generator model (matches PatchGAN-trained architecture)
class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        self.down1 = self.conv_block(3, 64, normalize=False)
        self.down2 = self.conv_block(64, 128)
        self.down3 = self.conv_block(128, 256)
        self.down4 = self.conv_block(256, 512)
        self.down5 = self.conv_block(512, 512)
        self.up1 = self.deconv_block(512, 512)
        self.up2 = self.deconv_block(1024, 256)
        self.up3 = self.deconv_block(512, 128)
        self.up4 = self.deconv_block(256, 64)
        self.up5 = self.deconv_block(128, 3, normalize=False, activation=nn.Tanh())
        self.apply(self._init_weights)

    def conv_block(self, in_channels, out_channels, normalize=True, activation=nn.LeakyReLU(0.2)):
        layers = [nn.Conv2d(in_channels, out_channels, 4, 2, 1)]
        if normalize:
            layers.append(nn.InstanceNorm2d(out_channels))
        layers.append(activation)
        return nn.Sequential(*layers)

    def deconv_block(self, in_channels, out_channels, normalize=True, activation=nn.ReLU()):
        layers = [nn.ConvTranspose2d(in_channels, out_channels, 4, 2, 1)]
        if normalize:
            layers.append(nn.InstanceNorm2d(out_channels))
        layers.append(activation)
        return nn.Sequential(*layers)

    def _init_weights(self, m):
        if isinstance(m, nn.Conv2d) or isinstance(m, nn.ConvTranspose2d):
            init.normal_(m.weight, 0.0, 0.02)
            if m.bias is not None:
                init.constant_(m.bias, 0.0)

    def forward(self, x):
        d1 = self.down1(x)
        d2 = self.down2(d1)
        d3 = self.down3(d2)
        d4 = self.down4(d3)
        d5 = self.down5(d4)
        u1 = self.up1(d5)
        u2 = self.up2(torch.cat([u1, d4], 1))
        u3 = self.up3(torch.cat([u2, d3], 1))
        u4 = self.up4(torch.cat([u3, d2], 1))
        u5 = self.up5(torch.cat([u4, d1], 1))
        return u5

# Load trained models
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
G_happyTosad = Generator().to(device)
G_sadTohappy = Generator().to(device)
G_happyTosad.load_state_dict(torch.load('G_happyTosad.pth', map_location=device))
G_sadTohappy.load_state_dict(torch.load('G_sadTohappy.pth', map_location=device))
G_happyTosad.eval()
G_sadTohappy.eval()

# Define transformations (matches training preprocessing)
transform = A.Compose([
    A.Resize(256, 256),
    A.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ToTensorV2(),
])

# Initialize dlib components
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat")

def align_face(image):
    """Robust face alignment implementation with error handling"""
    try:
        image_np = np.array(image)
        gray = cv2.cvtColor(image_np, cv2.COLOR_RGB2GRAY)
        
        # Detect faces with upsampling for better detection
        faces = detector(gray, 1)
        if not faces:
            print("No faces detected")
            return None
            
        # Use the first detected face
        face = faces[0]
        
        # Get facial landmarks with error checking
        landmarks = predictor(gray, face)
        if not landmarks:
            print("No landmarks detected")
            return None
            
        # Convert landmarks to numpy array with integer coordinates
        landmarks = np.array([[p.x, p.y] for p in landmarks.parts()], dtype=np.int32)
        if landmarks.size == 0:
            print("Empty landmarks array")
            return None
            
        # Calculate eye positions with bounds checking
        try:
            # Left eye landmarks (points 36-41)
            left_eye_points = landmarks[36:42]
            # Right eye landmarks (points 42-47)
            right_eye_points = landmarks[42:48]
            
            if len(left_eye_points) < 6 or len(right_eye_points) < 6:
                print("Incomplete eye landmarks")
                return None
                
            left_eye_center = left_eye_points.mean(axis=0).astype(np.int32)
            right_eye_center = right_eye_points.mean(axis=0).astype(np.int32)
            
        except IndexError as e:
            print(f"Landmark indexing error: {str(e)}")
            return None
            
        # Calculate angle between eyes
        dY = right_eye_center[1] - left_eye_center[1]
        dX = right_eye_center[0] - left_eye_center[0]
        angle = np.degrees(np.arctan2(dY, dX))
        
        # Calculate desired scale
        desired_left_eye = (0.35, 0.35)
        desired_face_width = 256
        desired_face_height = 256
        
        # Determine scale of new image
        dist = np.sqrt((dX**2) + (dY**2))
        desired_dist = (1.0 - 2 * desired_left_eye[0]) * desired_face_width
        scale = desired_dist / max(dist, 1e-6)  # Prevent division by zero
        
        # Find center point between eyes
        eyes_center = (
            int((left_eye_center[0] + right_eye_center[0]) // 2),
            int((left_eye_center[1] + right_eye_center[1]) // 2)
        )
        
        # Calculate rotation matrix
        M = cv2.getRotationMatrix2D(eyes_center, angle, scale)
        
        # Update translation components
        tX = desired_face_width * 0.5
        tY = desired_face_height * desired_left_eye[1]
        M[0, 2] += (tX - eyes_center[0])
        M[1, 2] += (tY - eyes_center[1])
        
        # Apply affine transformation
        (w, h) = (desired_face_width, desired_face_height)
        aligned_face = cv2.warpAffine(
            image_np, M, (w, h),
            flags=cv2.INTER_CUBIC,
            borderMode=cv2.BORDER_REPLICATE
        )
        
        return Image.fromarray(aligned_face)
        
    except Exception as e:
        print(f"Alignment error: {str(e)}")
        return None

def preprocess_image(image_path):
    """Complete preprocessing pipeline"""
    image = Image.open(image_path).convert('RGB')
    aligned_image = align_face(image)
    if aligned_image is None:
        raise ValueError("No face detected in the image")
    
    image_np = np.array(aligned_image)
    transformed = transform(image=image_np)
    return transformed['image'].unsqueeze(0).to(device)

def generate_and_display_images(input_image_path, generator, title):
    """Complete generation and display workflow"""
    try:
        # Load and align
        input_image = Image.open(input_image_path).convert('RGB')
        aligned_image = align_face(input_image)
        if aligned_image is None:
            print("Skipping image - no face detected")
            return

        # Preprocess
        input_tensor = transform(image=np.array(aligned_image))['image']
        input_tensor = input_tensor.unsqueeze(0).to(device)

        # Generate
        with torch.no_grad():
            generated = generator(input_tensor)

        # Postprocess
        generated_np = generated.squeeze().cpu().numpy()
        generated_np = np.transpose(generated_np, (1, 2, 0))
        generated_np = (generated_np * 0.5 + 0.5) * 255
        generated_np = generated_np.clip(0, 255).astype(np.uint8)

        # Visualize
        plt.figure(figsize=(15, 7))
        plt.subplot(1, 2, 1)
        plt.imshow(aligned_image)
        plt.title("Aligned Input")
        plt.axis('off')

        plt.subplot(1, 2, 2)
        plt.imshow(generated_np)
        plt.title(title)
        plt.axis('off')

        plt.show()

    except Exception as e:
        print(f"Generation failed: {str(e)}")

# Example usage
test_cases = [
    (r"", G_happyTosad, "Generated Sad Expression")
]

for img_path, model, title in test_cases:
    generate_and_display_images(img_path, model, title)