In [11]:
import os
import glob
import random
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import torch
import torch.nn as nn
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from skimage.metrics import peak_signal_noise_ratio as compare_psnr
from skimage.metrics import structural_similarity as compare_ssim

In [12]:
drive_results_path = "results_4"
os.makedirs(drive_results_path, exist_ok=True)

In [13]:
import os
print("Current Working Directory:", os.getcwd())

Current Working Directory: C:\Users\ipg 3\IOP2025


In [14]:
import os, glob

hazy_dir = "hazy"
clean_dir = "clear"

hazy_paths = sorted(glob.glob(os.path.join(hazy_dir, "*.*")))
clean_paths = sorted(glob.glob(os.path.join(clean_dir, "*.*")))

print("Hazy images found:", len(hazy_paths))
print("Clean images found:", len(clean_paths))


Hazy images found: 547
Clean images found: 547


In [15]:
# Sequence-paired
hazy_paths = sorted(glob.glob(os.path.join(hazy_dir, "*")))
clean_paths = sorted(glob.glob(os.path.join(clean_dir, "*")))
assert len(hazy_paths) == len(clean_paths), "Mismatched hazy and clean images"

pairs = list(zip(hazy_paths, clean_paths))
random.shuffle(pairs)
split = int(0.8 * len(pairs))
train_pairs = pairs[:split]
test_pairs = pairs[split:]

In [16]:
class DehazingDataset(Dataset):
    def __init__(self, pairs):
        self.pairs = pairs
        self.transform = transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.ToTensor()
        ])
    def __len__(self): return len(self.pairs)
    def __getitem__(self, idx):
        hazy_path, clean_path = self.pairs[idx]
        hazy = Image.open(hazy_path).convert("RGB")
        clean = Image.open(clean_path).convert("RGB")
        return self.transform(hazy), self.transform(clean), hazy_path, clean_path

train_loader = DataLoader(DehazingDataset(train_pairs), batch_size=4, shuffle=True)
test_loader = DataLoader(DehazingDataset(test_pairs), batch_size=1, shuffle=False)

In [17]:
class DeepDehazeNet(nn.Module):
    def __init__(self):
        super().__init__()

        def conv_block(in_c, out_c):
            return nn.Sequential(
                nn.Conv2d(in_c, out_c, 3, padding=1),
                nn.BatchNorm2d(out_c),
                nn.ReLU(inplace=True),
                nn.Dropout(0.2)
            )

        self.enc1 = conv_block(3, 64)
        self.pool1 = nn.MaxPool2d(2)
        self.bottleneck = conv_block(64, 128)
        self.up1 = nn.ConvTranspose2d(128, 64, 2, stride=2)
        self.dec1 = conv_block(128, 64)
        self.final = nn.Conv2d(64, 3, 1)

    def forward(self, x):
        e1 = self.enc1(x)
        b = self.bottleneck(self.pool1(e1))
        d1 = self.dec1(torch.cat([self.up1(b), e1], 1))
        return torch.sigmoid(self.final(d1))

In [None]:
import os
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import numpy as np
from torchvision import transforms
from PIL import Image
from skimage.metrics import peak_signal_noise_ratio as compare_psnr

# === SETUP ===
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = DeepDehazeNet().to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# === RESULT DIRECTORY ===
drive_results_path = "results_4"
os.makedirs(drive_results_path, exist_ok=True)

# === TRAINING SETUP ===
train_losses, test_losses, psnrs ,ssims = [], [], [], []
best_psnr, overfit_epoch = 0, 0
early_stop_counter = 0
patience = 20
num_epochs = 200

for epoch in range(num_epochs):
    model.train()
    total_train_loss = 0
    for hazy, clean, *_ in train_loader:
        hazy, clean = hazy.to(device), clean.to(device)
        optimizer.zero_grad()
        output = model(hazy)
        loss = criterion(output, clean)
        loss.backward()
        optimizer.step()
        total_train_loss += loss.item()
    train_loss = total_train_loss / len(train_loader)
    train_losses.append(train_loss)

    # Evaluate on test
    model.eval()
    total_test_loss = 0
    total_psnr = 0
    total_ssim =0
    with torch.no_grad():
        for hazy, clean, *_ in test_loader:
            hazy, clean = hazy.to(device), clean.to(device)
            output = model(hazy)
            total_test_loss += criterion(output, clean).item()
            psnr = compare_psnr(clean.cpu().numpy(), output.cpu().numpy(), data_range=1.0)
            ssim = compare_ssim(
                clean.cpu().squeeze().permute(1,2,0).numpy(),
                output.cpu().squeeze().permute(1,2,0).numpy(),
                channel_axis=-1, data_range=1.0
            )
            total_psnr += psnr
            total_ssim += ssim
    test_loss = total_test_loss / len(test_loader)
    psnr_avg = total_psnr / len(test_loader)
    avg_ssim = total_ssim / len(test_loader)
    test_losses.append(test_loss)
    psnrs.append(psnr_avg)
    ssims.append(avg_ssim)

    if psnr_avg > best_psnr:
        best_psnr = psnr_avg
        overfit_epoch = epoch
        torch.save(model.state_dict(),"best_model_4_4.pth")
        early_stop_counter = 0
    else:
        early_stop_counter += 1

    print(f"Epoch {epoch+1}: Train Loss={train_loss:.4f}, Test Loss={test_loss:.4f}, PSNR={psnr_avg:.2f}, SSIM={avg_ssim:.2f}")
    if early_stop_counter >= patience:
        print(f"Early stopping at epoch {epoch+1}")
        break

