In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.utils import save_image
import os
from PIL import Image
from tqdm.notebook import tqdm
import itertools, shutil

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"üñ•Ô∏è  {device}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

# Seed torch's global RNG so weight initialisation and DataLoader shuffling
# start from the same state on every "Restart Kernel -> Run All".
# (Some GPU kernels may still be non-deterministic.)
SEED = 42
torch.manual_seed(SEED)

# Lightweight settings chosen to keep training short on a small GPU.
IMG_SIZE = 128      # images are resized to IMG_SIZE x IMG_SIZE
EPOCHS = 20         # number of passes over the dataset
BATCH_SIZE = 2      # images per domain in each batch

# Folders written to by the later cells (samples, checkpoints, inference results).
os.makedirs('outputs/samples', exist_ok=True)
os.makedirs('checkpoints', exist_ok=True)
os.makedirs('results', exist_ok=True)

print(f"\n‚ö° LIGHT: {IMG_SIZE}x{IMG_SIZE}, {EPOCHS} epochs, batch {BATCH_SIZE}")
print("‚úÖ Ready!")

üñ•Ô∏è  cuda
GPU: NVIDIA GeForce GTX 1050

‚ö° LIGHT: 128x128, 20 epochs, batch 2
‚úÖ Ready!


In [2]:
class SimpleDataset(Dataset):
    """Unpaired real/anime image dataset.

    Each item is a dict ``{'real': ..., 'anime': ...}`` holding one transformed
    image from each directory. The two file lists are indexed independently
    (index modulo their own length), so the shorter list wraps around.

    Args:
        real_dir: Directory containing the real-face images.
        anime_dir: Directory containing the anime-face images.
        transform: Callable applied to every loaded RGB ``PIL.Image``.
        max_images: Upper bound on the number of files taken from each
            directory (default 5000, to keep an epoch short). ``None`` keeps
            every file.

    Raises:
        ValueError: If either directory contains no file with a supported
            image extension.
    """

    # Matched case-insensitively, so 'photo.JPG' is picked up as well.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self, real_dir, anime_dir, transform, max_images=5000):
        self.real = self._list_images(real_dir, max_images)
        self.anime = self._list_images(anime_dir, max_images)
        if not self.real or not self.anime:
            # Fail here with a clear message instead of a ZeroDivisionError
            # from the modulo in __getitem__ later on.
            raise ValueError(
                f"No images found: {len(self.real)} in {real_dir!r}, "
                f"{len(self.anime)} in {anime_dir!r}"
            )
        self.transform = transform
        print(f"Using {len(self.real)} real, {len(self.anime)} anime")

    @classmethod
    def _list_images(cls, directory, limit):
        """Return up to ``limit`` image paths from ``directory`` in sorted order."""
        # os.listdir order is arbitrary; sorting makes the selected subset
        # (and therefore the training data) reproducible.
        names = sorted(
            name for name in os.listdir(directory)
            if name.lower().endswith(cls.IMAGE_EXTENSIONS)
        )
        return [os.path.join(directory, name) for name in names[:limit]]

    def _load(self, path):
        """Open ``path``, convert it to RGB and apply the transform."""
        # The context manager closes the file handle once the pixels are read.
        with Image.open(path) as img:
            return self.transform(img.convert('RGB'))

    def __len__(self):
        return max(len(self.real), len(self.anime))

    def __getitem__(self, idx):
        return {
            'real': self._load(self.real[idx % len(self.real)]),
            'anime': self._load(self.anime[idx % len(self.anime)]),
        }

# Preprocessing shared by both domains: resize to a fixed square, convert to a
# tensor, then normalise every RGB channel with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.Resize(size=(IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])

# Training data and a shuffled loader; batches are loaded in the main process.
dataset = SimpleDataset(
    real_dir='train/Real_Faces',
    anime_dir='train/Anime_Faces',
    transform=transform,
)
loader = DataLoader(
    dataset=dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=0,
)

print(f"‚úÖ {len(loader)} batches per epoch")

Using 5000 real, 5000 anime
‚úÖ 2500 batches per epoch


