<a href="https://colab.research.google.com/github/tayyabrehman96/Cascading-Multi-Agent-Anomaly-Detection-/blob/main/AAMS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Import the necessary library
from google.colab import drive

# Attach Google Drive to this Colab VM.
# The call is interactive: it asks you to authorize access (follow the link,
# get the code, paste it back) before the drive shows up under the mount point.
drive.mount(mountpoint='/content/drive')

print(
    "\n✅ Google Drive has been successfully mounted!",
    "You can now browse your files in the folder icon on the left.",
    sep="\n",
)

In [None]:
import os, zipfile, glob

# --- Source archive on Google Drive and extraction target on the Colab VM ---
zip_path = "/content/drive/MyDrive/Dataset Conf/archive.zip"
extract_path = "/content/local_dataset"

# --- Step 1: Check if ZIP file exists ---
if not os.path.isfile(zip_path):
    # Only report the problem (no exception) so the cell finishes cleanly.
    print(f"\n❌ ERROR: ZIP file not found at: {zip_path}")
    print("\nTip: Check spelling, spacing (e.g., 'Dataset Conf'), and right-click → Copy path in Colab.")
else:
    print(f"\n✅ Found ZIP file: {zip_path}")

    # --- Step 2: Extract unless a populated extraction folder already exists ---
    # The folder is created before the archive is opened, so a failed first
    # attempt can leave it behind empty. An empty folder must therefore not
    # count as "already extracted", otherwise extraction is never retried.
    if not os.path.isdir(extract_path) or not os.listdir(extract_path):
        print("⏳ Extracting ZIP... this may take a while.")
        os.makedirs(extract_path, exist_ok=True)
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_path)
        print("✅ Extraction complete!")
    else:
        print("⚡ Already extracted, skipping extraction.")

    # --- Step 3: Collect ONLY image files (recursive, by extension) ---
    image_extensions = ["*.jpg", "*.jpeg", "*.png"]
    all_images = []
    for ext in image_extensions:
        all_images.extend(glob.glob(os.path.join(extract_path, "**", ext), recursive=True))

    # --- Summary ---
    print("-" * 40)
    print(f"📂 Extraction folder: {extract_path}")
    print(f"📸 Total images found: {len(all_images):,}")
    if all_images:
        print(f"👉 Example image: {all_images[0]}")
    print("-" * 40)


In [None]:
# --- 1. Imports ---
import os, glob, shutil
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tqdm import tqdm

# --- 2. Force GPU (Tesla T4) ---
# Everything below is moved to CUDA, so stop here when no GPU is attached.
# An explicit raise is used instead of `assert`: assert statements are skipped
# when Python runs with -O, which would defer the failure to the first CUDA call.
if not torch.cuda.is_available():
    raise RuntimeError("❌ No GPU found! Please enable GPU runtime in Colab.")
device = torch.device("cuda")
print(f"🔥 Using device: {device}")
print("GPU Name:", torch.cuda.get_device_name(0))

# --- 3. Path after extraction ---
DATASET_PATH = "/content/local_dataset"        # where the archive was unpacked
PROCESSED_PATH = "/content/processed_dataset"  # root of the train/val/test folders

# --- 4. Collect all images (.jpg, .jpeg, .png) ---
# Recursive search below DATASET_PATH, one glob pass per extension.
image_extensions = ["*.jpg", "*.jpeg", "*.png"]
all_images = [
    path
    for pattern in image_extensions
    for path in glob.glob(os.path.join(DATASET_PATH, "**", pattern), recursive=True)
]

print(f"📸 Total images collected: {len(all_images):,}")
if len(all_images) == 0:
    raise RuntimeError("❌ No images found! Check dataset extraction.")

# --- 5. Train/Val/Test split ---
# First hold out 20% of the files, then cut that hold-out in half (val / test).
train_imgs, temp_imgs = train_test_split(all_images, test_size=0.2, random_state=42)
val_imgs, test_imgs = train_test_split(temp_imgs, test_size=0.5, random_state=42)

print(f"📂 Train: {len(train_imgs):,}, Val: {len(val_imgs):,}, Test: {len(test_imgs):,}")

# --- 6. Organize into folders ---
# One sub-folder per split; existing folders are reused.
for split in ("train", "val", "test"):
    split_dir = os.path.join(PROCESSED_PATH, split)
    os.makedirs(split_dir, exist_ok=True)

