# Fast Neural Style Transfer - Setup & Quick Test

This notebook will:
1. Test a pre-trained PyTorch style transfer model
2. Benchmark inference speed
3. Compare quality with current cv::stylization output

**Goal**: Validate that neural style transfer can replace cv::stylization with 30-120x speedup

## 1. Import Dependencies

In [1]:
import torch
import torch.nn as nn
from torchvision import transforms
from PIL import Image
import numpy as np
import cv2
import matplotlib.pyplot as plt
import time
from pathlib import Path

print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Matplotlib is building the font cache; this may take a moment.


PyTorch version: 2.0.1
CUDA available: False
Using device: cpu


## 2. Download Pre-trained Fast Style Transfer Model

We'll use a pre-trained model from PyTorch Hub or download one directly.

In [4]:
# Define the Fast Style Transfer model architecture
# Based on Johnson et al. "Perceptual Losses for Real-Time Style Transfer"

class ConvLayer(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride):
        super(ConvLayer, self).__init__()
        padding = kernel_size // 2
        self.reflection_pad = nn.ReflectionPad2d(padding)
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride)

    def forward(self, x):
        out = self.reflection_pad(x)
        out = self.conv(out)
        return out

class ResidualBlock(nn.Module):
    def __init__(self, channels):
        super(ResidualBlock, self).__init__()
        self.conv1 = ConvLayer(channels, channels, kernel_size=3, stride=1)
        self.in1 = nn.InstanceNorm2d(channels, affine=True)
        self.conv2 = ConvLayer(channels, channels, kernel_size=3, stride=1)
        self.in2 = nn.InstanceNorm2d(channels, affine=True)
        self.relu = nn.ReLU()

    def forward(self, x):
        residual = x
        out = self.relu(self.in1(self.conv1(x)))
        out = self.in2(self.conv2(out))
        out = out + residual
        return out

class UpsampleConvLayer(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride, upsample=None):
        super(UpsampleConvLayer, self).__init__()
        self.upsample = upsample
        if upsample:
            self.upsample_layer = nn.Upsample(scale_factor=upsample)
        self.reflection_pad = nn.ReflectionPad2d(kernel_size // 2)
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride)

    def forward(self, x):
        if self.upsample:
            x = self.upsample_layer(x)
        out = self.reflection_pad(x)
        out = self.conv(out)
        return out

class TransformerNet(nn.Module):
    def __init__(self):
        super(TransformerNet, self).__init__()
        # Encoder
        self.conv1 = ConvLayer(3, 32, kernel_size=9, stride=1)
        self.in1 = nn.InstanceNorm2d(32, affine=True)
        self.conv2 = ConvLayer(32, 64, kernel_size=3, stride=2)
        self.in2 = nn.InstanceNorm2d(64, affine=True)
        self.conv3 = ConvLayer(64, 128, kernel_size=3, stride=2)
        self.in3 = nn.InstanceNorm2d(128, affine=True)
        # Residual blocks
        self.res1 = ResidualBlock(128)
        self.res2 = ResidualBlock(128)
        self.res3 = ResidualBlock(128)
        self.res4 = ResidualBlock(128)
        self.res5 = ResidualBlock(128)
        # Decoder
        self.deconv1 = UpsampleConvLayer(128, 64, kernel_size=3, stride=1, upsample=2)
        self.in4 = nn.InstanceNorm2d(64, affine=True)
        self.deconv2 = UpsampleConvLayer(64, 32, kernel_size=3, stride=1, upsample=2)
        self.in5 = nn.InstanceNorm2d(32, affine=True)
        self.deconv3 = ConvLayer(32, 3, kernel_size=9, stride=1)
        # Non-linearity
        self.relu = nn.ReLU()

    def forward(self, x):
        y = self.relu(self.in1(self.conv1(x)))
        y = self.relu(self.in2(self.conv2(y)))
        y = self.relu(self.in3(self.conv3(y)))
        y = self.res1(y)
        y = self.res2(y)
        y = self.res3(y)
        y = self.res4(y)
        y = self.res5(y)
        y = self.relu(self.in4(self.deconv1(y)))
        y = self.relu(self.in5(self.deconv2(y)))
        y = self.deconv3(y)
        return y

print("Model architecture defined!")

Model architecture defined!


## 3. Download Pre-trained Model Weights

We'll download a pre-trained model (mosaic style as a test).

In [5]:
import urllib.request
import os

# Create models directory if it doesn't exist
models_dir = Path("../models")
models_dir.mkdir(exist_ok=True)

# Download a pre-trained model (mosaic style as example)
model_url = "https://www.dropbox.com/s/lrvwfehqdcxoza8/mosaic.pth?dl=1"
model_path = models_dir / "mosaic.pth"

if not model_path.exists():
    print(f"Downloading pre-trained model to {model_path}...")
    urllib.request.urlretrieve(model_url, model_path)
    print("Download complete!")
else:
    print(f"Model already exists at {model_path}")

# Load the model
model = TransformerNet()
model.load_state_dict(torch.load(model_path, map_location=device))
model = model.to(device)
model.eval()

