# NVENC Encode Worker - FFmpeg GPU on Colab T4

This notebook runs on Google Colab with a T4 GPU to offload video encoding jobs.

**How it works:**
1. Mounts Google Drive
2. Watches a job directory for incoming encode requests (`request.json`)
3. Processes transcode/mux/concat operations with FFmpeg `h264_nvenc`
4. Writes outputs and `done.marker` (or `error.marker` on failure) for local dispatcher pickup

**Setup:** Runtime -> Change runtime type -> T4 GPU

## 1. Install Dependencies

In [None]:
import shutil
import subprocess


def _run(cmd: list[str], check: bool = False) -> subprocess.CompletedProcess:
    """Execute *cmd* with stdout and stderr captured as text.

    Args:
        cmd: Argument vector handed to ``subprocess.run``.
        check: When true, a non-zero exit status is turned into an error.

    Returns:
        The ``CompletedProcess`` describing the finished command.

    Raises:
        RuntimeError: If *check* is true and the command exits non-zero; the
            message contains the joined command and the captured stderr.
    """
    completed = subprocess.run(cmd, capture_output=True, text=True)
    exited_cleanly = completed.returncode == 0
    if exited_cleanly or not check:
        return completed
    raise RuntimeError(f"Command failed: {' '.join(cmd)}\n{completed.stderr}")


def has_h264_nvenc() -> bool:
    """Return True when the ffmpeg found on PATH lists the ``h264_nvenc`` encoder.

    The binary located by ``shutil.which`` is executed directly instead of
    going through a login shell and a ``grep`` pipeline, so the build that is
    probed is the one that was actually found.

    Returns:
        True if ``h264_nvenc`` appears in the ``ffmpeg -encoders`` listing;
        False if ffmpeg is missing, cannot be executed, or lacks the encoder.
    """
    ffmpeg = shutil.which('ffmpeg')
    if not ffmpeg:
        return False
    try:
        result = subprocess.run(
            [ffmpeg, '-hide_banner', '-encoders'],
            capture_output=True,
            text=True,
        )
    except OSError:
        # The binary vanished or is not executable: treat it as "no NVENC".
        return False
    # The encoder table is printed on stdout, the same stream the previous
    # grep pipeline inspected.
    return 'h264_nvenc' in result.stdout


print('Checking ffmpeg + NVENC availability...')
# Refresh the apt package index first; a failure here is tolerated.
_run(['bash', '-lc', 'apt-get -qq update'], check=False)

# Install ffmpeg only when no binary is on PATH; this step must succeed.
if shutil.which('ffmpeg') is None:
    print('ffmpeg not found, installing...')
    _run(['bash', '-lc', 'apt-get -qq install -y ffmpeg'], check=True)

# Best-effort attempt to end up with an NVENC-capable ffmpeg build.
if not has_h264_nvenc():
    print('h264_nvenc not found in current ffmpeg. Trying CUDA-related packages...')
    # ffmpeg comes last so it is re-installed in case apt resolves a different build.
    for package in ('nvidia-cuda-toolkit', 'ffmpeg'):
        _run(['bash', '-lc', f'apt-get -qq install -y {package}'], check=False)

# Report the final state; a missing encoder only produces a warning.
if not has_h264_nvenc():
    print('WARNING: NVENC encoder still unavailable in this runtime ffmpeg build.')
    print('  Try a known CUDA-enabled ffmpeg binary or source build if needed.')
else:
    print('NVENC-enabled ffmpeg is available.')

print('ffmpeg version:')
version_line = _run(['bash', '-lc', 'ffmpeg -version | head -n 1']).stdout.strip()
print(version_line)

## 2. Mount Google Drive

In [None]:
# Mount Google Drive at /content/drive; the job directory configured in the
# next section lives under this mount point.
from google.colab import drive
drive.mount('/content/drive')

## 3. Configure Paths

In [None]:
import os

# Folder on the mounted Drive that is scanned for encode-job directories.
DRIVE_BASE = "/content/drive/MyDrive/autonomous-recording/encode-jobs"
os.makedirs(DRIVE_BASE, exist_ok=True)
print(f"Job directory: {DRIVE_BASE}")
# makedirs() above guarantees the directory exists, so 'none' is reported for
# an empty listing rather than for a missing path (which can no longer occur).
existing_jobs = os.listdir(DRIVE_BASE)
print(f"Existing jobs: {existing_jobs if existing_jobs else 'none'}")