def copy_images(file_list, destination):
    """Copy every file in ``file_list`` into the ``destination`` directory.

    The sources may come from different folders and therefore share a base
    name. A plain copy into one flat folder would let the later file
    overwrite the earlier one; here a repeated base name gets a numeric
    suffix (``name_1.ext``, ``name_2.ext``, ...) so each source ends up as
    its own file.

    Names are tracked per call, not by looking at what is already on disk,
    so calling the function again with the same list overwrites the same
    targets instead of piling up extra copies.

    Args:
        file_list: Iterable of source file paths.
        destination: Existing directory that receives the copies.
    """
    used_names = set()
    for src in file_list:
        base_name = os.path.basename(src)
        stem, suffix = os.path.splitext(base_name)
        target_name, attempt = base_name, 0
        # Bump the suffix until the name is free within this call.
        while target_name in used_names:
            attempt += 1
            target_name = f"{stem}_{attempt}{suffix}"
        used_names.add(target_name)
        shutil.copy(src, os.path.join(destination, target_name))

# Fill each split folder with its share of the image files.
for split_name, split_files in (
    ("train", train_imgs),
    ("val", val_imgs),
    ("test", test_imgs),
):
    copy_images(split_files, os.path.join(PROCESSED_PATH, split_name))
print("✅ Dataset organized into train/val/test folders.")

# --- 7. Custom Dataset ---
class ImageDataset(Dataset):
    """Unlabelled image dataset over the entries directly inside one folder.

    Every entry matching ``folder_path/*`` (non-recursive) is treated as an
    image file. Items are served in sorted path order and contain only the
    image itself, with no label.
    """

    def __init__(self, folder_path, transform=None):
        """Index the folder.

        Args:
            folder_path: Directory whose entries are the images.
            transform: Optional callable applied to each RGB PIL image.
        """
        pattern = os.path.join(folder_path, "*")
        self.image_paths = sorted(glob.glob(pattern))
        self.transform = transform

    def __len__(self):
        """Return the number of indexed paths."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load item ``idx`` as RGB and apply the transform when one is set."""
        path = self.image_paths[idx]
        rgb = Image.open(path).convert("RGB")
        return self.transform(rgb) if self.transform else rgb

# --- 8. Data transforms & loaders ---
IMAGE_SIZE = (128, 128)
BATCH_SIZE = 64

# Resize every image to IMAGE_SIZE, then convert it to a tensor.
transform = transforms.Compose([transforms.Resize(IMAGE_SIZE), transforms.ToTensor()])

# One dataset per split folder, all sharing the same transform.
train_dataset, val_dataset, test_dataset = (
    ImageDataset(os.path.join(PROCESSED_PATH, split_name), transform=transform)
    for split_name in ("train", "val", "test")
)

# Settings common to all three loaders; only the training loader shuffles.
_loader_args = dict(batch_size=BATCH_SIZE, num_workers=4, pin_memory=True)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_args)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_args)
test_loader = DataLoader(test_dataset, shuffle=False, **_loader_args)

print(f"✅ Dataset ready: {len(train_dataset)} train, {len(val_dataset)} val, {len(test_dataset)} test")

# --- 9. Autoencoder model (Contraction + Reconstruction) ---
class ConvAutoencoder(nn.Module):
    """Convolutional autoencoder for 3-channel images.

    The encoder is three stride-2 3x3 convolutions (3 -> 32 -> 64 -> 128
    channels), each followed by ReLU. The decoder mirrors it with three
    stride-2 transposed convolutions (128 -> 64 -> 32 -> 3 channels) and
    ends in a Sigmoid.
    """

    def __init__(self):
        super().__init__()

        # Encoder: (Conv2d, ReLU) pairs, created in channel order.
        down = []
        for c_in, c_out in ((3, 32), (32, 64), (64, 128)):
            down.append(nn.Conv2d(c_in, c_out, kernel_size=3, stride=2, padding=1))
            down.append(nn.ReLU())
        self.encoder = nn.Sequential(*down)

        # Decoder: two (ConvTranspose2d, ReLU) pairs, then the output layer
        # followed by Sigmoid instead of ReLU.
        up = []
        for c_in, c_out in ((128, 64), (64, 32)):
            up.append(nn.ConvTranspose2d(c_in, c_out, kernel_size=3, stride=2,
                                         padding=1, output_padding=1))
            up.append(nn.ReLU())
        up.append(nn.ConvTranspose2d(32, 3, kernel_size=3, stride=2,
                                     padding=1, output_padding=1))
        up.append(nn.Sigmoid())
        self.decoder = nn.Sequential(*up)

    def forward(self, x):
        """Encode ``x`` and return its reconstruction."""
        latent = self.encoder(x)
        return self.decoder(latent)