print("Model loaded successfully!")

Model already exists at ../models/mosaic.pth


RuntimeError: Expected hasRecord("version") to be true, but got false.  (Could this error message be improved?  If so, please report an enhancement request to PyTorch.)

## 4. Test on Abbey Road Frame

Let's grab a test frame from your processed data.

In [None]:
# Function to load and preprocess image
def load_image(image_path, size=None):
    img = Image.open(image_path).convert('RGB')
    if size:
        img = img.resize((size, int(size * img.size[1] / img.size[0])), Image.LANCZOS)
    return img

def transform_image(img):
    transform = transforms.Compose([
        transforms.ToTensor(),
    ])
    return transform(img).unsqueeze(0)

def denormalize_image(tensor):
    # Clamp to [0, 1] range
    tensor = tensor.squeeze(0).clamp(0, 1)
    return transforms.ToPILImage()(tensor.cpu())

# Try to find a test frame from your data
test_image_dir = Path("../../data/frames")
test_frames = list(test_image_dir.glob("**/*.jpg"))

if test_frames:
    test_image_path = test_frames[0]
    print(f"Using test frame: {test_image_path}")
else:
    print("No test frames found. Please add a frame to ../test_images/")
    test_image_path = None

# If we have a test image, process it
if test_image_path:
    # Load original image
    original_img = load_image(test_image_path)
    print(f"Original image size: {original_img.size}")
    
    # Prepare input tensor
    input_tensor = transform_image(original_img).to(device)
    
    # Benchmark inference time
    warmup_runs = 5
    test_runs = 20
    
    print("\nWarming up GPU...")
    with torch.no_grad():
        for _ in range(warmup_runs):
            _ = model(input_tensor)
    
    print(f"Running {test_runs} inference tests...")
    times = []
    with torch.no_grad():
        for _ in range(test_runs):
            start = time.time()
            output_tensor = model(input_tensor)
            if device.type == 'cuda':
                torch.cuda.synchronize()
            end = time.time()
            times.append((end - start) * 1000)  # Convert to ms
    
    avg_time = np.mean(times)
    std_time = np.std(times)
    
    print(f"\n{'='*60}")
    print(f"INFERENCE BENCHMARK RESULTS")
    print(f"{'='*60}")
    print(f"Average inference time: {avg_time:.2f}ms Â± {std_time:.2f}ms")
    print(f"Estimated time per 30-frame segment: {(avg_time * 30) / 1000:.2f}s")
    print(f"\nComparison:")
    print(f"  Current cv::stylization: ~180s per segment")
    print(f"  Neural style transfer:   ~{(avg_time * 30) / 1000:.2f}s per segment")
    print(f"  Speedup:                 ~{180 / ((avg_time * 30) / 1000):.1f}x faster!")
    print(f"{'='*60}\n")
    
    # Convert output to image
    output_img = denormalize_image(output_tensor)
    
    # Display results
    fig, axes = plt.subplots(1, 2, figsize=(15, 7))
    axes[0].imshow(original_img)
    axes[0].set_title('Original Frame', fontsize=14)
    axes[0].axis('off')
    
    axes[1].imshow(output_img)
    axes[1].set_title(f'Neural Style Transfer ({avg_time:.0f}ms)', fontsize=14)
    axes[1].axis('off')
    
    plt.tight_layout()
    plt.show()
    
    # Save output
    output_dir = Path("../outputs")
    output_dir.mkdir(exist_ok=True)
    output_path = output_dir / "quicktest_output.jpg"
    output_img.save(output_path)
    print(f"Output saved to: {output_path}")

## 5. Next Steps

If the speed looks good (should be 50-200ms per frame), we'll:

1. **Train on Dali style** - Create a model trained on actual Dali paintings
2. **Export to ONNX** - For C++ integration
3. **Optimize with TensorRT** - For maximum GPU performance
4. **Integrate into fast_processor.cpp** - Replace cv::stylization

See notebook `02_train_dali_style.ipynb` for training instructions.

## 6. Export to ONNX (Optional - for C++ integration)

Once we're happy with the model, we can export it to ONNX format.

In [None]:
import torch.onnx

# Export model to ONNX
if test_image_path:
    onnx_path = models_dir / "style_transfer.onnx"
    
    # Create dummy input (1920x1080 like Abbey Road stream)
    dummy_input = torch.randn(1, 3, 1080, 1920).to(device)
    
    print(f"Exporting model to {onnx_path}...")
    torch.onnx.export(
        model,
        dummy_input,
        onnx_path,
        export_params=True,
        opset_version=11,
        do_constant_folding=True,
        input_names=['input'],
        output_names=['output'],
        dynamic_axes={'input': {0: 'batch_size', 2: 'height', 3: 'width'},
                     'output': {0: 'batch_size', 2: 'height', 3: 'width'}}
    )
    
    print(f"ONNX model saved to {onnx_path}")
    print(f"\nNext: Optimize with TensorRT for maximum speed!")
    print(f"Command: trtexec --onnx={onnx_path} --saveEngine=style_transfer.trt --fp16")