# Video Super Resolution
### Classical Upsampling vs. SRCNN (Deep Learning) — Frame-by-Frame Pipeline

This notebook builds a complete video super resolution pipeline:

1. **Data pipeline** — downsample frames as LR input, HR as ground truth
2. **Classical baselines** — nearest neighbour, bilinear, bicubic, Lanczos
3. **SRCNN** — Super Resolution CNN in PyTorch (Dong et al., 2014)
4. **Quantitative evaluation** — PSNR & SSIM across all methods
5. **Video processing** — frame-by-frame inference, video reconstruction

---
**Dependencies:**
```
pip install torch torchvision numpy scipy matplotlib scikit-image opencv-python Pillow
```

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from skimage import data as skdata, color, transform
from skimage.metrics import peak_signal_noise_ratio as psnr, structural_similarity as ssim
from skimage.transform import resize, rescale
import cv2
import warnings
warnings.filterwarnings('ignore')

# Prefer the GPU whenever PyTorch can see one; otherwise run on the CPU.
_backend = 'cuda' if torch.cuda.is_available() else 'cpu'
DEVICE = torch.device(_backend)
print(f'Device: {DEVICE}', f'PyTorch: {torch.__version__}', sep='\n')

---
## 1 · Data Pipeline

For super resolution:
- **HR** (high-res) = original image — ground truth
- **LR** (low-res) = HR downsampled by the scale factor (the code stores it already upsampled back to HR size via bicubic)
- **SRCNN input** = bicubic-upsampled LR image (same size as HR)
- **SRCNN output** = refined HR prediction

```
HR → downsample → LR → bicubic upsample → SRCNN → SR (≈ HR)
```

In [None]:
# ── Parameters ────────────────────────────────────────────────────────────────
# Global knobs read by the data, dataset and training cells further down.
SCALE_FACTOR  = 3       # ← tune: 2, 3, or 4 (upscaling factor)
PATCH_SIZE    = 33      # ← tune: side length of the square training patches (original note: must be odd — not enforced in this file)
STRIDE        = 14      # ← tune: step in pixels between neighbouring patch origins
BATCH_SIZE    = 64      # patches per mini-batch for the train and val DataLoaders
NUM_EPOCHS    = 30      # ← tune: increase for better results
LR            = 1e-4    # base Adam learning rate; the training cell uses LR * 0.1 for conv3

# ── Load clean HR frames ──────────────────────────────────────────────────────
def load_hr_frames(size=256):
    """Load the bundled scikit-image samples as square grayscale HR frames.

    Every sample is reduced to a single channel, resized to ``(size, size)``
    with anti-aliasing, and stored as ``float32``.

    Args:
        size: Side length, in pixels, of each returned frame.

    Returns:
        A list of 2-D ``float32`` arrays, one per entry of ``loaders`` and in
        the same order. Values are expected to lie in [0, 1] (skimage's
        default float conversion), which is what ``data_range=1.0`` in the
        metric code relies on.
    """
    loaders = [
        skdata.astronaut, skdata.camera, skdata.coins, skdata.horse,
        skdata.hubble_deep_field, skdata.moon, skdata.page, skdata.text,
        skdata.chelsea, skdata.coffee, skdata.immunohistochemistry, skdata.rocket
    ]
    frames = []
    for loader in loaders:
        img = loader()
        if img.ndim == 3:
            img = color.rgb2gray(img)
        if img.dtype == bool:
            # Binary samples (e.g. ``horse``) must be cast before resizing:
            # ``resize`` refuses anti-aliasing on boolean input.
            img = img.astype(np.float32)
        img = resize(img, (size, size), anti_aliasing=True).astype(np.float32)
        frames.append(img)
    return frames

def make_lr_bicubic(hr, scale):
    """Simulate a low-resolution frame at HR size.

    The frame is shrunk by ``1 / scale`` (cubic interpolation, anti-aliased)
    and then stretched back to ``hr.shape`` with cubic interpolation.

    Returns:
        The degraded frame as a ``float32`` array with the shape of ``hr``.
    """
    shrink = 1.0 / scale
    small = rescale(hr, shrink, order=3, anti_aliasing=True)
    restored = resize(small, hr.shape, anti_aliasing=False, order=3)
    return restored.astype(np.float32)

hr_frames = load_hr_frames(size=252)   # 252 is divisible by 2, 3 and 4
lr_frames = [make_lr_bicubic(frame, SCALE_FACTOR) for frame in hr_frames]