# --- 10. Init model, loss, optimizer ---
model = ConvAutoencoder().to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)
print(model)

# --- 11. Training loop (20 epochs) ---
NUM_EPOCHS = 20
train_losses, val_losses = [], []   # one mean loss per epoch

print("\n🚀 Training started...")
for epoch in range(NUM_EPOCHS):
    # --- Training pass: each batch is both the input and the target ---
    model.train()
    running_loss = 0.0
    progress = tqdm(train_loader, desc=f"Epoch {epoch+1}/{NUM_EPOCHS} [Train]")
    for images in progress:
        images = images.to(device, non_blocking=True)

        optimizer.zero_grad()
        loss = criterion(model(images), images)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Mean of the per-batch losses for this epoch.
    epoch_train_loss = running_loss / len(train_loader)
    train_losses.append(epoch_train_loss)

    # --- Validation pass: eval mode, no gradient tracking ---
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images in val_loader:
            images = images.to(device, non_blocking=True)
            val_loss += criterion(model(images), images).item()
    epoch_val_loss = val_loss / len(val_loader)
    val_losses.append(epoch_val_loss)

    print(f"📊 Epoch [{epoch+1}/{NUM_EPOCHS}] | Train Loss: {epoch_train_loss:.6f} | Val Loss: {epoch_val_loss:.6f}")

print("✅ Training finished!")

# --- 12. Plot loss curves ---
# Train and validation loss per epoch on one set of axes.
epoch_axis = range(1, NUM_EPOCHS+1)
fig, ax = plt.subplots(figsize=(10,5))
ax.plot(epoch_axis, train_losses, label="Train Loss", marker='o')
ax.plot(epoch_axis, val_losses, label="Val Loss", marker='o')
ax.set_xlabel("Epochs")
ax.set_ylabel("MSE Loss")
ax.set_title("Training vs Validation Loss (Tesla T4 GPU)")
ax.legend()
ax.grid(True)
plt.show()

# --- 13. Reconstruction visualization ---
# Reconstruct the first test batch without tracking gradients.
model.eval()
test_images = next(iter(test_loader)).to(device)
with torch.no_grad():
    reconstructed = model(test_images)

# Top row: first five test images. Bottom row: their reconstructions.
fig, axes = plt.subplots(2, 5, figsize=(20,6))
for col in range(5):
    panels = (
        (axes[0, col], test_images[col], "Original"),
        (axes[1, col], reconstructed[col], "Reconstructed"),
    )
    for ax, tensor, caption in panels:
        # Move to CPU and put the channel dimension last for imshow.
        ax.imshow(tensor.cpu().permute(1,2,0))
        ax.set_title(caption)
        ax.axis("off")
fig.suptitle("Autoencoder Reconstructions (Tesla T4 GPU)", fontsize=16)
plt.show()


In [None]:
from google.colab import drive
# Mount Google Drive under /content/drive (asks for authorization if needed).
drive.mount(mountpoint='/content/drive')


In [None]:
# --- 0. Fix: Mount Drive and Extract Dataset ZIP ---
import os, zipfile, glob
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')

# Archive on Drive (the folder name contains a space) and the extraction target
zip_path = "/content/drive/MyDrive/Dataset Conf/archive.zip"
extract_path = "/content/local_dataset"

# Stop right away when the archive is missing
zip_found = os.path.isfile(zip_path)
if not zip_found:
    raise FileNotFoundError(f"❌ ZIP not found at: {zip_path}\nTip: Check spelling and spaces!")

print(f"✅ Found ZIP: {zip_path}")

# Extract only once: a folder that exists and has content counts as done
already_extracted = os.path.exists(extract_path) and bool(os.listdir(extract_path))
if already_extracted:
    print("⚡ Already extracted, skipping extraction.")