# Final model
torch.save(model.state_dict(),"final_model_4_4.pth")

# === LOSS PLOT ===
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label='Train Loss' ,alpha=0.7)
plt.plot(test_losses, label='Test Loss' ,alpha=0.7)
plt.axvline(overfit_epoch, color='red', linestyle='--', label=f'Overfit @ {overfit_epoch}')
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Loss vs Epoch")
plt.legend()
plt.grid(True)
plt.savefig(os.path.join(drive_results_path, "loss_plot.png"))
plt.show()

# === PSNR PLOT ===
plt.figure(figsize=(10, 5))
plt.plot(psnrs, label='Test PSNR', color='green')
plt.axvline(overfit_epoch, color='red', linestyle='--', label=f'Best Epoch: {overfit_epoch}')
plt.xlabel("Epoch")
plt.ylabel("PSNR (dB)")
plt.title("PSNR vs Epoch")
plt.legend()
plt.grid(True)
plt.savefig(os.path.join(drive_results_path, "psnr_plot.png"))
plt.show()

# ===Plot SSIM===
plt.figure(figsize=(10, 5))
plt.plot(ssims, label='Test SSIM', color='purple')
plt.axvline(overfit_epoch, color='red', linestyle='--', label=f'Best Epoch: {overfit_epoch}')
plt.xlabel("Epoch")
plt.ylabel("SSIM")
plt.title("Test SSIM vs Epoch")
plt.legend()
plt.grid(True)
plt.savefig(os.path.join(drive_results_path, "ssim_plot.png"))
plt.show()

# === COMPARISON TABLE ===

model.load_state_dict(torch.load("best_model_4_4.pth"))
model.eval()

num_samples = len(test_loader)
fig, axes = plt.subplots(num_samples, 3, figsize=(12, 3 * num_samples))
if num_samples == 1:
    axes = [axes]

with torch.no_grad():
    for i, (hazy, clean, hazy_path, clean_path) in enumerate(test_loader):
        hazy, clean = hazy.to(device), clean.to(device)
        output = model(hazy)

        # Convert tensors to image format
        dehazed_img_arr = output.squeeze().cpu().numpy().transpose(1, 2, 0)
        dehazed_img_arr = (dehazed_img_arr * 255).clip(0, 255).astype(np.uint8)
        dehazed_img = Image.fromarray(dehazed_img_arr).resize((256, 256))

        hazy_img = Image.open(hazy_path[0]).resize((256, 256))
        clean_img = Image.open(clean_path[0]).resize((256, 256))

        # Compute metrics
        psnr = compare_psnr(clean.cpu().numpy(), output.cpu().numpy(), data_range=1.0)
        ssim = compare_ssim(
            clean.cpu().squeeze().permute(1, 2, 0).numpy(),
            output.cpu().squeeze().permute(1, 2, 0).numpy(),
            channel_axis=-1,
            data_range=1.0
        )

        # Plot images
        for ax, img, title in zip(
            axes[i],
            [hazy_img, clean_img, dehazed_img],
            [f"Hazy {i}", f"Clean {i}", f"Dehazed {i}\nPSNR: {psnr:.2f}\nSSIM: {ssim:.4f}"]
        ):
            ax.imshow(img)
            ax.set_title(title)
            ax.axis('off')

plt.tight_layout()
plt.savefig(os.path.join(drive_results_path, "comparison_table.png"))
plt.show()

print("✅ Table saved to:", os.path.join(drive_results_path, "comparison_table.png"))


Epoch 1: Train Loss=0.0218, Test Loss=0.0135, PSNR=20.80, SSIM=0.77
Epoch 2: Train Loss=0.0148, Test Loss=0.0121, PSNR=21.13, SSIM=0.82
Epoch 3: Train Loss=0.0137, Test Loss=0.0114, PSNR=21.77, SSIM=0.84