print(f'Loaded {len(hr_frames)} frame pairs — HR shape: {hr_frames[0].shape}')

# Preview: top row shows the ground truth, bottom row the degraded input.
fig, axes = plt.subplots(2, 6, figsize=(18, 6))
for i in range(6):
    panels = (
        (axes[0, i], hr_frames[i], f'HR frame {i+1}'),
        (axes[1, i], lr_frames[i], f'LR×{SCALE_FACTOR} frame {i+1}'),
    )
    for ax, image, title in panels:
        ax.imshow(image, cmap='gray')
        ax.set_title(title)
        ax.axis('off')
fig.suptitle('HR vs. Bicubic-upsampled LR Input', fontweight='bold')
plt.tight_layout()
plt.show()

In [None]:
class SRDataset(Dataset):
    """Patch dataset built from aligned (LR_bicubic, HR) frame pairs.

    Each pair is scanned with a sliding ``patch_size`` × ``patch_size`` window
    that advances by ``stride`` pixels; the window origins are derived from
    the HR frame's shape. Items are ``(lr, hr)`` tensors with a leading
    channel axis of size 1.
    """

    def __init__(self, lr_frames, hr_frames, patch_size=33, stride=14):
        self.lr_patches = []
        self.hr_patches = []

        for lr_img, hr_img in zip(lr_frames, hr_frames):
            height, width = hr_img.shape
            # Row-major list of windows that fit fully inside the HR frame.
            windows = [
                (slice(top, top + patch_size), slice(left, left + patch_size))
                for top in range(0, height - patch_size + 1, stride)
                for left in range(0, width - patch_size + 1, stride)
            ]
            self.lr_patches.extend(lr_img[win] for win in windows)
            self.hr_patches.extend(hr_img[win] for win in windows)

    def __len__(self):
        return len(self.lr_patches)

    def __getitem__(self, idx):
        pair = (self.lr_patches[idx], self.hr_patches[idx])
        # unsqueeze(0) adds the channel axis: (H, W) -> (1, H, W)
        lr, hr = (torch.from_numpy(patch).unsqueeze(0) for patch in pair)
        return lr, hr

# ── Train / val / test split ──────────────────────────────────────────────────
# Frames 0-7 train, 8-9 validate, the remainder is held out for testing.
train_lr, val_lr, test_lr = lr_frames[:8], lr_frames[8:10], lr_frames[10:]
train_hr, val_hr, test_hr = hr_frames[:8], hr_frames[8:10], hr_frames[10:]

train_ds = SRDataset(train_lr, train_hr, patch_size=PATCH_SIZE, stride=STRIDE)
val_ds   = SRDataset(val_lr, val_hr, patch_size=PATCH_SIZE, stride=STRIDE)

# Only the training loader shuffles; validation keeps a fixed order.
train_loader = DataLoader(train_ds, shuffle=True, batch_size=BATCH_SIZE, num_workers=0)
val_loader   = DataLoader(val_ds, shuffle=False, batch_size=BATCH_SIZE, num_workers=0)

print(f'Train patches: {len(train_ds):,} | Val patches: {len(val_ds):,}')

---
## 2 · Classical Upsampling Baselines

| Method | Order | Characteristic |
|---|---|---|
| Nearest neighbour | 0 | Fastest; blocky artefacts |
| Bilinear | 1 | Smooth but blurry |
| Bicubic | 3 | Standard baseline for SR |
| Lanczos | 5 | Sharpest classical method |

In [None]:
def classical_upsample(hr, scale):
    """Downsample ``hr`` by ``scale`` and upsample it back with classical filters.

    Args:
        hr: 2-D ground-truth frame.
        scale: Downsampling factor; the frame is shrunk by ``1 / scale``
            (cubic interpolation, anti-aliased) before being upsampled.

    Returns:
        Dict mapping ``'Nearest'``, ``'Bilinear'``, ``'Bicubic'`` and
        ``'Lanczos'`` (in that order) to ``float32`` arrays of ``hr.shape``.
    """
    lr_small = rescale(hr, 1.0 / scale, anti_aliasing=True, order=3)
    target   = hr.shape

    spline_orders = (('Nearest', 0), ('Bilinear', 1), ('Bicubic', 3))
    upscaled = {
        name: resize(lr_small, target, order=order).astype(np.float32)
        for name, order in spline_orders
    }

    # skimage's order=5 is a bi-quintic spline, not a Lanczos kernel, so the
    # Lanczos baseline uses OpenCV's windowed-sinc filter instead.
    # cv2 expects the destination size as (width, height).
    lanczos = cv2.resize(
        lr_small.astype(np.float32),
        (target[1], target[0]),
        interpolation=cv2.INTER_LANCZOS4,
    )
    # Lanczos can overshoot; clamp to the input range as skimage's resize
    # does by default for the other methods.
    upscaled['Lanczos'] = np.clip(lanczos, lr_small.min(), lr_small.max()).astype(np.float32)
    return upscaled