else:
    print("⏳ Extracting dataset...")
    os.makedirs(extract_path, exist_ok=True)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    print("✅ Extraction done")

# Collect only images (.jpg, .jpeg, .png), searching recursively
image_extensions = ["*.jpg", "*.jpeg", "*.png"]
all_images = [
    path
    for pattern in image_extensions
    for path in glob.glob(os.path.join(extract_path, "**", pattern), recursive=True)
]

print(f"📸 Total images found: {len(all_images):,}")
if len(all_images) > 0:
    print("👉 Example:", all_images[0])


In [None]:
# --- 1. Imports ---
import os, glob, shutil, zipfile
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import numpy as np
!pip install lpips


# Metrics
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim
import torch.nn.functional as F

# --- 2. Force GPU ---
# Checked before anything is moved to the GPU. An explicit raise is used
# instead of `assert` because assert statements are skipped under `python -O`.
if not torch.cuda.is_available():
    raise RuntimeError("❌ No GPU found! Enable GPU runtime in Colab.")
device = torch.device("cuda")
print(f"🔥 Using device: {device}")
print("GPU Name:", torch.cuda.get_device_name(0))

# Optional LPIPS metric (AlexNet variant). It is best-effort: when it cannot
# be set up, `lpips_alex` stays None and the evaluation skips LPIPS.
try:
    import lpips
    lpips_alex = lpips.LPIPS(net='alex').to(device)
except Exception as exc:
    # `Exception` rather than a bare `except:` so Ctrl-C still interrupts;
    # the actual cause is printed because it is not always a missing package.
    lpips_alex = None
    print("⚠️ LPIPS not available, install with: pip install lpips")
    print(f"   Reason: {type(exc).__name__}: {exc}")

# --- 3. Dataset paths ---
zip_path = "/content/drive/MyDrive/Dataset Conf/archive.zip"  # archive on Google Drive
extract_path = "/content/local_dataset"                       # unpacked archive
processed_path = "/content/processed_dataset"                 # train/val/test folders

# --- 4. Extract dataset ---
# Extract when the target folder is missing OR empty: an empty folder left by
# a failed earlier attempt must not be mistaken for a finished extraction.
if not os.path.isdir(extract_path) or not os.listdir(extract_path):
    # Fail with a readable message before touching the file system.
    if not os.path.isfile(zip_path):
        raise FileNotFoundError(f"❌ ZIP not found at: {zip_path}\nTip: Check spelling and spaces!")
    print("⏳ Extracting dataset...")
    os.makedirs(extract_path, exist_ok=True)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    print("✅ Extraction done")
else:
    print("⚡ Already extracted")

# --- 5. Collect images ---
# Recursive search below extract_path, one glob pass per extension.
image_extensions = ["*.jpg", "*.jpeg", "*.png"]
all_images = [
    path
    for pattern in image_extensions
    for path in glob.glob(os.path.join(extract_path, "**", pattern), recursive=True)
]

print(f"📸 Total images collected: {len(all_images):,}")
if len(all_images) == 0:
    raise RuntimeError("❌ No images found!")

# --- 6. Split into train/val/test ---
# Hold out 20% of the files first, then split that hold-out evenly into val and test.
train_imgs, temp_imgs = train_test_split(all_images, test_size=0.2, random_state=42)
val_imgs, test_imgs = train_test_split(temp_imgs, test_size=0.5, random_state=42)

print(f"📂 Train: {len(train_imgs):,}, Val: {len(val_imgs):,}, Test: {len(test_imgs):,}")

# --- 7. Organize dataset ---
# One sub-folder per split; existing folders are reused.
for split in ("train", "val", "test"):
    split_dir = os.path.join(processed_path, split)
    os.makedirs(split_dir, exist_ok=True)

