<a href="https://colab.research.google.com/github/supunabeywickrama/my-colab-work/blob/main/qulity_upscaler.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# (Cell type: code)
# Mount Google Drive at /content/drive (optional; only needed for Drive-backed paths)
from google.colab import drive
drive.mount('/content/drive')  # interactive: accept the authorization prompt

# System packages: ffmpeg is used later for frame extraction and video composition
!apt-get update -qq
!apt-get install -y -qq ffmpeg

# Python dependencies
# We'll use PyTorch (GPU), torchvision, imageio, pillow, tqdm
# NOTE(review): `sys` is not referenced by any code visible in this notebook.
import sys
# NOTE(review): only torchvision carries a version pin here; torch and the
# packages on the next line are unpinned, so installs may differ between runs.
!pip install --quiet torch torchvision==0.24.1 --extra-index-url https://download.pytorch.org/whl/cu118
!pip install --quiet einops imageio tqdm opencv-python pillow


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.0/8.0 MB[0m [31m35.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m899.7/899.7 MB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m594.3/594.3 MB[0m [31m773.7 kB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.2/10.2 MB[0m [31m109.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m88.0/88.0 MB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m954.8/954.8 kB[0m [31m

In [3]:
# (Cell type: code)
import os
# --- Locations -------------------------------------------------------------
# Session-local scratch root; every derived directory below lives under it.
WORKDIR = "/content/vsr_project"
# Folder on the mounted Drive (change if needed).
DRIVE_BASE = "/content/drive/MyDrive/vsr_project"

DATA_DIR = os.path.join(WORKDIR, "data")
HR_FRAMES = os.path.join(DATA_DIR, "hr_frames")
LR_FRAMES = os.path.join(DATA_DIR, "lr_frames")
MODEL_DIR = os.path.join(WORKDIR, "models")
OUTPUT_DIR = os.path.join(WORKDIR, "outputs")

# Create every directory up front so later cells can write without checking.
for d in (WORKDIR, DRIVE_BASE, DATA_DIR, HR_FRAMES, LR_FRAMES, MODEL_DIR, OUTPUT_DIR):
    os.makedirs(d, exist_ok=True)

# --- Pipeline settings -----------------------------------------------------
# Default input location; the upload cell moves the uploaded file here.
INPUT_VIDEO = "/content/input.mp4"
# Upscale factor for the pipeline (x2 or x4).
SCALE = 4
# Number of sequences per training batch.
BATCH_SIZE = 4
# Number of frames in one training sample (odd recommended).
SEQ_LEN = 5
# Select the device from what PyTorch can actually use. The mere presence of
# the COLAB_GPU environment variable does not prove a CUDA device is usable,
# and picking "cuda" without one makes every later `.to(DEVICE)` call fail.
import torch
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", DEVICE)


Device: cuda


In [4]:
# (Cell type: code)
# Use Colab upload widget to upload a short HR video (e.g., 3-10 seconds)
from google.colab import files
print("Upload a short HR video (mp4). It will be saved at /content/input.mp4")
uploaded = files.upload()
# Only the first uploaded entry is used; it is moved to INPUT_VIDEO so the
# rest of the notebook can rely on a single fixed path.
if uploaded:
    import shutil
    name = next(iter(uploaded))
    print("Uploaded:", name)
    shutil.move(name, INPUT_VIDEO)
print("INPUT_VIDEO path set to:", INPUT_VIDEO)


Upload a short HR video (mp4). It will be saved at /content/input.mp4


Saving input.mp4 to input.mp4
Uploaded: input.mp4
INPUT_VIDEO path set to: /content/input.mp4


In [5]:
# (Cell type: code)
import os, subprocess, shlex, glob
from pathlib import Path

# Clean existing frames.
# This is done in Python on purpose: a glob written inside double quotes
# ("dir/*") is handed to `rm` literally by the shell, so nothing is deleted and
# frames from a previous (possibly longer) video would survive into this run.
import shutil
for _frames_dir in (HR_FRAMES, LR_FRAMES):
    shutil.rmtree(_frames_dir, ignore_errors=True)
    os.makedirs(_frames_dir, exist_ok=True)

# Extract frames (PNG) and audio
def extract_frames_and_audio(video_path, frames_dir, audio_out):
    """Dump every frame of a video as PNG files and copy out its audio track.

    Args:
        video_path: Source video handed to ffmpeg.
        frames_dir: Directory (created if missing) that receives the
            ``frame_%06d.png`` files.
        audio_out: Destination file for the stream-copied audio track.

    Raises:
        subprocess.CalledProcessError: If the frame-extraction ffmpeg call
            exits with a non-zero status. A failing audio extraction is only
            reported, never raised.
    """
    os.makedirs(frames_dir, exist_ok=True)
    # Pass argument lists straight to ffmpeg: formatting a command string and
    # re-splitting it with shlex corrupts paths that contain quote characters.
    frames_cmd = [
        "ffmpeg", "-y", "-i", str(video_path),
        os.path.join(frames_dir, "frame_%06d.png"),
    ]
    print("Running:", shlex.join(frames_cmd))
    subprocess.check_call(frames_cmd)
    # extract audio (best effort: the video may have no audio stream, or a
    # codec that cannot be stream-copied into the requested container)
    audio_cmd = [
        "ffmpeg", "-y", "-i", str(video_path),
        "-vn", "-acodec", "copy", str(audio_out),
    ]
    try:
        subprocess.check_call(audio_cmd)
    except Exception as e:
        print("Audio extraction failed (maybe no audio). Error:", e)

# The copied audio track is stored directly under WORKDIR.
AUDIO_PATH = os.path.join(WORKDIR, "audio.aac")
extract_frames_and_audio(video_path=INPUT_VIDEO, frames_dir=HR_FRAMES, audio_out=AUDIO_PATH)

# Count what ffmpeg actually wrote so an empty extraction is visible here.
hr_frame_files = glob.glob(os.path.join(HR_FRAMES, "frame_*.png"))
num_hr = len(hr_frame_files)
print(f"Extracted {num_hr} HR frames to {HR_FRAMES}")


Running: ffmpeg -y -i "/content/input.mp4" "/content/vsr_project/data/hr_frames/frame_%06d.png"
Extracted 288 HR frames to /content/vsr_project/data/hr_frames


In [6]:
# (Cell type: code)
import random, numpy as np, cv2, os
from pathlib import Path
from PIL import Image, ImageFilter, ImageOps
import imageio

def degrade_image(hr_img, scale=SCALE, jpeg_q=None, add_noise=True, blur_sigma=None):
    """Synthesize a low-resolution, degraded version of one frame.

    Pipeline: optional Gaussian blur -> area downsample by ``scale`` ->
    optional additive Gaussian noise -> JPEG encode/decode round trip.

    Args:
        hr_img: numpy uint8 array, HxWxC. It is copied, never modified.
        scale: Integer downsampling factor (output is ``H // scale`` by
            ``W // scale``).
        jpeg_q: JPEG quality; drawn uniformly from [30, 95] when None.
        add_noise: When true, add zero-mean Gaussian noise whose standard
            deviation is drawn uniformly from [0, 8].
        blur_sigma: Gaussian blur sigma; drawn uniformly from [0, 1.6] when
            None. A value of 0 or less skips the blur.

    Returns:
        The degraded image as decoded by ``cv2.imdecode`` with flag 1.
    """
    sigma = random.uniform(0.0, 1.6) if blur_sigma is None else blur_sigma

    work = hr_img.copy()
    if sigma > 0:
        # Kernel size (0, 0) lets OpenCV derive it from sigma.
        work = cv2.GaussianBlur(work, (0, 0), sigma)

    # downsample
    src_h, src_w = work.shape[:2]
    small = cv2.resize(work, (src_w // scale, src_h // scale), interpolation=cv2.INTER_AREA)

    # sensor-like noise, applied in float and clipped back to the uint8 range
    if add_noise:
        noise_std = random.uniform(0, 8)
        jitter = np.random.normal(0, noise_std, small.shape).astype(np.float32)
        small = np.clip(small.astype(np.float32) + jitter, 0, 255).astype(np.uint8)

    # JPEG round trip to simulate compression artifacts
    quality = random.randint(30, 95) if jpeg_q is None else jpeg_q
    _, encoded = cv2.imencode('.jpg', small, [int(cv2.IMWRITE_JPEG_QUALITY), quality])
    return cv2.imdecode(encoded, 1)

# Run degradation on all HR frames and save LR frames
hr_paths = sorted(Path(HR_FRAMES).glob("frame_*.png"))
for i, p in enumerate(hr_paths):
    bgr = cv2.imread(str(p))
    # cv2.imread signals an unreadable/corrupt file by returning None; fail
    # with the offending path instead of an opaque TypeError on the slice below.
    if bgr is None:
        raise RuntimeError(f"Could not read HR frame: {p}")
    hr = bgr[:, :, ::-1]  # BGR->RGB
    lr = degrade_image(hr, scale=SCALE)
    outp = os.path.join(LR_FRAMES, f"frame_{i:06d}.png")
    imageio.imsave(outp, lr)
print("Saved LR frames to", LR_FRAMES, "count:", len(list(Path(LR_FRAMES).glob("*.png"))))


Saved LR frames to /content/vsr_project/data/lr_frames count: 288


In [7]:
# (Cell type: code)
import torch
import torch.nn as nn
import torch.nn.functional as F

class ResidualBlock(nn.Module):
    """Two 3x3 convolutions with a ReLU in between and a scaled skip connection.

    The block keeps the channel count and spatial size of its input and
    returns ``x + 0.1 * conv2(relu(conv1(x)))``.
    """

    def __init__(self, channels):
        super().__init__()
        same_size_3x3 = dict(kernel_size=3, stride=1, padding=1)
        self.conv1 = nn.Conv2d(channels, channels, **same_size_3x3)
        self.conv2 = nn.Conv2d(channels, channels, **same_size_3x3)
        self.act = nn.ReLU(inplace=True)

    def forward(self, x):
        residual = self.conv2(self.act(self.conv1(x)))
        # The residual branch is damped by 0.1 before being added back.
        return x + 0.1 * residual

class SmallVSR(nn.Module):
    """Small recurrent video super-resolution network.

    Each LR frame is encoded, fused with a hidden state carried over from the
    previous frames, refined by residual blocks and upsampled with two
    PixelShuffle stages (``scale // 2`` followed by ``2``).

    Args:
        in_ch: Number of channels of the input frames.
        feat: Number of feature channels used throughout the network.
        scale: Upscaling factor. It must be an even integer >= 2, because the
            upsampler is built as ``(scale // 2) * 2``; an odd value would
            silently produce a different factor than requested.

    Raises:
        ValueError: If ``scale`` is not an even value >= 2.
    """

    def __init__(self, in_ch=3, feat=64, scale=4):
        super().__init__()
        if scale < 2 or scale % 2 != 0:
            raise ValueError(
                f"SmallVSR supports even scale factors >= 2, got scale={scale!r}"
            )
        self.scale = scale
        self.encoder = nn.Sequential(
            nn.Conv2d(in_ch, feat, 3, 1, 1),
            nn.ReLU(True),
            ResidualBlock(feat),
            ResidualBlock(feat)
        )
        # hidden state conv: takes [current features, previous hidden state]
        self.state_conv = nn.Conv2d(feat * 2, feat, 3, 1, 1)
        # few residual blocks after fusion
        self.res_blocks = nn.Sequential(*[ResidualBlock(feat) for _ in range(4)])
        # upsampler: x(scale // 2) then x2; the layer layout is kept unchanged
        # so existing checkpoints keep loading.
        self.upsample = nn.Sequential(
            nn.Conv2d(feat, feat * (scale // 2) ** 2, 3, 1, 1),
            nn.PixelShuffle(scale // 2),
            nn.ReLU(True),
            nn.Conv2d(feat, feat * 4, 3, 1, 1),
            nn.PixelShuffle(2),
            nn.Conv2d(feat, 3, 3, 1, 1)
        )

    def forward(self, frames):
        """Upscale a batch of LR sequences.

        Args:
            frames: Tensor of shape (B, T, C, H, W).

        Returns:
            Tensor of shape (B, T, 3, H*scale, W*scale). The output for frame
            ``t`` depends only on frames ``0..t`` (the recurrence runs forward
            in time).
        """
        B, T, C, H, W = frames.shape
        # Initialize the hidden state with the dtype AND device of the input so
        # the network also works when run in a non-float32 precision.
        h = frames.new_zeros((B, self.encoder[0].out_channels, H, W))
        outputs = []
        for t in range(T):
            x = frames[:, t]  # B,C,H,W
            feat = self.encoder(x)
            # concat current features & hidden state along channels
            fused = torch.cat([feat, h], dim=1)
            h = F.relu(self.state_conv(fused))
            h = self.res_blocks(h)
            outputs.append(self.upsample(h))
        # B, T, C, H*scale, W*scale
        return torch.stack(outputs, dim=1)


In [8]:
# (Cell type: code)
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from pathlib import Path
import random
import numpy as np

class VideoFrameDataset(Dataset):
    """Sliding-window dataset of paired LR/HR frame sequences.

    Frames are paired by sorted filename order, so both directories must hold
    the same number of ``frame_*.png`` files. Item ``i`` covers frames
    ``i .. i + seq_len - 1``.

    Args:
        hr_dir: Directory with the high-resolution frames.
        lr_dir: Directory with the low-resolution frames.
        seq_len: Number of consecutive frames per sample (must be >= 1).
        transform: Callable applied to every PIL image; identity when None.
        scale: Optional integer HR/LR size ratio. When None it is estimated
            per frame pair from the image sizes.

    Raises:
        AssertionError: If the two directories hold different frame counts.
        ValueError: If ``seq_len`` is smaller than 1.
    """

    def __init__(self, hr_dir, lr_dir, seq_len=SEQ_LEN, transform=None, scale=None):
        self.hr_paths = sorted(Path(hr_dir).glob("frame_*.png"))
        self.lr_paths = sorted(Path(lr_dir).glob("frame_*.png"))
        assert len(self.hr_paths) == len(self.lr_paths), "HR/LR counts differ"
        if seq_len < 1:
            raise ValueError(f"seq_len must be >= 1, got {seq_len}")
        self.N = len(self.hr_paths)
        self.seq_len = seq_len
        self.half = seq_len // 2
        self.scale = scale
        self.transform = transform or (lambda x: x)

    def __len__(self):
        return max(0, self.N - self.seq_len + 1)

    def _match_hr_to_lr(self, hr, lr):
        """Crop ``hr`` (top-left anchored) to an exact multiple of ``lr``'s size.

        When the HR size is not divisible by the scale factor, the LR frame
        times the factor is slightly smaller than the HR frame, and a loss
        against a network output of size ``lr * factor`` cannot be computed.
        The frame is returned unchanged when no sensible crop exists.
        """
        factor = self.scale or min(hr.width // lr.width, hr.height // lr.height)
        target_w, target_h = lr.width * factor, lr.height * factor
        if factor < 1 or target_w > hr.width or target_h > hr.height:
            return hr
        if (target_w, target_h) == hr.size:
            return hr
        return hr.crop((0, 0, target_w, target_h))

    def __getitem__(self, idx):
        """Return ``(lr, hr)`` tensors stacked along a leading time axis.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``.
        """
        count = len(self)
        # Support negative indices explicitly; without this a negative start
        # would wrap around the path lists and mix unrelated frames.
        if idx < 0:
            idx += count
        if not 0 <= idx < count:
            raise IndexError(f"sequence index out of range: {idx}")
        hr_seq = []
        lr_seq = []
        for t in range(idx, idx + self.seq_len):
            hr = Image.open(self.hr_paths[t]).convert("RGB")
            lr = Image.open(self.lr_paths[t]).convert("RGB")
            hr = self._match_hr_to_lr(hr, lr)
            hr_seq.append(self.transform(hr))
            lr_seq.append(self.transform(lr))
        # stack the per-frame tensors along a new leading (time) dimension
        hr = torch.stack(hr_seq, dim=0)
        lr = torch.stack(lr_seq, dim=0)
        return lr, hr

# transforms: to tensor and normalize [0,1]
import torchvision.transforms.functional as Fv
def pil_to_tensor(img):
    """Convert an HxWxC image into a float32 CxHxW tensor.

    The input is turned into a numpy array, cast to float32 and divided by
    255.0 (so 8-bit data ends up in [0, 1]); the channel axis is then moved to
    the front.
    """
    scaled = np.array(img).astype(np.float32) / 255.0
    channels_first = scaled.transpose(2, 0, 1)  # HWC -> CHW
    return torch.from_numpy(channels_first)

def transform_fn(pil_img):
    """Per-frame transform handed to the dataset; delegates to ``pil_to_tensor``."""
    return pil_to_tensor(pil_img)

# Build the paired-sequence dataset and the loader that feeds the training loop
dataset = VideoFrameDataset(
    hr_dir=HR_FRAMES,
    lr_dir=LR_FRAMES,
    seq_len=SEQ_LEN,
    transform=transform_fn,
)
loader_options = dict(batch_size=BATCH_SIZE, shuffle=True, num_workers=2, pin_memory=True)
dataloader = DataLoader(dataset, **loader_options)
print("Dataset length (sequences):", len(dataset))


Dataset length (sequences): 284


In [9]:
# (Cell type: code)
import torch.optim as optim
from tqdm import tqdm
import math

# Model, optimizer and reconstruction loss
model = SmallVSR(in_ch=3, feat=64, scale=SCALE).to(DEVICE)
optimizer = optim.Adam(model.parameters(), lr=2e-4)
criterion = nn.L1Loss()

# Short demo schedule: a handful of epochs, checkpointing every other one
EPOCHS = 6
save_every = 2

for epoch in range(1, EPOCHS + 1):
    model.train()
    running_loss = 0.0
    pbar = tqdm(dataloader, desc=f"Epoch {epoch}/{EPOCHS}")
    for step, (lr_seq, hr_seq) in enumerate(pbar, start=1):
        # Batches arrive as (B, T, C, H, W) for LR and the matching HR stack.
        lr_seq, hr_seq = lr_seq.to(DEVICE), hr_seq.to(DEVICE)

        optimizer.zero_grad()
        out_seq = model(lr_seq)

        # Supervise only the central frame of each sequence to keep the
        # training step cheap (can be extended to all frames).
        mid = out_seq.shape[1] // 2
        loss = criterion(out_seq[:, mid], hr_seq[:, mid])
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        pbar.set_postfix(loss=running_loss / step)

    # Checkpoint on the configured cadence and always after the final epoch.
    if epoch == EPOCHS or epoch % save_every == 0:
        ckpt_path = os.path.join(MODEL_DIR, f"smallvsr_epoch{epoch}.pth")
        torch.save(model.state_dict(), ckpt_path)
        print("Saved checkpoint:", ckpt_path)
print("Training finished.")


Epoch 1/6: 100%|██████████| 71/71 [00:38<00:00,  1.86it/s, loss=0.103]
Epoch 2/6: 100%|██████████| 71/71 [00:38<00:00,  1.84it/s, loss=0.0551]


Saved checkpoint: /content/vsr_project/models/smallvsr_epoch2.pth


Epoch 3/6: 100%|██████████| 71/71 [00:40<00:00,  1.76it/s, loss=0.0476]
Epoch 4/6: 100%|██████████| 71/71 [00:40<00:00,  1.77it/s, loss=0.0458]


Saved checkpoint: /content/vsr_project/models/smallvsr_epoch4.pth


Epoch 5/6: 100%|██████████| 71/71 [00:40<00:00,  1.76it/s, loss=0.0449]
Epoch 6/6: 100%|██████████| 71/71 [00:40<00:00,  1.76it/s, loss=0.0443]

Saved checkpoint: /content/vsr_project/models/smallvsr_epoch6.pth
Training finished.





In [10]:
# (Cell type: code)
import glob, imageio
from tqdm import tqdm

# Load latest checkpoint
import re

def _checkpoint_epoch(path):
    """Return the epoch number in a ``smallvsr_epoch<N>.pth`` name (-1 if absent)."""
    match = re.search(r"smallvsr_epoch(\d+)\.pth$", os.path.basename(path))
    return int(match.group(1)) if match else -1

# Order by the numeric epoch: a plain string sort places "epoch10" before
# "epoch2", so the last element would not be the newest checkpoint once
# training runs for 10 or more epochs.
ckpts = sorted(glob.glob(os.path.join(MODEL_DIR, "smallvsr_epoch*.pth")), key=_checkpoint_epoch)
assert ckpts, "No checkpoints found. Train first."
ckpt = ckpts[-1]
model.load_state_dict(torch.load(ckpt, map_location=DEVICE))
model.to(DEVICE).eval()
print("Loaded checkpoint:", ckpt)

# Collect the LR frames in filename order
lr_paths = sorted(Path(LR_FRAMES).glob("frame_*.png"))
N = len(lr_paths)

# Each output frame is produced from a window of SEQ_LEN frames; `pad` is the
# number of frames missing on either side at the sequence boundaries.
pad = SEQ_LEN // 2

def load_img_tensor(path):
    """Open an image file as RGB and convert it with ``pil_to_tensor``."""
    return pil_to_tensor(Image.open(path).convert("RGB"))

# Load every frame once, then repeat the first/last frame `pad` times so the
# sliding window is defined for the boundary frames as well.
tensors = [load_img_tensor(p) for p in lr_paths]
if pad > 0:
    tensors = [tensors[0]] * pad + tensors + [tensors[-1]] * pad

# run sliding windows
up_frames_dir = os.path.join(OUTPUT_DIR, "up_frames")
os.makedirs(up_frames_dir, exist_ok=True)
out_paths = []
with torch.no_grad():
    for i in tqdm(range(len(lr_paths))):
        seq = tensors[i:i+SEQ_LEN]  # list of T tensors (C,H,W)
        seq_t = torch.stack(seq, dim=0).unsqueeze(0).to(DEVICE)  # 1,T,C,H,W
        out = model(seq_t)  # 1,T,C,H*s,W*s
        mid = out.shape[1] // 2
        # .float() keeps the numpy conversion valid if the model ran in a
        # reduced precision.
        frame = out[0, mid].float().cpu().clamp(0, 1).numpy()  # C,H,W
        # Round to the nearest 8-bit level. A bare astype('uint8') truncates,
        # which darkens every pixel by up to one level (e.g. 254.7 -> 254).
        frame = (frame * 255.0).round().transpose(1, 2, 0).astype('uint8')
        out_path = os.path.join(up_frames_dir, f"frame_{i:06d}.png")
        imageio.imsave(out_path, frame)
        out_paths.append(out_path)
print("Upscaled frames saved to", os.path.join(OUTPUT_DIR, "up_frames"))


Loaded checkpoint: /content/vsr_project/models/smallvsr_epoch6.pth


100%|██████████| 288/288 [00:51<00:00,  5.60it/s]

Upscaled frames saved to /content/vsr_project/outputs/up_frames





In [12]:
# Replace previous final compose step with this robust version
import subprocess, shlex, json, os

# Original video; first input (left half) of the side-by-side composition.
INPUT_VIDEO = "/content/input.mp4"
# Upscaled-only video; second input (right half) of the composition.
UPVIDEO = "/content/vsr_project/outputs/upscaled_only.mp4"
# Destination of the composed side-by-side video.
OUT_SIDE_BY_SIDE = "/content/vsr_project/outputs/side_by_side_output.mp4"

def ffprobe_stream_info(path):
    """Probe a media file for its video size and the presence of audio.

    Args:
        path: File to inspect with ffprobe.

    Returns:
        dict with keys ``"width"`` and ``"height"`` (ints, or None when they
        could not be determined) and ``"has_audio"`` (bool).

    A non-zero ffprobe exit status is reported on stdout and results in the
    values gathered so far being returned; it is not raised.
    """
    probe_w = None
    probe_h = None
    has_audio = False
    # Arguments are passed as a list so the path reaches ffprobe verbatim;
    # formatting a command string and re-splitting it with shlex corrupts
    # paths that contain quote characters.
    base_cmd = ["ffprobe", "-v", "error"]
    try:
        # video width x height of the first video stream
        out = subprocess.check_output(
            base_cmd + ["-select_streams", "v:0",
                        "-show_entries", "stream=width,height",
                        "-of", "csv=p=0:s=x", str(path)]
        ).decode().strip()
        if out:
            # Use the first line only and ignore empty fields (a trailing
            # separator would otherwise defeat the parse).
            fields = [f for f in out.splitlines()[0].strip().split('x') if f]
            if len(fields) >= 2 and fields[0].isdigit() and fields[1].isdigit():
                probe_w, probe_h = int(fields[0]), int(fields[1])
        # any output here means at least one audio stream was listed
        out2 = subprocess.check_output(
            base_cmd + ["-select_streams", "a",
                        "-show_entries", "stream=index",
                        "-of", "csv=p=0", str(path)]
        ).decode().strip()
        has_audio = bool(out2)
    except subprocess.CalledProcessError as e:
        print("ffprobe error for", path, " — ", e)
    return {"width": probe_w, "height": probe_h, "has_audio": has_audio}

def _probe_frame_rate(path, default="30"):
    """Return the first video stream's ``r_frame_rate`` (e.g. "30000/1001"), or ``default``."""
    try:
        out = subprocess.check_output([
            "ffprobe", "-v", "error", "-select_streams", "v:0",
            "-show_entries", "stream=r_frame_rate", "-of", "csv=p=0", path,
        ]).decode().strip()
    except subprocess.CalledProcessError:
        return default
    rate = out.splitlines()[0].strip().rstrip(",") if out else ""
    return rate if rate and rate != "0/0" else default


def _ensure_upscaled_video(up_video, source_video):
    """Encode the upscaled PNG frames into ``up_video`` when that file is missing.

    No other visible cell writes ``up_video``, so on a fresh kernel this cell
    would otherwise stop at the height check below. The frames are read from
    the ``up_frames`` directory next to ``up_video`` and encoded at the frame
    rate probed from ``source_video``.
    NOTE(review): assumes the frames were extracted at the source's
    r_frame_rate — confirm for variable-frame-rate inputs.
    """
    if os.path.exists(up_video):
        return
    frames_dir = os.path.join(os.path.dirname(up_video), "up_frames")
    has_frames = os.path.isdir(frames_dir) and any(
        name.endswith(".png") for name in os.listdir(frames_dir)
    )
    if not has_frames:
        raise RuntimeError(
            f"{up_video} does not exist and no upscaled frames were found in {frames_dir}"
        )
    encode_cmd = [
        "ffmpeg", "-y",
        "-framerate", _probe_frame_rate(source_video),
        "-start_number", "0",  # the inference cell numbers frames from 0
        "-i", os.path.join(frames_dir, "frame_%06d.png"),
        "-c:v", "libx264", "-crf", "18", "-preset", "medium",
        "-pix_fmt", "yuv420p",
        up_video,
    ]
    print("Upscaled video missing; encoding it from frames:")
    print(" ".join(shlex.quote(x) for x in encode_cmd))
    enc = subprocess.run(encode_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if enc.returncode != 0:
        print(enc.stderr)
        raise RuntimeError("ffmpeg failed while encoding the upscaled frames — see stderr above.")


_ensure_upscaled_video(UPVIDEO, INPUT_VIDEO)

info_input = ffprobe_stream_info(INPUT_VIDEO)
info_up = ffprobe_stream_info(UPVIDEO)

print("Input video info:", info_input)
print("Upscaled video info:", info_up)

if info_up["height"] is None:
    raise RuntimeError(f"Could not determine height of upscaled video: {UPVIDEO}")

# Build filter: scale input to match upscaled height exactly.
# use -2 for width to keep aspect ratio while forcing even width (ffmpeg requirement for many codecs)
scale_to = info_up["height"]
filter_complex = f"[0:v]scale=-2:{scale_to}[left];[left][1:v]hstack=inputs=2[v]"

# Build ffmpeg command
cmd = [
    "ffmpeg", "-y",
    "-i", INPUT_VIDEO,
    "-i", UPVIDEO,
    "-filter_complex", filter_complex,
    "-map", "[v]"
]

# Only map audio if input has audio
if info_input["has_audio"]:
    cmd += ["-map", "0:a?", "-c:a", "copy"]
# Video codec options
cmd += ["-c:v", "libx264", "-crf", "18", "-preset", "medium", OUT_SIDE_BY_SIDE]

print("Running ffmpeg command:")
print(" ".join(shlex.quote(x) for x in cmd))

# Run and capture output
proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if proc.returncode != 0:
    print("ffmpeg failed with return code", proc.returncode)
    print("----- ffmpeg stderr -----")
    print(proc.stderr)
    print("----- ffmpeg stdout -----")
    print(proc.stdout)
    raise RuntimeError("ffmpeg failed — see stderr above.")
else:
    print("Success — side-by-side saved to:", OUT_SIDE_BY_SIDE)
    # show file size
    if os.path.exists(OUT_SIDE_BY_SIDE):
        sz_mb = os.path.getsize(OUT_SIDE_BY_SIDE) / (1024*1024)
        print(f"Output size: {sz_mb:.2f} MB")


Input video info: {'width': 640, 'height': 320, 'has_audio': True}
Upscaled video info: {'width': 640, 'height': 320, 'has_audio': False}
Running ffmpeg command:
ffmpeg -y -i /content/input.mp4 -i /content/vsr_project/outputs/upscaled_only.mp4 -filter_complex '[0:v]scale=-2:320[left];[left][1:v]hstack=inputs=2[v]' -map '[v]' -map '0:a?' -c:a copy -c:v libx264 -crf 18 -preset medium /content/vsr_project/outputs/side_by_side_output.mp4
Success — side-by-side saved to: /content/vsr_project/outputs/side_by_side_output.mp4
Output size: 2.57 MB


In [18]:
# Add labeled side-by-side: "Original" (left) and "Upscaled" (right)
import subprocess, shlex, json, os

# Inputs and destination of the labeled side-by-side composition
INPUT_VIDEO = "/content/input.mp4"
UPVIDEO = "/content/vsr_project/outputs/upscaled_only.mp4"
OUT_SIDE_BY_SIDE = "/content/vsr_project/outputs/side_by_side_labeled.mp4"

# Font for the drawtext labels: use the bold DejaVu face when present,
# otherwise the regular face (whose existence is not checked).
_BOLD_FONT = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"
_REGULAR_FONT = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
FONT_PATH = _BOLD_FONT if os.path.exists(_BOLD_FONT) else _REGULAR_FONT

def ffprobe_stream_info(path):
    """Probe a media file for its video size and the presence of audio.

    Args:
        path: File to inspect with ffprobe.

    Returns:
        dict with keys ``"width"`` and ``"height"`` (ints, or None when they
        could not be determined) and ``"has_audio"`` (bool).

    A non-zero ffprobe exit status is reported on stdout and results in the
    values gathered so far being returned; it is not raised.
    """
    probe_w = None
    probe_h = None
    has_audio = False
    # Arguments are passed as a list so the path reaches ffprobe verbatim;
    # formatting a command string and re-splitting it with shlex corrupts
    # paths that contain quote characters.
    base_cmd = ["ffprobe", "-v", "error"]
    try:
        out = subprocess.check_output(
            base_cmd + ["-select_streams", "v:0",
                        "-show_entries", "stream=width,height",
                        "-of", "csv=p=0:s=x", str(path)]
        ).decode().strip()
        if out:
            # Use the first line only and ignore empty fields (a trailing
            # separator would otherwise defeat the parse).
            fields = [f for f in out.splitlines()[0].strip().split('x') if f]
            if len(fields) >= 2 and fields[0].isdigit() and fields[1].isdigit():
                probe_w, probe_h = int(fields[0]), int(fields[1])
        # any output here means at least one audio stream was listed
        out2 = subprocess.check_output(
            base_cmd + ["-select_streams", "a",
                        "-show_entries", "stream=index",
                        "-of", "csv=p=0", str(path)]
        ).decode().strip()
        has_audio = bool(out2)
    except subprocess.CalledProcessError as e:
        print("ffprobe error for", path, " — ", e)
    return {"width": probe_w, "height": probe_h, "has_audio": has_audio}

info_input = ffprobe_stream_info(INPUT_VIDEO)
info_up = ffprobe_stream_info(UPVIDEO)

print("Input video info:", info_input)
print("Upscaled video info:", info_up)

# The original is rescaled to the upscaled video's height, so that height
# must be known before the filter graph can be written.
if info_up["height"] is None:
    raise RuntimeError(f"Could not determine height of upscaled video: {UPVIDEO}")
scale_to = info_up["height"]

# Label appearance (adjust fontsize as needed)
label_fontsize = max(24, scale_to // 32)  # heuristic, never below 24
box_opacity = 0.6
box_color = f"black@{box_opacity}"
fontfile_esc = FONT_PATH.replace(":", "\\:")  # ':' separates drawtext options


def _drawtext(label):
    """Build the drawtext filter placing ``label`` top-left on a semi-opaque box."""
    return (
        f"drawtext=fontfile='{fontfile_esc}':text='{label}':x=10:y=10:fontsize={label_fontsize}:"
        f"fontcolor=white:box=1:boxcolor={box_color}:boxborderw=12"
    )


# Left: scale the original to the common height, then label it.
# Right: label the upscaled video. Finally stack both horizontally.
left_draw = f"scale=-2:{scale_to}," + _drawtext("Original")
right_draw = _drawtext("Upscaled")
filter_complex = f"[0:v]{left_draw}[left];[1:v]{right_draw}[right];[left][right]hstack=inputs=2[v]"

# Audio is stream-copied from the original only when it has an audio stream.
audio_args = ["-map", "0:a?", "-c:a", "copy"] if info_input["has_audio"] else []
cmd = [
    "ffmpeg", "-y",
    "-i", INPUT_VIDEO,
    "-i", UPVIDEO,
    "-filter_complex", filter_complex,
    "-map", "[v]",
    *audio_args,
    "-c:v", "libx264", "-crf", "18", "-preset", "medium",
    OUT_SIDE_BY_SIDE,
]

print("Running ffmpeg command:")
print(" ".join(shlex.quote(x) for x in cmd))

proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if proc.returncode != 0:
    print("ffmpeg failed with return code", proc.returncode)
    print("----- ffmpeg stderr -----")
    print(proc.stderr)
    print("----- ffmpeg stdout -----")
    print(proc.stdout)
    raise RuntimeError("ffmpeg failed — see stderr above.")

print("Success — labeled side-by-side saved to:", OUT_SIDE_BY_SIDE)
if os.path.exists(OUT_SIDE_BY_SIDE):
    sz_mb = os.path.getsize(OUT_SIDE_BY_SIDE) / (1024*1024)
    print(f"Output size: {sz_mb:.2f} MB")


Input video info: {'width': 640, 'height': 320, 'has_audio': True}
Upscaled video info: {'width': 640, 'height': 320, 'has_audio': False}
Running ffmpeg command:
ffmpeg -y -i /content/input.mp4 -i /content/vsr_project/outputs/upscaled_only.mp4 -filter_complex '[0:v]scale=-2:320,drawtext=fontfile='"'"'/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf'"'"':text='"'"'Original'"'"':x=10:y=10:fontsize=24:fontcolor=white:box=1:boxcolor=black@0.6:boxborderw=12[left];[1:v]drawtext=fontfile='"'"'/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf'"'"':text='"'"'Upscaled'"'"':x=10:y=10:fontsize=24:fontcolor=white:box=1:boxcolor=black@0.6:boxborderw=12[right];[left][right]hstack=inputs=2[v]' -map '[v]' -map '0:a?' -c:a copy -c:v libx264 -crf 18 -preset medium /content/vsr_project/outputs/side_by_side_labeled.mp4
Success — labeled side-by-side saved to: /content/vsr_project/outputs/side_by_side_labeled.mp4
Output size: 2.58 MB


In [15]:
# (Cell type: code)
# Clone Real-ESRGAN
# NOTE: on a re-run the clone step only reports that the destination path
# already exists (see the captured output of this cell); the existing checkout
# is then used as-is.
%cd /content
!git clone https://github.com/xinntao/Real-ESRGAN.git
%cd Real-ESRGAN
# NOTE(review): installs whatever requirements.txt resolves to at run time;
# the captured traceback of this cell shows the subsequent `basicsr` import
# failing, so the installed versions should be checked against this runtime.
!pip install -r requirements.txt

# Create weights dir
%cd /content/Real-ESRGAN
!mkdir -p weights

# Upload Real-ESRGAN weights using Colab file upload (if you have them locally),
# or copy from Drive: e.g., !cp /content/drive/MyDrive/weights/RealESRGAN_x4plus.pth ./weights/
from google.colab import files
print("If you have a weights file locally, upload it now (RealESRGAN_x4plus.pth recommended).")
# files.upload()  # uncomment to use interactive upload

# Run inference (example for x4plus)
%cd /content/Real-ESRGAN
# Replace MODEL_NAME with the model you have. This command takes frames_dir and outputs to specified dir.
MODEL_NAME = "RealESRGAN_x4plus"
# The degraded LR frames are the input; results go to a separate folder under
# OUTPUT_DIR so they do not mix with the SmallVSR outputs.
INPUT_FRAMES_DIR = LR_FRAMES
OUTPUT_FRAMES_DIR = os.path.join(OUTPUT_DIR, "realesrgan_up")
!python inference_realesrgan.py -n {MODEL_NAME} -i "{INPUT_FRAMES_DIR}" -o "{OUTPUT_FRAMES_DIR}" --suffix ""


/content
fatal: destination path 'Real-ESRGAN' already exists and is not an empty directory.
/content/Real-ESRGAN
/content/Real-ESRGAN
If you have a weights file locally, upload it now (RealESRGAN_x4plus.pth recommended).
/content/Real-ESRGAN
Traceback (most recent call last):
  File "/content/Real-ESRGAN/inference_realesrgan.py", line 5, in <module>
    from basicsr.archs.rrdbnet_arch import RRDBNet
  File "/usr/local/lib/python3.12/dist-packages/basicsr/__init__.py", line 4, in <module>
    from .data import *
  File "/usr/local/lib/python3.12/dist-packages/basicsr/data/__init__.py", line 22, in <module>
    _dataset_modules = [importlib.import_module(f'basicsr.data.{file_name}') for file_name in dataset_filenames]
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/importlib/__init__.py", line 90, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^