def compute_metrics(ref, candidate):
    """Return ``(PSNR, SSIM)`` of ``candidate`` against ``ref``.

    Both metrics are evaluated with ``data_range=1.0``.
    """
    return (
        psnr(ref, candidate, data_range=1.0),
        ssim(ref, candidate, data_range=1.0),
    )

# Score every classical method on the first held-out frame.
hr_test  = test_hr[0]
upscaled = classical_upsample(hr_test, SCALE_FACTOR)

print(f'=== Classical Baselines (scale ×{SCALE_FACTOR}) ===')
print(f'{"Method":<14} {"PSNR (dB)":>10} {"SSIM":>8}')
print('-' * 36)
for name in upscaled:
    img = upscaled[name]
    p, s = compute_metrics(hr_test, img)
    print(f'{name:<14} {p:>10.2f} {s:>8.4f}')

# Side-by-side figure: ground truth first, then each method in dict order.
imgs   = [hr_test] + list(upscaled.values())
titles = ['HR (ground truth)'] + list(upscaled.keys())
fig, axes = plt.subplots(1, 5, figsize=(20, 4))
for ax, img, title in zip(axes, imgs, titles):
    ax.imshow(img, cmap='gray')
    ax.set_title(title)
    ax.axis('off')
fig.suptitle(f'Classical Upsampling Baselines (×{SCALE_FACTOR})', fontweight='bold')
plt.tight_layout()
plt.show()

---
## 3 · SRCNN — Super Resolution CNN

**SRCNN** (Dong et al., 2014) is the pioneering CNN for image super resolution.
It maps a bicubic-upsampled LR image directly to HR using three conv layers:

| Layer | Kernel | Filters | Role |
|---|---|---|---|
| Conv1 | 9×9 | 64 | Patch extraction & representation |
| Conv2 | 1×1 | 32 | Non-linear mapping |
| Conv3 | 5×5 | 1  | Reconstruction |

In [None]:
class SRCNN(nn.Module):
    """Three-layer super-resolution CNN (Dong et al., ECCV 2014 / IEEE TPAMI 2015).

    Takes a single-channel image batch and returns a batch of the same
    spatial size; every convolution is padded so height and width are kept.
    """

    def __init__(self):
        super().__init__()
        # 9x9 patch extraction -> 1x1 non-linear mapping -> 5x5 reconstruction
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=9, padding=4)
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=32, kernel_size=1, padding=0)
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=1, kernel_size=5, padding=2)
        self.relu  = nn.ReLU(inplace=True)
        self._init_weights()

    def _init_weights(self):
        """Draw all conv weights from N(0, 0.001²) and zero all biases."""
        for conv in (self.conv1, self.conv2, self.conv3):
            nn.init.normal_(conv.weight, mean=0, std=0.001)
            nn.init.zeros_(conv.bias)

    def forward(self, x):
        features = self.relu(self.conv1(x))
        mapped = self.relu(self.conv2(features))
        # No activation after the reconstruction layer.
        return self.conv3(mapped)

# Build the network on the selected device and report its size.
model = SRCNN()
model = model.to(DEVICE)
total_params = sum(map(torch.numel, model.parameters()))
print(f'SRCNN parameters: {total_params:,}')
print(model)

---
## 4 · Training Pipeline

In [None]:
# Pixel-wise mean squared error between the SR prediction and the HR patch.
criterion = nn.MSELoss()
# SRCNN paper uses different LRs per layer
# One Adam parameter group per conv layer so conv3 can train 10x slower.
optimizer = optim.Adam([
    {'params': model.conv1.parameters(), 'lr': LR},
    {'params': model.conv2.parameters(), 'lr': LR},
    {'params': model.conv3.parameters(), 'lr': LR * 0.1},  # smaller LR for last layer
])
# Halve every group's learning rate after each block of 15 epochs.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=15, gamma=0.5)