def copy_images(file_list, destination):
    """Copy every file in ``file_list`` into the ``destination`` directory.

    Sources from different folders can share a base name. Copied as-is into
    one flat folder, the later file would overwrite the earlier one, so a
    repeated base name is given a numeric suffix (``name_1.ext``,
    ``name_2.ext``, ...) and every source is kept.

    Used names are tracked per call rather than read from disk, so repeating
    the call with the same list rewrites the same targets and adds nothing.

    Args:
        file_list: Iterable of source file paths.
        destination: Existing directory that receives the copies.
    """
    taken = set()
    for src in file_list:
        original_name = os.path.basename(src)
        stem, suffix = os.path.splitext(original_name)
        new_name, counter = original_name, 0
        # Keep incrementing until the name is unused within this call.
        while new_name in taken:
            counter += 1
            new_name = f"{stem}_{counter}{suffix}"
        taken.add(new_name)
        shutil.copy(src, os.path.join(destination, new_name))

# Fill each split folder with its share of the image files.
for split_name, split_files in (
    ("train", train_imgs),
    ("val", val_imgs),
    ("test", test_imgs),
):
    copy_images(split_files, os.path.join(processed_path, split_name))
print("✅ Dataset ready")

# --- 8. Dataset class ---
class ImageDataset(Dataset):
    """Label-free dataset of the image files found directly in a folder.

    The folder is listed once at construction with a non-recursive ``*``
    match, and the paths are kept in sorted order. Each item is the image
    alone.
    """

    def __init__(self, folder_path, transform=None):
        """Record the sorted file paths and the optional transform.

        Args:
            folder_path: Directory whose entries are the images.
            transform: Optional callable applied to each RGB PIL image.
        """
        found = glob.glob(os.path.join(folder_path, "*"))
        found.sort()
        self.image_paths = found
        self.transform = transform

    def __len__(self):
        """Return how many paths were recorded."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Open item ``idx``, convert it to RGB, and transform it if configured."""
        sample = Image.open(self.image_paths[idx]).convert("RGB")
        if not self.transform:
            return sample
        return self.transform(sample)

# --- 9. Data loaders ---
IMAGE_SIZE = (128, 128)
BATCH_SIZE = 64

# Resize every image to IMAGE_SIZE, then convert it to a tensor.
transform = transforms.Compose([transforms.Resize(IMAGE_SIZE), transforms.ToTensor()])

# One dataset per split folder, all sharing the same transform.
train_dataset, val_dataset, test_dataset = (
    ImageDataset(os.path.join(processed_path, split_name), transform=transform)
    for split_name in ("train", "val", "test")
)

# Settings common to all three loaders; only the training loader shuffles.
_loader_args = dict(batch_size=BATCH_SIZE, num_workers=4, pin_memory=True)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_args)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_args)
test_loader = DataLoader(test_dataset, shuffle=False, **_loader_args)

print(f"✅ Train: {len(train_dataset)}, Val: {len(val_dataset)}, Test: {len(test_dataset)}")