## 4. Verify NVENC

In [None]:
import subprocess

print('GPU info:')
try:
    gpu_info = subprocess.run(['nvidia-smi'], capture_output=True, text=True).stdout
except FileNotFoundError:
    # Without a GPU runtime the tool is absent; warn instead of aborting the cell.
    gpu_info = 'WARNING: nvidia-smi not found. Is the runtime type set to T4 GPU?'
# Cap the report so a long nvidia-smi table does not flood the cell output.
print(gpu_info[:1500])

try:
    encoders_out = subprocess.run(['ffmpeg', '-hide_banner', '-encoders'], capture_output=True, text=True).stdout
except FileNotFoundError:
    print('WARNING: ffmpeg not found on PATH.')
    encoders_out = ''

# Keep every encoder line that mentions NVENC, case-insensitively.
nvenc_lines = [line.strip() for line in encoders_out.splitlines() if 'nvenc' in line.lower()]
print(f"NVENC encoders found: {len(nvenc_lines)}")
for line in nvenc_lines:
    print(f"  {line}")

if not any('h264_nvenc' in line for line in nvenc_lines):
    print('WARNING: h264_nvenc not available. Encoding jobs may fail until ffmpeg supports NVENC.')

## 5. Encode Job Processor

In [None]:
import json
import tempfile
import time
from pathlib import Path


def _run_ffmpeg(cmd: list[str]) -> None:
    """Run an FFmpeg command, echoing its combined output line by line.

    stderr is merged into stdout so progress and errors arrive on one stream.
    If streaming is interrupted (for example by ``KeyboardInterrupt``), the
    child process is killed and reaped instead of being left running.

    Args:
        cmd: Full argument vector, including the executable name.

    Raises:
        RuntimeError: If the process exits with a non-zero status. The message
            starts with the exit code and is followed by the last lines of
            output so callers that store ``str(exc)`` keep the cause.
    """
    print('FFmpeg command:')
    print('  ' + ' '.join(cmd))
    tail: list[str] = []
    tail_limit = 10
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    try:
        if proc.stdout is None:
            # Explicit check instead of assert, which is stripped under -O.
            raise RuntimeError('ffmpeg output pipe was not created')
        for line in proc.stdout:
            line = line.rstrip()
            if line:
                print(line)
                tail.append(line)
                # Retain only the most recent lines for the error report.
                del tail[:-tail_limit]
        code = proc.wait()
    except BaseException:
        # Do not leave an orphaned encoder behind when the caller is interrupted.
        proc.kill()
        proc.wait()
        raise
    finally:
        if proc.stdout is not None:
            proc.stdout.close()
    if code != 0:
        message = f'ffmpeg failed with exit code {code}'
        if tail:
            message += '\n' + '\n'.join(tail)
        raise RuntimeError(message)


def _nvenc_video_args(fmt: dict) -> list[str]:
    """Build the FFmpeg video-encoding arguments for an output format.

    Reads ``codec``, ``preset``, ``cq``, ``width``, ``height`` and
    ``video_filter`` from *fmt*. A key that is missing or explicitly ``None``
    (JSON ``null``) falls back to its default instead of being stringified to
    the literal ``'None'``.

    Args:
        fmt: Output-format settings.

    Returns:
        Arguments selecting the codec, preset and VBR/CQ rate control,
        followed by ``-vf <chain>`` when a scale and/or user filter applies.

    Raises:
        ValueError, TypeError: If ``cq``, ``width`` or ``height`` cannot be
            converted to ``int``.
    """
    def setting(key: str, default):
        # None means "not specified"; 0 and '' stay valid explicit values.
        value = fmt.get(key)
        return default if value is None else value

    codec = str(setting('codec', 'h264_nvenc'))
    preset = str(setting('preset', 'p7'))
    cq = int(setting('cq', 20))
    vf_parts = []

    # Scaling is applied only when both dimensions are given and non-zero.
    width = fmt.get('width')
    height = fmt.get('height')
    if width and height:
        vf_parts.append(f"scale={int(width)}:{int(height)}")

    user_filter = str(setting('video_filter', '')).strip()
    if user_filter:
        vf_parts.append(user_filter)

    args = [
        '-c:v', codec,
        '-preset', preset,
        '-rc', 'vbr',
        '-cq', str(cq),
        '-b:v', '0',
    ]
    if vf_parts:
        args += ['-vf', ','.join(vf_parts)]
    return args