In [3]:
class Generator(nn.Module):
    """Small convolutional encoder-decoder mapping a 3-channel image to a 3-channel image.

    The encoder is a 7x7 convolution followed by two stride-2 3x3 convolutions;
    the decoder mirrors it with two stride-2 transposed convolutions and a
    final 7x7 convolution. The output passes through ``tanh``, so values lie
    in [-1, 1].
    """

    def __init__(self):
        super().__init__()

        # Encoder: 3 -> 32 -> 64 -> 128 channels, ReLU after every convolution.
        encoder_layers = [
            nn.Conv2d(3, 32, kernel_size=7, padding=3),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
        ]
        self.enc = nn.Sequential(*encoder_layers)

        # Decoder: 128 -> 64 -> 32 -> 3 channels, tanh on the final output.
        decoder_layers = [
            nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 3, kernel_size=7, padding=3),
            nn.Tanh(),
        ]
        self.dec = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Encode ``x`` and decode the result back to an image tensor."""
        encoded = self.enc(x)
        return self.dec(encoded)

class Discriminator(nn.Module):
    """Fully convolutional discriminator producing a grid of per-patch scores.

    Three stride-2 4x4 convolutions with LeakyReLU(0.2) are followed by a
    stride-1 4x4 convolution with a single output channel. No activation is
    applied to the final scores.
    """

    def __init__(self):
        super().__init__()
        layers = [
            nn.Conv2d(3, 32, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2),
            nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2),
            # Final layer: one score per receptive-field patch.
            nn.Conv2d(128, 1, kernel_size=4, stride=1, padding=1),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the raw patch-score map for image batch ``x``."""
        scores = self.model(x)
        return scores

# Two generators and two discriminators, all moved to the selected device.
# (Instantiation order is kept as-is so seeded weight initialisation is unaffected.)
G_AB = Generator().to(device)
G_BA = Generator().to(device)
D_A = Discriminator().to(device)
D_B = Discriminator().to(device)

# One Adam optimizer drives both generators; each discriminator has its own.
opt_G = optim.Adam(
    itertools.chain.from_iterable(net.parameters() for net in (G_AB, G_BA)),
    lr=2e-4,
    betas=(0.5, 0.999),
)
opt_D_A = optim.Adam(params=D_A.parameters(), lr=2e-4, betas=(0.5, 0.999))
opt_D_B = optim.Adam(params=D_B.parameters(), lr=2e-4, betas=(0.5, 0.999))

# Loss functions: squared error for the adversarial term, L1 for cycle and identity.
L_GAN = nn.MSELoss()
L_cyc = nn.L1Loss()
L_id = nn.L1Loss()

print(f"‚úÖ Models: {sum(map(torch.numel, G_AB.parameters())):,} params (LIGHT!)")

‚úÖ Models: 194,051 params (LIGHT!)


In [None]:
print(f"\nüöÄ Training {EPOCHS} epochs...\n")


def _gan_loss(pred, target_is_real):
    """Adversarial loss of discriminator scores against an all-ones or all-zeros target.

    The target is created with the same shape as ``pred``, so it always matches
    the discriminator's patch grid regardless of the input image size.
    """
    target = torch.ones_like(pred) if target_is_real else torch.zeros_like(pred)
    return L_GAN(pred, target)


for ep in range(EPOCHS):
    pbar = tqdm(loader, desc=f"Epoch {ep+1}/{EPOCHS}")

    for batch in pbar:
        # Domain A = anime, domain B = real.
        rA = batch['anime'].to(device)
        rB = batch['real'].to(device)

        # ---------------- Generators ----------------
        opt_G.zero_grad()
        # As wired in this notebook, G_AB turns a real photo into an image that
        # D_A judges against anime samples, and G_BA does the reverse for D_B.
        fB = G_AB(rB)
        fA = G_BA(rA)

        # Identity loss: a generator fed an image that is already in its
        # OUTPUT domain should leave it unchanged. G_AB outputs anime-like
        # images, so it gets rA; G_BA outputs real-like images, so it gets rB.
        # (Applying it to G_AB(rB)/G_BA(rA) would penalise the translation
        # itself and push both generators towards the identity map.)
        l_id = (L_id(G_AB(rA), rA) + L_id(G_BA(rB), rB)) / 2
        # Adversarial loss: each translated image should be scored as real.
        l_gan = (_gan_loss(D_A(fB), True) + _gan_loss(D_B(fA), True)) / 2
        # Cycle loss: translating back should recover the source image.
        l_cyc = (L_cyc(G_BA(fB), rB) + L_cyc(G_AB(fA), rA)) / 2

        l_G = l_gan + 10*l_cyc + 5*l_id
        l_G.backward()
        opt_G.step()

        # ---------------- Discriminators ----------------
        # Generated images are detached so no gradient reaches the generators.
        opt_D_A.zero_grad()
        l_DA = (_gan_loss(D_A(rA), True) + _gan_loss(D_A(fB.detach()), False)) / 2
        l_DA.backward()
        opt_D_A.step()

        opt_D_B.zero_grad()
        l_DB = (_gan_loss(D_B(rB), True) + _gan_loss(D_B(fA.detach()), False)) / 2
        l_DB.backward()
        opt_D_B.step()

        pbar.set_postfix({'G': f'{l_G.item():.2f}', 'D': f'{(l_DA+l_DB).item()/2:.2f}'})

    # Every 5 epochs, save inputs and translations from the last batch,
    # rescaled from [-1, 1] to [0, 1].
    if (ep+1) % 5 == 0:
        with torch.no_grad():
            save_image((torch.cat([rB[:2], fB[:2]])+1)/2, f'outputs/samples/ep{ep+1}.png', nrow=2)
        print(f"  ‚úì Sample saved")