# --- 10. Autoencoder model ---
class ConvAutoencoder(nn.Module):
    """Convolutional autoencoder for 3-channel images.

    Encoder: three stride-2 3x3 convolutions (3 -> 32 -> 64 -> 128 channels),
    each followed by ReLU. Decoder: three stride-2 transposed convolutions
    (128 -> 64 -> 32 -> 3 channels), ReLU between them and a Sigmoid at the
    end.
    """

    # Channel plan, listed in the order the layers are created.
    _ENCODER_CHANNELS = ((3, 32), (32, 64), (64, 128))
    _DECODER_CHANNELS = ((128, 64), (64, 32), (32, 3))

    def __init__(self):
        super().__init__()

        encoder_layers = []
        for c_in, c_out in self._ENCODER_CHANNELS:
            encoder_layers += [
                nn.Conv2d(c_in, c_out, kernel_size=3, stride=2, padding=1),
                nn.ReLU(),
            ]
        self.encoder = nn.Sequential(*encoder_layers)

        decoder_layers = []
        last = len(self._DECODER_CHANNELS) - 1
        for position, (c_in, c_out) in enumerate(self._DECODER_CHANNELS):
            decoder_layers.append(
                nn.ConvTranspose2d(c_in, c_out, kernel_size=3, stride=2,
                                   padding=1, output_padding=1)
            )
            # The output layer is followed by Sigmoid, the others by ReLU.
            decoder_layers.append(nn.Sigmoid() if position == last else nn.ReLU())
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Return the reconstruction of ``x``."""
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

# --- 11. Init ---
model = ConvAutoencoder().to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)

# --- 12. Training ---
NUM_EPOCHS = 20
train_losses, val_losses = [], []   # one mean loss per epoch

print("\n🚀 Training started...")
for epoch in range(NUM_EPOCHS):
    # Training pass: each batch is both the input and the target
    model.train()
    running_loss = 0.0
    progress = tqdm(train_loader, desc=f"Epoch {epoch+1}/{NUM_EPOCHS} [Train]")
    for images in progress:
        images = images.to(device, non_blocking=True)

        optimizer.zero_grad()
        loss = criterion(model(images), images)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
    epoch_train_loss = running_loss / len(train_loader)
    train_losses.append(epoch_train_loss)

    # Validation pass: eval mode, no gradient tracking
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images in val_loader:
            images = images.to(device, non_blocking=True)
            val_loss += criterion(model(images), images).item()
    epoch_val_loss = val_loss / len(val_loader)
    val_losses.append(epoch_val_loss)

    print(f"📊 Epoch [{epoch+1}/{NUM_EPOCHS}] | Train Loss: {epoch_train_loss:.6f} | Val Loss: {epoch_val_loss:.6f}")

    # Checkpoints: always refresh "last"; refresh "best" when this epoch's
    # validation loss is the lowest recorded so far (val_losses already
    # contains the current epoch at this point).
    torch.save(model.state_dict(), "/content/autoencoder_last.pth")
    is_best = epoch == 0 or epoch_val_loss <= min(val_losses)
    if is_best:
        torch.save(model.state_dict(), "/content/autoencoder_best.pth")

print("✅ Training finished!")

# --- 13. Loss curves ---
# Train and validation loss per epoch on one set of axes.
epoch_axis = range(1, NUM_EPOCHS+1)
fig, ax = plt.subplots(figsize=(10,5))
ax.plot(epoch_axis, train_losses, label="Train Loss", marker='o')
ax.plot(epoch_axis, val_losses, label="Val Loss", marker='o')
ax.set_xlabel("Epochs")
ax.set_ylabel("MSE Loss")
ax.set_title("Training vs Validation Loss (Tesla T4 GPU)")
ax.legend()
ax.grid(True)
plt.show()

# --- 14. Evaluation on test set ---
# Collects the mean batch MSE plus per-image PSNR, SSIM, MAE and (when the
# optional LPIPS model is available) LPIPS over the whole test loader.
model.eval()
test_loss, psnr_scores, ssim_scores, mae_scores, lpips_scores = 0.0, [], [], [], []
with torch.no_grad():
    for images in tqdm(test_loader, desc="Evaluating on Test Set"):
        images = images.to(device, non_blocking=True)
        outputs = model(images)
        test_loss += criterion(outputs, images).item()

        # LPIPS expects inputs scaled to [-1, 1] by default. The tensors here
        # are treated as [0, 1] data (the decoder ends in a Sigmoid, and
        # data_range=1.0 is used below), so normalize=True is passed to let
        # LPIPS rescale them. One batched call replaces one call per image.
        if lpips_alex is not None:
            batch_lpips = lpips_alex(images, outputs, normalize=True)
            lpips_scores.extend(batch_lpips.flatten().tolist())

        # Per-image metrics on CPU, with the channel dimension moved last.
        orig_batch = images.cpu().permute(0, 2, 3, 1).numpy()
        recon_batch = outputs.cpu().permute(0, 2, 3, 1).numpy()
        for orig, recon in zip(orig_batch, recon_batch):
            psnr_scores.append(psnr(orig, recon, data_range=1.0))
            ssim_scores.append(ssim(orig, recon, channel_axis=2, data_range=1.0))
            mae_scores.append(np.mean(np.abs(orig - recon)))

# The loss is averaged over batches; the other metrics over images.
avg_test_loss = test_loss / len(test_loader)
print(f"\n✅ Test Results:")
print(f"   Test Loss: {avg_test_loss:.6f}")
print(f"   PSNR: {np.mean(psnr_scores):.3f}")
print(f"   SSIM: {np.mean(ssim_scores):.3f}")
print(f"   MAE: {np.mean(mae_scores):.6f}")
if lpips_scores:
    print(f"   LPIPS: {np.mean(lpips_scores):.6f}")

# --- 15. Error heatmaps (first 3 samples) ---
# Reconstruct the first test batch without tracking gradients.
test_images = next(iter(test_loader)).to(device)
with torch.no_grad():
    reconstructed = model(test_images)

# Show at most three rows, and fewer when the batch is smaller than that.
n_show = min(3, test_images.size(0))

plt.figure(figsize=(15,6))
for i in range(n_show):
    orig = test_images[i].cpu().permute(1,2,0).numpy()
    recon = reconstructed[i].cpu().permute(1,2,0).numpy()
    # Average the absolute error over the colour channels to get one value
    # per pixel. imshow ignores `cmap` for RGB input, so a 3-channel error
    # array would be drawn as an RGB picture instead of a "hot" heatmap.
    error = np.abs(orig - recon).mean(axis=2)

    plt.subplot(3,3,i*3+1); plt.imshow(orig); plt.title("Original"); plt.axis("off")
    plt.subplot(3,3,i*3+2); plt.imshow(recon); plt.title("Reconstructed"); plt.axis("off")
    plt.subplot(3,3,i*3+3); plt.imshow(error, cmap="hot"); plt.title("Error Heatmap"); plt.axis("off")
plt.suptitle("Reconstruction Quality on Test Images", fontsize=16)
plt.show()


In [None]:
from skimage.metrics import peak_signal_noise_ratio as psnr, structural_similarity as ssim
import numpy as np

# --- Initialize storage for curves ---
psnr_epoch_scores, ssim_epoch_scores = [], []   # one validation mean per epoch

# NOTE(review): this cell does not re-create `model` or `optimizer`, and it
# appends to the existing `train_losses` / `val_losses` lists, so it appears
# to continue training from whatever state those hold. Confirm that is intended.
print("\n🚀 Training with PSNR/SSIM tracking...")
for epoch in range(NUM_EPOCHS):
    # --- Training pass: each batch is both the input and the target ---
    model.train()
    running_loss = 0.0
    progress = tqdm(train_loader, desc=f"Epoch {epoch+1}/{NUM_EPOCHS} [Train]")
    for images in progress:
        images = images.to(device, non_blocking=True)

        optimizer.zero_grad()
        loss = criterion(model(images), images)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
    epoch_train_loss = running_loss / len(train_loader)
    train_losses.append(epoch_train_loss)

    # --- Validation pass: loss per batch, PSNR and SSIM per image ---
    model.eval()
    val_loss = 0.0
    val_psnr, val_ssim, count = 0.0, 0.0, 0
    with torch.no_grad():
        for images in val_loader:
            images = images.to(device, non_blocking=True)
            outputs = model(images)
            val_loss += criterion(outputs, images).item()

            # Move to CPU and put the channel dimension last for skimage.
            imgs_np = images.cpu().numpy().transpose(0,2,3,1)
            outs_np = outputs.cpu().numpy().transpose(0,2,3,1)

            for reference, estimate in zip(imgs_np, outs_np):
                val_psnr += psnr(reference, estimate, data_range=1.0)
                val_ssim += ssim(reference, estimate, channel_axis=-1, data_range=1.0)
                count += 1

    # The loss is averaged over batches; PSNR and SSIM over images.
    epoch_val_loss = val_loss / len(val_loader)
    val_losses.append(epoch_val_loss)
    psnr_epoch_scores.append(val_psnr / count)
    ssim_epoch_scores.append(val_ssim / count)

    print(f"📊 Epoch [{epoch+1}/{NUM_EPOCHS}] | "
          f"Train Loss: {epoch_train_loss:.6f} | "
          f"Val Loss: {epoch_val_loss:.6f} | "
          f"PSNR: {psnr_epoch_scores[-1]:.2f} | "
          f"SSIM: {ssim_epoch_scores[-1]:.4f}")

print("✅ Training + PSNR/SSIM tracking finished!")

# --- Plot PSNR & SSIM curves ---
# PSNR (in dB) and SSIM (at most 1) sit on very different scales, so each gets
# its own y-axis. On a shared axis the SSIM curve is squeezed into a flat line.
epoch_axis = range(1, NUM_EPOCHS+1)
fig, ax_psnr = plt.subplots(figsize=(10,5))
ax_ssim = ax_psnr.twinx()   # second y-axis sharing the epoch axis

psnr_line, = ax_psnr.plot(epoch_axis, psnr_epoch_scores, label="PSNR", marker='o', color="tab:blue")
ssim_line, = ax_ssim.plot(epoch_axis, ssim_epoch_scores, label="SSIM", marker='s', color="tab:orange")

ax_psnr.set_xlabel("Epochs")
ax_psnr.set_ylabel("PSNR (dB)")
ax_ssim.set_ylabel("SSIM")
ax_psnr.set_title("Reconstruction Quality over Epochs (Tesla T4 GPU)")
# A single legend covering the curves of both axes.
ax_psnr.legend(handles=[psnr_line, ssim_line])
ax_psnr.grid(True)
plt.show()


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# --- Data extracted from your logs ---
# Hard-coded per-epoch values (20 epochs); nothing here is recomputed.
epochs = list(range(1, 21))

train_losses = [0.000744,0.000267,0.000238,0.000223,0.000210,0.000205,0.000202,0.000199,0.000198,0.000196,
                0.000195,0.000194,0.000193,0.000193,0.000192,0.000191,0.000191,0.000190,0.000190,0.000189]

val_losses =   [0.000298,0.000219,0.000252,0.000188,0.000199,0.000183,0.000177,0.000179,0.000182,0.000183,
                0.000176,0.000183,0.000210,0.000185,0.000178,0.000175,0.000178,0.000173,0.000183,0.000180]

psnr_scores = [38.25,38.24,38.14,37.76,38.24,38.19,38.04,36.34,36.17,38.35,
               38.08,38.20,37.62,36.39,37.84,37.40,38.45,37.87,37.94,38.22]

ssim_scores = [0.9666,0.9659,0.9658,0.9665,0.9668,0.9677,0.9663,0.9552,0.9606,0.9676,
               0.9672,0.9675,0.9670,0.9605,0.9660,0.9650,0.9666,0.9652,0.9675,0.9679]

# --- 1. Train vs Validation Loss ---
# The red marker sits on the epoch with the lowest validation loss.
fig, ax = plt.subplots(figsize=(8,5))
ax.plot(epochs, train_losses, label="Train Loss", marker='o')
ax.plot(epochs, val_losses, label="Validation Loss", marker='s')
best_epoch = np.argmin(val_losses)+1   # argmin is 0-based, epochs are 1-based
ax.scatter(best_epoch, val_losses[best_epoch-1], color='red', s=80, label=f"Best Val (Epoch {best_epoch})")
ax.set_xlabel("Epochs", fontsize=12)
ax.set_ylabel("MSE Loss", fontsize=12)
ax.set_title("Training vs Validation Loss", fontsize=14, fontweight='bold')
ax.legend()
ax.grid(True, linestyle='--', alpha=0.6)
plt.show()

# --- 2. PSNR across epochs ---
# The red marker sits on the epoch with the highest PSNR.
fig, ax = plt.subplots(figsize=(8,5))
ax.plot(epochs, psnr_scores, label="PSNR", marker='o', color='green')
best_epoch = np.argmax(psnr_scores)+1   # argmax is 0-based, epochs are 1-based
ax.scatter(best_epoch, psnr_scores[best_epoch-1], color='red', s=80, label=f"Best PSNR (Epoch {best_epoch})")
ax.set_xlabel("Epochs", fontsize=12)
ax.set_ylabel("PSNR (dB)", fontsize=12)
ax.set_title("Peak Signal-to-Noise Ratio (PSNR)", fontsize=14, fontweight='bold')
ax.legend()
ax.grid(True, linestyle='--', alpha=0.6)
plt.show()

# --- 3. SSIM across epochs ---
# The red marker sits on the epoch with the highest SSIM.
fig, ax = plt.subplots(figsize=(8,5))
ax.plot(epochs, ssim_scores, label="SSIM", marker='s', color='purple')
best_epoch = np.argmax(ssim_scores)+1   # argmax is 0-based, epochs are 1-based
ax.scatter(best_epoch, ssim_scores[best_epoch-1], color='red', s=80, label=f"Best SSIM (Epoch {best_epoch})")
ax.set_xlabel("Epochs", fontsize=12)
ax.set_ylabel("SSIM", fontsize=12)
ax.set_title("Structural Similarity Index (SSIM)", fontsize=14, fontweight='bold')
ax.legend()
ax.grid(True, linestyle='--', alpha=0.6)
plt.show()