def _audio_args(fmt: dict) -> list[str]:
    """Build the FFmpeg audio-encoding arguments for an output format.

    Reads ``audio_codec`` (default ``aac``) and ``audio_bitrate`` (default
    ``192k``) from *fmt*. A key that is missing or explicitly ``None`` (JSON
    ``null``) uses the default rather than the literal string ``'None'``.

    Args:
        fmt: Output-format settings.

    Returns:
        ``['-c:a', <codec>, '-b:a', <bitrate>]``.
    """
    codec = fmt.get('audio_codec')
    bitrate = fmt.get('audio_bitrate')
    return [
        '-c:a', str('aac' if codec is None else codec),
        '-b:a', str('192k' if bitrate is None else bitrate),
    ]


def _write_concat_list(inputs: list[Path]) -> Path:
    """Write an FFmpeg concat-demuxer list file for *inputs* and return its path."""
    with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False, encoding='utf-8') as fp:
        for p in inputs:
            # Inside a single-quoted entry a literal quote is written as '\''.
            quoted = str(p).replace("'", "'\\''")
            fp.write(f"file '{quoted}'\n")
    return Path(fp.name)


def process_encode_job(job_dir: str) -> dict:
    """Process a single encode job directory on Google Drive.

    The job is described by ``request.json`` inside *job_dir*, with the keys
    ``input_files`` (names that must exist in the job directory),
    ``operations`` (a list of ``transcode``, ``mux_audio`` or ``concat``
    steps executed in order) and ``output_format`` (overrides merged over the
    built-in encoder defaults). All paths in the request are resolved
    relative to *job_dir*.

    Args:
        job_dir: Path of the job directory.

    Returns:
        ``{'status': 'already_done', ...}`` if ``done.marker`` already exists,
        ``{'status': 'no_request', ...}`` if ``request.json`` is missing,
        the completion record (also written to ``done.marker``) on success,
        or the error record (also written to ``error.marker``) on failure.
        Failures include an unreadable or malformed ``request.json``.
    """
    job_path = Path(job_dir)
    request_path = job_path / 'request.json'
    done_marker = job_path / 'done.marker'
    error_marker = job_path / 'error.marker'

    if done_marker.exists():
        return {'status': 'already_done', 'job_dir': str(job_path)}

    if not request_path.exists():
        return {'status': 'no_request', 'job_dir': str(job_path)}

    started = time.time()

    try:
        # Parsing happens inside the try so a malformed or partially synced
        # request is reported through error.marker instead of propagating.
        request = json.loads(request_path.read_text(encoding='utf-8'))
        input_files = [str(x) for x in (request.get('input_files') or [])]
        operations = request.get('operations') or []
        output_format = request.get('output_format') or {}

        # Built-in defaults; any key supplied in output_format overrides them.
        fmt = {
            'codec': 'h264_nvenc',
            'preset': 'p7',
            'cq': 20,
            'audio_codec': 'aac',
            'audio_bitrate': '192k',
            'video_filter': 'fps=30,format=yuv420p',
            'width': 1920,
            'height': 1080,
        }
        fmt.update(output_format)

        print(f"\n{'=' * 60}")
        print(f"Processing encode job: {job_path.name}")
        print(f"Declared inputs: {len(input_files)} | Operations: {len(operations)}")
        print(f"{'=' * 60}")

        # Fail fast if a declared input is not present in the job directory.
        for name in input_files:
            p = job_path / name
            if not p.exists():
                raise FileNotFoundError(f"Missing input file from request.input_files: {name}")

        for idx, op in enumerate(operations, 1):
            op_type = str(op.get('type', '')).strip()
            print(f"\n[{idx}/{len(operations)}] Operation: {op_type}")

            if op_type == 'transcode':
                inp = job_path / str(op['input'])
                out = job_path / str(op['output'])
                if not inp.exists():
                    raise FileNotFoundError(f"Transcode input missing: {inp.name}")
                out.parent.mkdir(parents=True, exist_ok=True)

                cmd = [
                    'ffmpeg', '-y', '-hide_banner', '-hwaccel', 'cuda', '-i', str(inp),
                    *_nvenc_video_args(fmt),
                    *_audio_args(fmt),
                    '-movflags', '+faststart',
                    str(out),
                ]
                _run_ffmpeg(cmd)

            elif op_type == 'mux_audio':
                video = job_path / str(op['video'])
                audio = job_path / str(op['audio'])
                out = job_path / str(op['output'])
                if not video.exists():
                    raise FileNotFoundError(f"Mux video missing: {video.name}")
                if not audio.exists():
                    raise FileNotFoundError(f"Mux audio missing: {audio.name}")
                out.parent.mkdir(parents=True, exist_ok=True)

                # Only a positive delay inserts the adelay filter; otherwise
                # the first audio stream of the second input is mapped as-is.
                audio_delay_ms = int(op.get('audio_delay_ms', 0))
                if audio_delay_ms > 0:
                    delay = f"adelay={audio_delay_ms}|{audio_delay_ms}"
                    map_audio = '[aout]'
                    filter_args = ['-filter_complex', f"[1:a]{delay}[aout]", '-map', '0:v:0', '-map', map_audio]
                else:
                    filter_args = ['-map', '0:v:0', '-map', '1:a:0']

                cmd = [
                    'ffmpeg', '-y', '-hide_banner',
                    '-hwaccel', 'cuda', '-i', str(video),
                    '-i', str(audio),
                    *filter_args,
                    *_nvenc_video_args(fmt),
                    *_audio_args(fmt),
                    '-shortest',
                    '-movflags', '+faststart',
                    str(out),
                ]
                _run_ffmpeg(cmd)

            elif op_type == 'concat':
                inputs = [job_path / str(x) for x in op.get('inputs', [])]
                out = job_path / str(op['output'])
                for p in inputs:
                    if not p.exists():
                        raise FileNotFoundError(f"Concat input missing: {p.name}")
                if not inputs:
                    raise ValueError('Concat operation requires at least one input')
                out.parent.mkdir(parents=True, exist_ok=True)

                concat_list = _write_concat_list(inputs)

                cmd = [
                    'ffmpeg', '-y', '-hide_banner',
                    '-hwaccel', 'cuda',
                    '-f', 'concat', '-safe', '0', '-i', str(concat_list),
                    *_nvenc_video_args(fmt),
                ]

                if bool(op.get('loudnorm', False)):
                    cmd += ['-af', 'loudnorm=I=-16:LRA=11:TP=-1.5']

                cmd += [*_audio_args(fmt), '-movflags', '+faststart', str(out)]
                try:
                    _run_ffmpeg(cmd)
                finally:
                    # The list file is temporary; remove it whether or not ffmpeg succeeded.
                    if concat_list.exists():
                        concat_list.unlink()

            else:
                raise ValueError(f"Unsupported operation type: {op_type}")

        elapsed = time.time() - started
        # Report each distinct output name once, skipping any that were not produced.
        output_names = sorted({str(op.get('output')) for op in operations if op.get('output')})
        outputs = {}
        total_bytes = 0
        for name in output_names:
            path = job_path / name
            if path.exists():
                size = path.stat().st_size
                outputs[name] = {'bytes': size}
                total_bytes += size

        completion = {
            'status': 'completed',
            'elapsed_sec': elapsed,
            'operation_count': len(operations),
            'outputs': outputs,
            'total_output_bytes': total_bytes,
            'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
        }
        # Written only after every operation has finished.
        done_marker.write_text(json.dumps(completion, indent=2), encoding='utf-8')

        print(f"\nJob complete in {elapsed:.2f}s | outputs={len(outputs)} | total={total_bytes / (1024 * 1024):.2f} MB")
        return completion

    except Exception as exc:
        # Job boundary: record the failure for the dispatcher and return it.
        err = {
            'status': 'error',
            'error': str(exc),
            'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
        }
        error_marker.write_text(json.dumps(err, indent=2), encoding='utf-8')
        print(f"\nJob failed: {exc}")
        return err