# Only the generator used at inference time is checkpointed.
torch.save(G_AB.state_dict(), 'checkpoints/G_AB_final.pt')
print("\nüéâ Done! Saved to checkpoints/G_AB_final.pt")


üöÄ Training 20 epochs...



Epoch 1/20:   0%|          | 0/2500 [00:00<?, ?it/s]

Epoch 2/20:   0%|          | 0/2500 [00:00<?, ?it/s]

Epoch 3/20:   0%|          | 0/2500 [00:00<?, ?it/s]

Epoch 4/20:   0%|          | 0/2500 [00:00<?, ?it/s]

Epoch 5/20:   0%|          | 0/2500 [00:00<?, ?it/s]

  ‚úì Sample saved


Epoch 6/20:   0%|          | 0/2500 [00:00<?, ?it/s]

Epoch 7/20:   0%|          | 0/2500 [00:00<?, ?it/s]

Epoch 8/20:   0%|          | 0/2500 [00:00<?, ?it/s]

Epoch 9/20:   0%|          | 0/2500 [00:00<?, ?it/s]

Epoch 10/20:   0%|          | 0/2500 [00:00<?, ?it/s]

  ‚úì Sample saved


Epoch 11/20:   0%|          | 0/2500 [00:00<?, ?it/s]

Epoch 12/20:   0%|          | 0/2500 [00:00<?, ?it/s]

Epoch 13/20:   0%|          | 0/2500 [00:00<?, ?it/s]

Epoch 14/20:   0%|          | 0/2500 [00:00<?, ?it/s]

Epoch 15/20:   0%|          | 0/2500 [00:00<?, ?it/s]

  ‚úì Sample saved


Epoch 16/20:   0%|          | 0/2500 [00:00<?, ?it/s]

Epoch 17/20:   0%|          | 0/2500 [00:00<?, ?it/s]

Epoch 18/20:   0%|          | 0/2500 [00:00<?, ?it/s]

Epoch 19/20:   0%|          | 0/2500 [00:00<?, ?it/s]

Epoch 20/20:   0%|          | 0/2500 [00:00<?, ?it/s]

In [None]:
# ========================================
# INFERENCE: Generate JoJo-style Images
# ========================================

# Configuration - Update TEST_DIR when test set is released
TEST_DIR = 'test'  # <-- UPDATE THIS PATH when you get the test set
OUTPUT_DIR = 'results'

# Create output directory
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Set model to evaluation mode
G_AB.eval()

# Transform for test images (same preprocessing as training)
test_trans = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])

# Get test images. isdir (rather than exists) also rejects a plain file that
# happens to be named like TEST_DIR, which os.listdir could not read.
if not os.path.isdir(TEST_DIR):
    print(f"‚ö†Ô∏è  WARNING: {TEST_DIR} folder not found!")
    print(f"Please create the folder and add test images when they are released.\n")
    tests = []
else:
    # Case-insensitive extension match so 'IMG.JPG' / 'x.jpeg' are not skipped;
    # sorted so the processing order is reproducible.
    tests = sorted(
        f for f in os.listdir(TEST_DIR)
        if f.lower().endswith(('.jpg', '.jpeg', '.png'))
    )
    print(f"Found {len(tests)} test images in {TEST_DIR}/")