# Per-epoch loss history (used for the plot below) and the best validation
# loss seen so far (used to decide when to checkpoint).
train_losses, val_losses = [], []
best_val_loss = float('inf')

for epoch in range(NUM_EPOCHS):
    # ── Train ──────────────────────────────────────────────────────────────────
    model.train()
    t_loss = 0.0
    for lr_patch, hr_patch in train_loader:
        lr_patch, hr_patch = lr_patch.to(DEVICE), hr_patch.to(DEVICE)
        optimizer.zero_grad()
        sr_patch = model(lr_patch)
        loss = criterion(sr_patch, hr_patch)
        loss.backward()
        optimizer.step()
        t_loss += loss.item()

    # ── Validate ───────────────────────────────────────────────────────────────
    # Gradients are disabled; only the loss is accumulated.
    model.eval()
    v_loss = 0.0
    with torch.no_grad():
        for lr_patch, hr_patch in val_loader:
            lr_patch, hr_patch = lr_patch.to(DEVICE), hr_patch.to(DEVICE)
            sr_patch = model(lr_patch)
            v_loss += criterion(sr_patch, hr_patch).item()

    # Average of the per-batch mean losses (each batch counts equally, even a
    # smaller final batch).
    t_loss /= len(train_loader)
    v_loss /= len(val_loader)
    train_losses.append(t_loss)
    val_losses.append(v_loss)
    scheduler.step()

    # Save best model
    # Overwrites the same checkpoint file whenever validation loss improves.
    if v_loss < best_val_loss:
        best_val_loss = v_loss
        torch.save(model.state_dict(), 'srcnn_best.pth')

    # Progress line every 5 epochs. The LR shown is the first parameter
    # group's (conv1) and is read after scheduler.step().
    if (epoch + 1) % 5 == 0:
        print(f'Epoch [{epoch+1:>3}/{NUM_EPOCHS}]  Train: {t_loss:.6f}  Val: {v_loss:.6f}  LR: {scheduler.get_last_lr()[0]:.2e}')

# ── Load best weights & plot ───────────────────────────────────────────────────
# Restore the checkpoint with the lowest validation loss rather than keeping
# the weights from the final epoch.
model.load_state_dict(torch.load('srcnn_best.pth', map_location=DEVICE))

plt.figure(figsize=(8, 4))
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses,   label='Val Loss')
plt.xlabel('Epoch')
plt.ylabel('MSE Loss')
plt.title('SRCNN Training Curves')
plt.legend()
plt.tight_layout()
plt.show()
print(f'Best val loss: {best_val_loss:.6f}')

---
## 5 · Evaluation — Classical vs. SRCNN

In [None]:
def srcnn_upscale(model, lr_bicubic):
    """Run SRCNN inference on a full bicubic-upsampled frame.

    Args:
        model: Network mapping a ``(1, 1, H, W)`` tensor to a tensor of the
            same shape. It is switched to eval mode and left there.
        lr_bicubic: 2-D array-like frame of shape ``(H, W)``; any real dtype
            is accepted and converted to ``float32``.

    Returns:
        ``float32`` array of shape ``(H, W)`` clipped to [0, 1].
    """
    model.eval()

    # Follow the model's own device/dtype instead of a module-level global,
    # so the function also works for models that live elsewhere.
    first_param = next(model.parameters(), None)
    if first_param is None:
        device, dtype = torch.device('cpu'), torch.float32
    else:
        device, dtype = first_param.device, first_param.dtype

    # float64 (or non-contiguous) input would otherwise fail in the conv layers.
    frame = np.ascontiguousarray(lr_bicubic, dtype=np.float32)

    with torch.no_grad():
        t = torch.from_numpy(frame)[None, None].to(device=device, dtype=dtype)
        # Index batch and channel explicitly: a bare squeeze() would also
        # drop a spatial axis of length 1.
        out = model(t)[0, 0].float().cpu().numpy()
    return np.clip(out, 0, 1).astype(np.float32)

print(f'=== Super Resolution Benchmark (×{SCALE_FACTOR}) ===')
print(f'{"Method":<14} {"PSNR (dB)":>10} {"SSIM":>8}')
print('-' * 36)

