# Video Filters Workshop

Implement 2D convolution, then use it to apply filters to video!

In [None]:
!pip install triton opencv-python-headless

Collecting triton
  Downloading triton-3.6.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (1.7 kB)
Downloading triton-3.6.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (188.3 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 188.3/188.3 MB 5.6 MB/s eta 0:00:00
Installing collected packages: triton


In [None]:
# imports
import torch
import triton
import triton.language as tl

# Part 1: 2D Convolution

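Here $A$ is the $H \times W$ input image, $B$ is the $K_H \times K_W$ kernel, and $C$ is the output (same size as $A$). Let $r_y = \frac{K_H - 1}{2}$ and $r_x = \frac{K_W - 1}{2}$, assuming $K_H$ and $K_W$ are odd so that the kernel has a well-defined center. Out-of-bounds accesses to $A$ are treated as zero.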

$$C[y, x] = \sum_{i=0}^{K_H-1} \sum_{j=0}^{K_W-1} A[y + i - r_y, x + j - r_x] \cdot B[i, j]$$

*Hint: if you're having trouble with arbitrary sizes, try hardcoding 3x3 kernels first.*

In [None]:
@triton.jit
def conv2d_kernel(
    input_ptr, kernel_ptr, output_ptr,
    H, W, KH, KW,
    BLOCK_H: tl.constexpr, BLOCK_W: tl.constexpr
):
    """Exercise stub for the 2D convolution kernel (currently does nothing).

    The host wrapper `conv2d` in this notebook launches it with these arguments:

    Args:
        input_ptr: the input image tensor, of shape (H, W).
        kernel_ptr: the convolution kernel tensor, of shape (KH, KW).
        output_ptr: the output tensor, allocated with the same shape as the image.
        H, W: image height and width.
        KH, KW: kernel height and width.
        BLOCK_H, BLOCK_W: compile-time tile sizes; the launch grid is
            (cdiv(H, BLOCK_H), cdiv(W, BLOCK_W)).

    NOTE(review): no strides are passed in, so an implementation presumably has
    to assume row-major contiguous tensors -- confirm against the caller.
    """
    # Your kernel here!
    pass

In [None]:
def conv2d(image: torch.Tensor, kernel: torch.Tensor) -> torch.Tensor:
    """Apply a 2D convolution to a single-channel image using `conv2d_kernel`.

    Args:
        image: 2D tensor of shape (H, W).
        kernel: 2D tensor of shape (KH, KW).

    Returns:
        A new tensor with the same shape and dtype as `image`; border handling
        (zero padding) is the responsibility of `conv2d_kernel`.

    Raises:
        ValueError: if `image` or `kernel` is not 2-dimensional.
    """
    if image.dim() != 2 or kernel.dim() != 2:
        raise ValueError(
            f"conv2d expects 2D tensors, got image.dim()={image.dim()} "
            f"and kernel.dim()={kernel.dim()}"
        )

    # The kernel is handed raw pointers and no strides, so it can only index the
    # data as dense row-major memory. Make that true even for views (transposes,
    # channel slices, ...); .contiguous() is a no-op for already-dense tensors.
    image = image.contiguous()
    kernel = kernel.contiguous()

    H, W = image.shape
    KH, KW = kernel.shape
    output = torch.empty_like(image)

    # One program instance per BLOCK_H x BLOCK_W output tile; cdiv rounds up so
    # partial tiles at the bottom/right edges are still covered.
    BLOCK_H, BLOCK_W = 16, 16
    grid = (triton.cdiv(H, BLOCK_H), triton.cdiv(W, BLOCK_W))

    conv2d_kernel[grid](
        image, kernel, output,
        H, W, KH, KW,
        BLOCK_H=BLOCK_H, BLOCK_W=BLOCK_W
    )
    return output

## Testing your impl against torch

In [None]:
import torch.nn.functional as F
import matplotlib.pyplot as plt

# Fix the seed so the random test image (and any reported max diff) is reproducible.
torch.manual_seed(0)

H, W = 256, 256
test_image = torch.randn((H, W), device='cuda', dtype=torch.float32)
kernel = torch.ones((3, 3), device='cuda', dtype=torch.float32) / 9.0  # try other kernels!

your_output = conv2d(test_image, kernel)

# Reference result. F.conv2d wants (N, C, H, W) input and (C_out, C_in, KH, KW)
# weights, and (like the formula above) does not flip the kernel. Each axis is
# padded by its own radius so that non-square kernels are compared correctly.
pad_h, pad_w = kernel.shape[0] // 2, kernel.shape[1] // 2
expected = F.conv2d(
    test_image.unsqueeze(0).unsqueeze(0),
    kernel.unsqueeze(0).unsqueeze(0),
    padding=(pad_h, pad_w)
)[0, 0]  # drop only the batch/channel dims (a bare squeeze() would also drop size-1 image dims)

if torch.allclose(your_output, expected, atol=1e-4):
    print("Yay!")
else:
    print(f"Max diff: {(your_output - expected).abs().max().item()}")

# Side-by-side visual comparison: input, your result, reference result.
fig, axes = plt.subplots(1, 3, figsize=(12, 4))
axes[0].imshow(test_image.cpu().numpy(), cmap='gray')
axes[0].set_title("Input")
axes[1].imshow(your_output.cpu().numpy(), cmap='gray')
axes[1].set_title("Your Output")
axes[2].imshow(expected.cpu().numpy(), cmap='gray')
axes[2].set_title("Expected")
for ax in axes: ax.axis('off')
plt.show()

# Part 2: Video Filters

Now we'll apply these convolutions to each frame in a video!

In [None]:
import cv2
import numpy as np
import os
import urllib.request
from IPython.display import Video, display

# Sample clips used in Part 2, keyed by the local filename they are saved under.
VIDEOS = {
    "bunny.mp4": "https://raw.githubusercontent.com/kartva/gpu_workshop/main/learning/videos/bunny.mp4",
    "jellyfish.mp4": "https://raw.githubusercontent.com/kartva/gpu_workshop/main/learning/videos/jellyfish.mp4"
}

for name, url in VIDEOS.items():
    if not os.path.exists(name):
        print(f"Downloading {name}...")
        # Download under a temporary name and rename only on success. Otherwise an
        # interrupted download would leave a truncated file behind that the
        # os.path.exists() check above would treat as complete on the next run.
        partial_path = name + ".part"
        try:
            urllib.request.urlretrieve(url, partial_path)
            os.replace(partial_path, name)
        finally:
            if os.path.exists(partial_path):
                os.remove(partial_path)
print("Done!")

In [None]:
import subprocess

def apply_filter_to_video(input_path: str, output_path: str, kernel_r: torch.Tensor, kernel_g: torch.Tensor, kernel_b: torch.Tensor):
    """Convolve every frame of a video with one kernel per color channel.

    Frames are read with OpenCV, each color channel is filtered with `conv2d`,
    and the result is written to a temporary mp4v file that is then re-encoded
    to H.264 with ffmpeg so that browsers can play it.

    Args:
        input_path: path of the video to read.
        output_path: path of the H.264 video to write.
        kernel_r: 2D kernel applied to the red channel.
        kernel_g: 2D kernel applied to the green channel.
        kernel_b: 2D kernel applied to the blue channel.

    Raises:
        IOError: if the input video or the temporary writer cannot be opened.
        RuntimeError: if the ffmpeg re-encode exits with a non-zero status.
    """
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        cap.release()
        raise IOError(f"Could not open input video: {input_path}")

    # Build the temp name from the path stem. A plain str.replace('.mp4', ...)
    # leaves the path unchanged when it has no '.mp4' in it, making temp == output
    # so that the final cleanup would delete the result.
    stem, _ext = os.path.splitext(output_path)
    temp_path = stem + '_temp.mp4'

    out = None
    try:
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # Write to temp file first, then re-encode for browser compatibility
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(temp_path, fourcc, fps, (width, height), isColor=True)
            if not out.isOpened():
                raise IOError(f"Could not open video writer for: {temp_path}")

            frame_count = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # One upload per frame: (H, W, 3) uint8 BGR -> (3, H, W) float32 in [0, 1].
                # .contiguous() makes each channel plane dense row-major memory.
                planes = (torch.from_numpy(frame).cuda().float() / 255.0).permute(2, 0, 1).contiguous()
                b, g, r = planes[0], planes[1], planes[2]

                # Apply convolution to each channel
                r_out = conv2d(r, kernel_r)
                g_out = conv2d(g, kernel_g)
                b_out = conv2d(b, kernel_b)

                # Recombine in BGR order on the GPU and download once. Round (rather
                # than truncate) when quantizing so e.g. 254.9999 maps back to 255.
                out_frame = torch.stack([b_out, g_out, r_out], dim=-1).clamp(0, 1)
                out.write((out_frame * 255).round().to(torch.uint8).cpu().numpy())

                frame_count += 1
                if frame_count % 30 == 0:
                    print(f"Processed {frame_count} frames...")
        finally:
            # Release the handles even if a frame fails, so files are not left locked.
            cap.release()
            if out is not None:
                out.release()

        # Re-encode with H.264 for browser compatibility
        print("Re-encoding for browser playback...")
        result = subprocess.run(
            ['ffmpeg', '-y', '-i', temp_path, '-c:v', 'libx264', '-preset', 'fast', output_path],
            capture_output=True, text=True, errors='replace'
        )
        if result.returncode != 0:
            # Surface the failure instead of claiming success; show the tail of the log.
            raise RuntimeError(
                f"ffmpeg failed with exit code {result.returncode}:\n{result.stderr[-2000:]}"
            )
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)
    print(f"Saved to {output_path}")

In [None]:
# One 3x3 kernel per color channel. All three start out as the same box blur
# (every tap weighted 1/9) -- replace any of them with your own kernel to experiment!
kernel_r, kernel_g, kernel_b = [
    torch.ones((3, 3), device='cuda', dtype=torch.float32) / 9.0
    for _channel in range(3)
]

apply_filter_to_video(
    "bunny.mp4",
    "bunny_filtered.mp4",
    kernel_r,
    kernel_g,
    kernel_b,
)

In [None]:
# Embed the filtered clip in the notebook output, rendered 640 px wide.
filtered_clip = Video("bunny_filtered.mp4", embed=True, width=640)
display(filtered_clip)