if len(tests) > 0:
    print(f"Generating {len(tests)} JoJo-style images...\n")

    with torch.no_grad():
        for name in tqdm(tests):
            # Load and transform image; the context manager closes the file
            # handle as soon as the pixels have been read.
            with Image.open(os.path.join(TEST_DIR, name)) as src:
                img = test_trans(src.convert('RGB')).unsqueeze(0).to(device)

            # Generate JoJo-style image
            out = G_AB(img)

            # Save (denormalize from [-1,1] to [0,1]) under the same file name
            save_image((out+1)/2, os.path.join(OUTPUT_DIR, name))

    print(f"\n‚úÖ Generated {len(tests)} JoJo-style images in {OUTPUT_DIR}/")
else:
    print("No test images found. Waiting for test set release...")


In [None]:
# ========================================
# CREATE SUBMISSION PACKAGE
# ========================================
import matplotlib.pyplot as plt
import zipfile

# Generated images currently in OUTPUT_DIR. Counting them here, instead of
# reusing the `tests` list from the inference cell, keeps the reported numbers
# in line with what is actually packaged.
if os.path.isdir(OUTPUT_DIR):
    result_files = sorted(
        f for f in os.listdir(OUTPUT_DIR)
        if f.lower().endswith(('.jpg', '.jpeg', '.png'))
    )
else:
    result_files = []

# Check if we have results
if not result_files:
    print("‚ö†Ô∏è  No results found! Please run inference first.")
else:
    # Show a side-by-side preview of up to 4 results. Only results whose
    # source image still exists in TEST_DIR can be paired with an original.
    preview_files = [
        f for f in result_files if os.path.isfile(os.path.join(TEST_DIR, f))
    ][:4]

    if preview_files:
        n_cols = len(preview_files)
        # squeeze=False keeps `ax` two-dimensional even with a single column.
        fig, ax = plt.subplots(2, n_cols, figsize=(4 * n_cols, 8), squeeze=False)
        for i, fname in enumerate(preview_files):
            with Image.open(os.path.join(TEST_DIR, fname)) as original:
                ax[0, i].imshow(original)
            ax[0, i].set_title('Original')
            ax[0, i].axis('off')
            with Image.open(os.path.join(OUTPUT_DIR, fname)) as styled:
                ax[1, i].imshow(styled)
            ax[1, i].set_title('JoJo Style')
            ax[1, i].axis('off')
        plt.tight_layout()
        plt.savefig('outputs/preview.png')
        plt.show()

    # Create submission folder structure
    SUBMISSION_DIR = 'submission'
    os.makedirs(SUBMISSION_DIR, exist_ok=True)

    # Copy model file
    model_dst = f'{SUBMISSION_DIR}/best_generator.pt'
    if os.path.exists('checkpoints/G_AB_final.pt'):
        shutil.copy('checkpoints/G_AB_final.pt', model_dst)
        print("‚úÖ Copied model: best_generator.pt")
    else:
        print("‚ö†Ô∏è  Warning: Model checkpoint not found!")

    # Copy test results (replace any copy left over from a previous run)
    if os.path.exists(f'{SUBMISSION_DIR}/test_results'):
        shutil.rmtree(f'{SUBMISSION_DIR}/test_results')
    shutil.copytree(OUTPUT_DIR, f'{SUBMISSION_DIR}/test_results')
    print(f"‚úÖ Copied {len(result_files)} images to test_results/")

    # Create README
    with open(f'{SUBMISSION_DIR}/README.md', 'w', encoding='utf-8') as f:
        f.write("""# JoJo Style Transfer - Homework 2
""")

    # Create ZIP file for submission; archive paths are relative to
    # SUBMISSION_DIR so the zip has no extra top-level folder.
    zip_filename = 'submission.zip'
    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
        # Add all files in submission folder
        for root, dirs, files in os.walk(SUBMISSION_DIR):
            for file in files:
                file_path = os.path.join(root, file)
                arcname = os.path.relpath(file_path, SUBMISSION_DIR)
                zipf.write(file_path, arcname)

    print(f"\nüì¶ SUBMISSION PACKAGE READY!")
    # Only claim the model is included when it is really in the package.
    if os.path.exists(model_dst):
        print(f"   ‚úÖ {SUBMISSION_DIR}/best_generator.pt")
    print(f"   ‚úÖ {SUBMISSION_DIR}/test_results/ ({len(result_files)} images)")
    print(f"   ‚úÖ {SUBMISSION_DIR}/README.md")
    print(f"   ‚úÖ {zip_filename} (ready to upload!)")
    print(f"\nüí° Upload '{zip_filename}' to Moodle")