# Collect one (PSNR, SSIM) pair per method for every held-out frame.
all_results = {}
for hr, lr_bic in zip(test_hr, test_lr):
    methods = {**classical_upsample(hr, SCALE_FACTOR), 'SRCNN': srcnn_upscale(model, lr_bic)}
    for name, img in methods.items():
        p, s = compute_metrics(hr, img)
        all_results.setdefault(name, []).append((p, s))

# Average over the test frames.
averages = {
    name: (np.mean([p for p, _ in scores]), np.mean([s for _, s in scores]))
    for name, scores in all_results.items()
}

# Flag whichever method actually scored the highest mean PSNR instead of
# assuming SRCNN wins.
best_name = max(averages, key=lambda name: averages[name][0], default=None)

for name, (avg_p, avg_s) in averages.items():
    marker = ' ◀ best' if name == best_name else ''
    print(f'{name:<14} {avg_p:>10.2f} {avg_s:>8.4f}{marker}')

In [None]:
# ── Visual comparison ─────────────────────────────────────────────────────────
# First held-out frame: ground truth, three classical methods and SRCNN.
hr_test  = test_hr[0]
lr_bic   = test_lr[0]
upscaled = classical_upsample(hr_test, SCALE_FACTOR)
sr_out   = srcnn_upscale(model, lr_bic)

titles = ['HR (ground truth)', 'Nearest', 'Bicubic', 'Lanczos', 'SRCNN']
imgs   = [hr_test] + [upscaled[key] for key in titles[1:4]] + [sr_out]

fig, axes = plt.subplots(1, 5, figsize=(20, 4))
for k, ax in enumerate(axes):
    img, title = imgs[k], titles[k]
    p, s = compute_metrics(hr_test, img)
    ax.imshow(img, cmap='gray')
    ax.set_title(f'{title}\nPSNR={p:.1f} dB', fontsize=9)
    ax.axis('off')
fig.suptitle(f'Classical vs. SRCNN Super Resolution (×{SCALE_FACTOR})', fontsize=13, fontweight='bold')
plt.tight_layout()
plt.show()

# ── Zoomed crop comparison ────────────────────────────────────────────────────
# Same square window (top-left corner (r, c), side s) cut from every image.
r, c, s = 80, 80, 80   # ← tune: crop region
window = (slice(r, r + s), slice(c, c + s))
crops  = [img[window] for img in imgs]
fig, axes = plt.subplots(1, 5, figsize=(20, 4))
for k, ax in enumerate(axes):
    ax.imshow(crops[k], cmap='gray', interpolation='nearest')
    ax.set_title(f'{titles[k]} (crop)')
    ax.axis('off')
fig.suptitle('Zoomed Crop — Fine Detail Comparison', fontsize=13, fontweight='bold')
plt.tight_layout()
plt.show()

---
## 6 · Video Super Resolution Pipeline

In [None]:
# ── Generate a synthetic LR video ─────────────────────────────────────────────
SYNTHETIC_VIDEO = 'synthetic_lr.avi'
fps  = 10

# Downsample the held-out frames once; they are looped 5 times below.
lr_video_frames = [
    rescale(hr, 1.0 / SCALE_FACTOR, anti_aliasing=True, order=3)
    for hr in test_hr
]

# Take the LR frame size from the data rather than hard-coding 84×84, which
# only matched SCALE_FACTOR = 3: frames must have exactly the size the writer
# was opened with.
H, W = lr_video_frames[0].shape

writer = cv2.VideoWriter(SYNTHETIC_VIDEO, cv2.VideoWriter_fourcc(*'XVID'), fps, (W, H), isColor=False)
for _ in range(5):
    for lr_small in lr_video_frames:
        # Round (rather than truncate) when quantising to 8 bits.
        uint8 = np.clip(np.rint(lr_small * 255), 0, 255).astype(np.uint8)
        writer.write(uint8)
writer.release()
print(f'LR video saved: {SYNTHETIC_VIDEO}')

# ── Option B: use your own video ──────────────────────────────────────────────
# SYNTHETIC_VIDEO = 'your_video.mp4'