## 6. Job Watcher Loop

In [None]:
import datetime

# Seconds the watcher sleeps between scans of the job directory.
POLL_INTERVAL = 5


def watch_for_jobs():
    """Watch the job directory for new encode requests.

    Polls ``DRIVE_BASE`` every ``POLL_INTERVAL`` seconds and hands each
    sub-directory that contains ``request.json`` to ``process_encode_job``.
    Directories that already hold ``done.marker`` or ``error.marker`` when the
    watcher starts are skipped, and every directory is attempted at most once
    per watcher run.

    A failed directory scan or an exception escaping a single job is reported
    and the loop keeps running; only ``KeyboardInterrupt`` stops the watcher.
    """
    print(f"Watching for jobs in: {DRIVE_BASE}")
    print(f"   Poll interval: {POLL_INTERVAL}s")
    print('   Press stop to interrupt\n')

    # Seed the skip set with jobs that already carry a result marker.
    processed = set()
    if os.path.exists(DRIVE_BASE):
        for name in os.listdir(DRIVE_BASE):
            job_dir = os.path.join(DRIVE_BASE, name)
            if os.path.isdir(job_dir):
                done = os.path.join(job_dir, 'done.marker')
                err = os.path.join(job_dir, 'error.marker')
                if os.path.exists(done) or os.path.exists(err):
                    processed.add(name)

    print(f"   Skipping {len(processed)} already-processed job(s)")

    while True:
        try:
            try:
                names = sorted(os.listdir(DRIVE_BASE)) if os.path.exists(DRIVE_BASE) else []
            except OSError as exc:
                # A listing failure should not end the watcher; retry on the next poll.
                print(f"WARNING: could not list {DRIVE_BASE}: {exc}")
                names = []

            for name in names:
                if name in processed:
                    continue

                job_dir = os.path.join(DRIVE_BASE, name)
                if not os.path.isdir(job_dir):
                    continue

                request_path = os.path.join(job_dir, 'request.json')
                if not os.path.exists(request_path):
                    continue

                now = datetime.datetime.now().strftime('%H:%M:%S')
                print(f"\n[{now}] New job detected: {name}")
                try:
                    result = process_encode_job(job_dir)
                except Exception as exc:
                    # Isolate the failure so jobs sorted after this one still run.
                    result = {'status': 'crashed'}
                    print(f"Job {name} raised {type(exc).__name__}: {exc}")
                # Attempt each job once, whatever the outcome.
                processed.add(name)

                now = datetime.datetime.now().strftime('%H:%M:%S')
                print(f"[{now}] Job {name} -> {result.get('status', 'unknown')}")

            time.sleep(POLL_INTERVAL)

        except KeyboardInterrupt:
            print('\n\nWatcher stopped.')
            break