In [None]:
def super_resolve_video(input_path, output_path, model, scale):
    """Read an LR video, super-resolve every frame with SRCNN, write the result.

    Each frame is converted to grayscale in [0, 1], upsampled by ``scale``
    with cubic interpolation, refined by ``srcnn_upscale`` and written as an
    8-bit grayscale frame to an XVID-encoded file.

    Args:
        input_path: Path of the low-resolution input video.
        output_path: Path of the video file to create.
        model: Trained network passed through to ``srcnn_upscale``.
        scale: Upscaling factor applied to the frame width and height.

    Raises:
        OSError: If the input video or the output writer cannot be opened.
    """
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        cap.release()
        raise OSError(f'Could not open input video: {input_path}')

    out = None
    written = 0
    try:
        fps   = cap.get(cv2.CAP_PROP_FPS)
        W_lr  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        H_lr  = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # VideoWriter needs integer dimensions even for a fractional scale.
        W_hr, H_hr = int(round(W_lr * scale)), int(round(H_lr * scale))

        out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'XVID'), fps, (W_hr, H_hr), isColor=False)
        if not out.isOpened():
            raise OSError(f'Could not open output video for writing: {output_path}')

        model.eval()
        # Read until the stream ends: the container's reported frame count
        # can be missing or inaccurate, so it is not used as the loop bound.
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            gray   = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).astype(np.float32) / 255.0
            lr_bic = resize(gray, (H_hr, W_hr), order=3, anti_aliasing=False).astype(np.float32)
            sr     = srcnn_upscale(model, lr_bic)
            # Round (rather than truncate) when quantising to 8 bits.
            out.write(np.rint(sr * 255).astype(np.uint8))
            written += 1
    finally:
        # Release both handles even if a frame fails mid-stream.
        cap.release()
        if out is not None:
            out.release()

    print(f'SR video saved: {output_path}  ({W_lr}×{H_lr} → {W_hr}×{H_hr},  {written} frames)')

super_resolve_video(SYNTHETIC_VIDEO, 'sr_output.avi', model, SCALE_FACTOR)

# ── Visualize sample frames ───────────────────────────────────────────────────
MAX_SAMPLES = 5
n_hr = len(test_hr)

cap = cv2.VideoCapture(SYNTHETIC_VIDEO)
sample_lr, sample_sr, sample_hr = [], [], []
for i in range(MAX_SAMPLES if n_hr else 0):
    ret, frame = cap.read()
    if not ret:
        break
    # The synthetic video repeats the test frames in order, so frame i pairs
    # with test_hr[i % n_hr]. Iterating test_hr[:5] directly yields only
    # len(test_hr) samples and broke the 5-column plot below.
    # NOTE: with a user-supplied video (Option B) this HR pairing is not meaningful.
    hr = test_hr[i % n_hr]
    gray   = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).astype(np.float32) / 255.0
    lr_bic = resize(gray, hr.shape, order=3).astype(np.float32)
    sample_lr.append(lr_bic)
    sample_sr.append(srcnn_upscale(model, lr_bic))
    sample_hr.append(hr)
cap.release()

# Size the grid by the number of frames actually read.
n_samples = len(sample_hr)
if n_samples:
    # squeeze=False keeps axes 2-D even when only one column is drawn.
    fig, axes = plt.subplots(3, n_samples, figsize=(18, 10), squeeze=False)
    for i in range(n_samples):
        axes[0, i].imshow(sample_hr[i], cmap='gray'); axes[0, i].set_title(f'HR frame {i+1}');       axes[0, i].axis('off')
        axes[1, i].imshow(sample_lr[i], cmap='gray'); axes[1, i].set_title(f'LR bicubic {i+1}');     axes[1, i].axis('off')
        axes[2, i].imshow(sample_sr[i], cmap='gray'); axes[2, i].set_title(f'SRCNN SR {i+1}');       axes[2, i].axis('off')
    fig.suptitle('Video Super Resolution — HR / LR / SRCNN', fontsize=13, fontweight='bold')
    plt.tight_layout()
    plt.show()
else:
    print(f'No frames could be read from {SYNTHETIC_VIDEO}; nothing to visualize.')

---
## 7 · Save & Load Model

In [None]:
# Persist only the weights (state_dict), not the whole module object.
# At this point `model` holds the best-validation weights reloaded after training.
torch.save(model.state_dict(), 'srcnn_weights.pth')
print('Model saved to srcnn_weights.pth')

# ── Load (example) ────────────────────────────────────────────────────────────
# Rebuild the architecture first, then load the saved weights into it.
# model_loaded = SRCNN().to(DEVICE)
# model_loaded.load_state_dict(torch.load('srcnn_weights.pth', map_location=DEVICE))
# model_loaded.eval()