# Start the watcher; this call keeps looping until it receives KeyboardInterrupt.
watch_for_jobs()

## 7. Quick Test

In [None]:
import pathlib

# Length of the synthetic clip in seconds. One constant drives both the
# generation command and the realtime-speed figure so they cannot drift apart.
TEST_DURATION_SEC = 10

test_dir = pathlib.Path('/tmp/nvenc-test')
test_dir.mkdir(parents=True, exist_ok=True)
src = test_dir / 'bars-input.webm'
dst = test_dir / 'bars-output.mp4'

# Generate a 1080p30 test pattern with a 1 kHz tone as the VP9/Opus source clip.
create_cmd = [
    'ffmpeg', '-y', '-hide_banner',
    '-f', 'lavfi', '-i', 'testsrc2=size=1920x1080:rate=30',
    '-f', 'lavfi', '-i', 'sine=frequency=1000:sample_rate=48000',
    '-t', str(TEST_DURATION_SEC), '-c:v', 'libvpx-vp9', '-c:a', 'libopus', str(src),
]
_run_ffmpeg(create_cmd)

before = src.stat().st_size

# Re-encode the clip with the same NVENC settings the job processor defaults to
# (apart from scaling, which is not applied here).
encode_cmd = [
    'ffmpeg', '-y', '-hide_banner', '-hwaccel', 'cuda', '-i', str(src),
    '-c:v', 'h264_nvenc', '-preset', 'p7', '-rc', 'vbr', '-cq', '20', '-b:v', '0',
    '-vf', 'fps=30,format=yuv420p',
    '-c:a', 'aac', '-b:a', '192k',
    '-movflags', '+faststart', str(dst),
]

# Time only the NVENC encode, not the generation of the source clip.
t0 = time.time()
_run_ffmpeg(encode_cmd)
elapsed = time.time() - t0
after = dst.stat().st_size

print(f'Input size:  {before / (1024 * 1024):.2f} MB')
print(f'Output size: {after / (1024 * 1024):.2f} MB')
print(f'Encode time: {elapsed:.2f}s')
print(f'Speed:       {TEST_DURATION_SEC / elapsed:.2f}x realtime')