# ColaStream - WebRTC SFU on Google Colab

Run a WebRTC SFU server on Colab for AI-powered live streaming.

**Features:**
- WHIP/WHEP WebRTC streaming
- Works behind Colab's NAT via TURN relay
- Publish from OBS, browser, or any WHIP client
- Process video frames with AI in real-time

---

## 1. Setup

Install MediaMTX and required dependencies.

In [None]:
#@title Install MediaMTX { display-mode: "form" }
#@markdown Downloads and extracts MediaMTX SFU server

# Downloads the pinned MediaMTX release, unpacks it into the working directory
# and prints the binary's version. Any failing step raises, so the success
# message is only printed after a real install.

import os
import subprocess

MEDIAMTX_VERSION = "v1.9.3"
MEDIAMTX_URL = f"https://github.com/bluenviron/mediamtx/releases/download/{MEDIAMTX_VERSION}/mediamtx_{MEDIAMTX_VERSION}_linux_amd64.tar.gz"
MEDIAMTX_ARCHIVE = "mediamtx.tar.gz"

# Download and extract. check=True turns a failed download or a corrupt
# archive into an exception instead of a silent fall-through.
try:
    subprocess.run(["wget", "-q", MEDIAMTX_URL, "-O", MEDIAMTX_ARCHIVE], check=True)
    subprocess.run(["tar", "-xzf", MEDIAMTX_ARCHIVE], check=True)
finally:
    # wget -O leaves a (possibly empty) file behind even when it fails.
    if os.path.exists(MEDIAMTX_ARCHIVE):
        os.remove(MEDIAMTX_ARCHIVE)

# Equivalent of `chmod +x`: add the execute bits to the existing mode.
os.chmod("mediamtx", os.stat("mediamtx").st_mode | 0o111)

print(f"MediaMTX {MEDIAMTX_VERSION} installed!")
# Captured and printed explicitly so the output shows up in the notebook.
version_check = subprocess.run(
    ["./mediamtx", "--version"],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True,
    check=True,
)
print(version_check.stdout.strip())

In [None]:
#@title Install Cloudflared (No account needed) { display-mode: "form" }
#@markdown Downloads cloudflared for tunneling - no signup required!

# Downloads the latest cloudflared Linux binary into the working directory and
# marks it executable. A failed download raises, so the success message is
# only printed when the binary is really there.

import os
import subprocess

CLOUDFLARED_URL = "https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64"

try:
    subprocess.run(["wget", "-q", CLOUDFLARED_URL, "-O", "cloudflared"], check=True)
except subprocess.CalledProcessError:
    # wget -O creates the output file before the transfer; do not leave an
    # empty "binary" behind for the tunnel cell to trip over.
    if os.path.exists("cloudflared") and os.path.getsize("cloudflared") == 0:
        os.remove("cloudflared")
    raise

# Equivalent of `chmod +x`: add the execute bits to the existing mode.
os.chmod("cloudflared", os.stat("cloudflared").st_mode | 0o111)
print("cloudflared installed!")

## 2. Configure MediaMTX

Create a configuration optimized for Colab's network environment.

In [None]:
#@title Configure MediaMTX for Colab { display-mode: "form" }
#@markdown Automatically fetches TURN servers from VDO.Ninja

# Fetches a TURN server list, renders it into the webrtcICEServers2 section of
# a MediaMTX configuration and writes the result to mediamtx.yml.

import requests
import json

#@markdown ---
#@markdown **Stream Settings**
default_stream_name = "live" #@param {type:"string"}

# Static relays used when the live list cannot be fetched or comes back empty.
FALLBACK_TURN_SERVERS = [
    {"urls": ["turn:turn-use1.vdo.ninja:3478"], "username": "vdoninja", "credential": "EastSideRepresentZ"},
    {"urls": ["turn:turn-use2.vdo.ninja:3478"], "username": "vdoninja", "credential": "pleaseUseYourOwn"},
]


def _url_list(value):
    """Return an ICE server's ``urls`` field as a list (it may be a single string)."""
    if isinstance(value, str):
        return [value]
    return list(value or [])


def _yaml_str(value):
    """Render *value* as a double-quoted YAML scalar.

    JSON string syntax is valid YAML, so characters such as ``#`` or ``: ``
    inside a credential cannot break or truncate the generated file.
    """
    return json.dumps("" if value is None else str(value))


# Fetch TURN servers from VDO.Ninja
print("Fetching TURN servers from VDO.Ninja...")
try:
    r = requests.get("https://turnservers.vdo.ninja/", timeout=10)
    r.raise_for_status()  # an HTTP error page is not a server list
    turn_data = r.json()
    turn_servers = turn_data.get("servers") or []
    if not turn_servers:
        # Without any TURN relay only the STUN entry would be configured.
        raise ValueError("response contained no servers")
    print(f"Got {len(turn_servers)} TURN servers!")
except Exception as e:
    # Deliberately broad: any failure here should degrade to the fallback list.
    print(f"Warning: Could not fetch TURN servers: {e}")
    print("Using fallback servers...")
    turn_servers = FALLBACK_TURN_SERVERS

# Build ICE servers YAML
ice_lines = ["  - url: stun:stun.l.google.com:19302"]
for server in turn_servers:
    username = _yaml_str(server.get("username"))
    credential = _yaml_str(server.get("credential"))
    for url in _url_list(server.get("urls")):
        ice_lines.append(f"  - url: {_yaml_str(url)}")
        ice_lines.append(f"    username: {username}")
        ice_lines.append(f"    password: {credential}")
ice_servers_yaml = "\n".join(ice_lines) + "\n"

config = f"""
###############################################
# MediaMTX Configuration for Google Colab
# TURN servers from: https://turnservers.vdo.ninja/
###############################################

# Logging
logLevel: info
logDestinations: [stdout]

# API (for status checks)
api: yes
apiAddress: 127.0.0.1:9997

# RTSP Server (internal use)
rtsp: yes
rtspAddress: :8554
protocols: [tcp]  # TCP only for Colab compatibility

# HLS Server (fallback playback)
hls: yes
hlsAddress: :8888
hlsAlwaysRemux: yes

# WebRTC Server
webrtc: yes
webrtcAddress: :8889

# ICE Configuration - Critical for Colab!
# VDO.Ninja TURN servers for reliable connectivity
webrtcICEServers2:
{ice_servers_yaml}
# Force TCP candidates only (UDP usually blocked on Colab)
webrtcICEHostNAT1To1IPs: []
webrtcICEUDPMuxAddress:
webrtcICETCPMuxAddress: :8189

# Path configuration
paths:
  all_others:
"""

with open("mediamtx.yml", "w") as f:
    f.write(config)

print("\nConfiguration saved to mediamtx.yml")
print(f"Default stream path: /{default_stream_name}")
print("\nTURN servers configured:")
for server in turn_servers[:3]:  # Show first 3
    server_urls = _url_list(server.get("urls"))
    print(f"  - {server_urls[0] if server_urls else '?'}")

## 3. Start the Server

Launch MediaMTX and create a public tunnel.

In [None]:
#@title Start MediaMTX Server { display-mode: "form" }
#@markdown Starts MediaMTX in the background

import subprocess
import time

# Kill any existing instance
!pkill -f mediamtx 2>/dev/null || true
time.sleep(1)

# Start MediaMTX in background
process = subprocess.Popen(
    ["./mediamtx", "mediamtx.yml"],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT
)

time.sleep(3)

# Check if running
if process.poll() is None:
    print("MediaMTX is running!")
    print("\nLocal endpoints:")
    print("  - WebRTC (WHIP/WHEP): http://localhost:8889")
    print("  - HLS: http://localhost:8888")
    print("  - RTSP: rtsp://localhost:8554")
    print("  - API: http://localhost:9997")
else:
    print("Failed to start MediaMTX!")
    print(process.stdout.read().decode())

In [None]:
#@title Alternative: ngrok Tunnel (requires free account) { display-mode: "form" }
#@markdown Use this if cloudflared isn't working. Requires ngrok signup.

#@markdown Get your free authtoken at: https://dashboard.ngrok.com/get-started/your-authtoken
ngrok_authtoken = "" #@param {type:"string"}
stream_name = "live" #@param {type:"string"}

if not ngrok_authtoken:
    print("ERROR: ngrok now requires authentication.")
    print("1. Sign up free at: https://dashboard.ngrok.com/signup")
    print("2. Get your authtoken: https://dashboard.ngrok.com/get-started/your-authtoken")
    print("3. Paste it above and run this cell again")
    print("\nOr use the Cloudflared tunnel instead (no account needed)!")
else:
    !pip install -q pyngrok
    from pyngrok import ngrok
    from urllib.parse import quote
    
    ngrok.set_auth_token(ngrok_authtoken)
    ngrok.kill()
    
    import time
    time.sleep(1)
    
    tunnel = ngrok.connect(8889, "http")
    PUBLIC_URL = tunnel.public_url.replace("http://", "https://")
    
    GITHUB_PAGES_URL = "https://steveseguin.github.io/colastream"
    publish_url = f"{GITHUB_PAGES_URL}/?server={quote(PUBLIC_URL)}&stream={quote(stream_name)}"
    watch_url = f"{GITHUB_PAGES_URL}/watch.html?server={quote(PUBLIC_URL)}&stream={quote(stream_name)}"
    
    print("=" * 70)
    print("NGROK TUNNEL ACTIVE!")
    print("=" * 70)
    print(f"\nServer URL: {PUBLIC_URL}")
    print(f"\nPublish from browser:\n  {publish_url}")
    print(f"\nShare this link for viewers:\n  {watch_url}")
    print("=" * 70)

In [None]:
#@title Create Public Tunnel (Cloudflared) { display-mode: "form" }
#@markdown Creates a public HTTPS URL for your MediaMTX server (no account needed!)

# Starts a cloudflared quick tunnel to the local WebRTC port (8889), waits for
# the assigned trycloudflare.com URL to appear in cloudflared's output and
# prints the WHIP/WHEP endpoints and web-interface links. PUBLIC_URL is left as
# None when no URL was obtained.

import subprocess
import re
import time
from urllib.parse import quote

stream_name = "live" #@param {type:"string"}

# GitHub Pages base URL (update this to your repo)
GITHUB_PAGES_URL = "https://steveseguin.github.io/colastream"

CLOUDFLARED_LOG = "cloudflared.log"
TUNNEL_TIMEOUT_SECONDS = 30
# The negative lookahead skips the API host, which shows up in cloudflared's
# error messages and is not a tunnel URL.
TUNNEL_URL_PATTERN = re.compile(r'https://(?!api\.)[a-z0-9-]+\.trycloudflare\.com')

# Stop a tunnel left over from an earlier run of this cell (pkill exits
# non-zero when nothing matched, which is fine).
subprocess.run(
    ["pkill", "-f", "cloudflared tunnel"],
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL,
    check=False,
)

# Start cloudflared.
# Output goes to a file rather than subprocess.PIPE: once the URL has been
# found nothing would read the pipe any more, and cloudflared would block on
# its own logging when the pipe buffer filled up.
with open(CLOUDFLARED_LOG, "wb") as log_file:
    process = subprocess.Popen(
        ["./cloudflared", "tunnel", "--url", "http://localhost:8889"],
        stdout=log_file,
        stderr=subprocess.STDOUT,
    )

print("Starting cloudflared tunnel...")
print("(This may take 10-20 seconds)\n")

# Wait for URL: poll the log against a wall-clock deadline, so neither a chatty
# nor a silent cloudflared can shorten or stall the wait.
PUBLIC_URL = None
deadline = time.monotonic() + TUNNEL_TIMEOUT_SECONDS
while time.monotonic() < deadline:
    with open(CLOUDFLARED_LOG, errors="replace") as log_file:
        match = TUNNEL_URL_PATTERN.search(log_file.read())
    if match:
        PUBLIC_URL = match.group(0)
        break
    if process.poll() is not None:
        break  # cloudflared exited; no URL is going to appear
    time.sleep(0.5)

if PUBLIC_URL:
    print("=" * 70)
    print("TUNNEL ACTIVE!")
    print("=" * 70)
    print(f"\nServer URL: {PUBLIC_URL}")
    print(f"\nWHIP endpoint: {PUBLIC_URL}/{stream_name}/whip")
    print(f"WHEP endpoint: {PUBLIC_URL}/{stream_name}/whep")

    # Generate GitHub Pages links
    publish_url = f"{GITHUB_PAGES_URL}/?server={quote(PUBLIC_URL)}&stream={quote(stream_name)}"
    watch_url = f"{GITHUB_PAGES_URL}/watch.html?server={quote(PUBLIC_URL)}&stream={quote(stream_name)}"

    print("\n" + "=" * 70)
    print("WEB INTERFACE (with webcam selector)")
    print("=" * 70)
    print("\nPublish from browser:")
    print(f"  {publish_url}")
    print("\nShare this link for viewers:")
    print(f"  {watch_url}")
    print("\n" + "=" * 70)
else:
    if process.poll() is None:
        print("ERROR: Timeout waiting for tunnel URL")
    else:
        print(f"ERROR: cloudflared exited with code {process.returncode}")
    print("Try running this cell again.")
    # Show the tail of cloudflared's output to make the failure diagnosable.
    with open(CLOUDFLARED_LOG, errors="replace") as log_file:
        recent_lines = log_file.read().splitlines()[-10:]
    if recent_lines:
        print("\nLast cloudflared output:")
        for log_line in recent_lines:
            print(f"  {log_line}")

## 4. Test the Stream

Use the built-in test client or connect from OBS/browser.

In [None]:
#@title Browser Test Client { display-mode: "form" }
#@markdown Displays an embedded WHIP/WHEP test interface

# Renders an HTML/JavaScript widget in the cell output that can publish the
# local camera to the server via WHIP and play a stream back via WHEP.

import html
from IPython.display import HTML, display

# Get the public URL (use a placeholder if not set).
# PUBLIC_URL is undefined when no tunnel cell has run and None when the
# cloudflared cell gave up waiting; both cases fall back to the placeholder.
base_url = globals().get("PUBLIC_URL") or "YOUR_TUNNEL_URL"
# The value lands inside an HTML attribute, so escape quotes and ampersands.
base_url_attr = html.escape(base_url, quote=True)

html_content = f"""
<style>
    .colastream-container {{
        font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
        max-width: 800px;
        padding: 20px;
        background: #1a1a2e;
        border-radius: 12px;
        color: #eee;
    }}
    .colastream-container h3 {{
        color: #00d4ff;
        margin-top: 0;
    }}
    .colastream-container input, .colastream-container button {{
        padding: 10px 15px;
        margin: 5px;
        border-radius: 6px;
        border: none;
        font-size: 14px;
    }}
    .colastream-container input {{
        background: #16213e;
        color: #fff;
        width: 300px;
    }}
    .colastream-container button {{
        background: #00d4ff;
        color: #000;
        cursor: pointer;
        font-weight: bold;
    }}
    .colastream-container button:hover {{
        background: #00a8cc;
    }}
    .colastream-container button.stop {{
        background: #ff4757;
    }}
    .colastream-container video {{
        width: 100%;
        max-width: 640px;
        background: #000;
        border-radius: 8px;
        margin: 10px 0;
    }}
    .colastream-container .status {{
        padding: 8px 12px;
        background: #16213e;
        border-radius: 6px;
        margin: 10px 0;
        font-family: monospace;
    }}
    .colastream-container .section {{
        margin: 20px 0;
        padding: 15px;
        background: #16213e;
        border-radius: 8px;
    }}
</style>

<div class="colastream-container">
    <h3>ColaStream WebRTC Test Client</h3>

    <div class="section">
        <strong>Server URL:</strong><br>
        <input type="text" id="serverUrl" value="{base_url_attr}" style="width: 400px;">
    </div>

    <div class="section">
        <strong>Publish Stream (WHIP)</strong><br>
        <input type="text" id="publishPath" value="live" placeholder="Stream name">
        <button onclick="startPublish()">Start Publishing</button>
        <button onclick="stopPublish()" class="stop">Stop</button>
        <div class="status" id="publishStatus">Not publishing</div>
        <video id="localVideo" autoplay muted playsinline></video>
    </div>

    <div class="section">
        <strong>Watch Stream (WHEP)</strong><br>
        <input type="text" id="watchPath" value="live" placeholder="Stream name">
        <button onclick="startWatch()">Start Watching</button>
        <button onclick="stopWatch()" class="stop">Stop</button>
        <div class="status" id="watchStatus">Not watching</div>
        <video id="remoteVideo" autoplay playsinline></video>
    </div>
</div>

<script>
let publishPc = null;
let watchPc = null;
let localStream = null;

async function startPublish() {{
    const serverUrl = document.getElementById('serverUrl').value;
    const path = document.getElementById('publishPath').value;
    const status = document.getElementById('publishStatus');

    // Release any previous session so repeated clicks do not leak the
    // camera or the old peer connection.
    stopPublish();

    try {{
        status.textContent = 'Requesting camera access...';
        localStream = await navigator.mediaDevices.getUserMedia({{video: true, audio: true}});
        document.getElementById('localVideo').srcObject = localStream;

        status.textContent = 'Creating WebRTC connection...';
        publishPc = new RTCPeerConnection({{
            iceServers: [
                {{ urls: 'stun:stun.l.google.com:19302' }}
            ]
        }});

        localStream.getTracks().forEach(track => publishPc.addTrack(track, localStream));

        const offer = await publishPc.createOffer();
        await publishPc.setLocalDescription(offer);

        status.textContent = 'Sending offer to server...';
        const response = await fetch(`${{serverUrl}}/${{path}}/whip`, {{
            method: 'POST',
            headers: {{ 'Content-Type': 'application/sdp' }},
            body: offer.sdp
        }});

        if (!response.ok) throw new Error(`Server error: ${{response.status}}`);

        const answer = await response.text();
        await publishPc.setRemoteDescription({{ type: 'answer', sdp: answer }});

        status.textContent = 'Publishing to: ' + path;
        status.style.color = '#00ff88';
    }} catch (e) {{
        status.textContent = 'Error: ' + e.message;
        status.style.color = '#ff4757';
        console.error(e);
    }}
}}

function stopPublish() {{
    if (publishPc) {{ publishPc.close(); publishPc = null; }}
    if (localStream) {{ localStream.getTracks().forEach(t => t.stop()); localStream = null; }}
    document.getElementById('localVideo').srcObject = null;
    document.getElementById('publishStatus').textContent = 'Stopped';
    document.getElementById('publishStatus').style.color = '#eee';
}}

async function startWatch() {{
    const serverUrl = document.getElementById('serverUrl').value;
    const path = document.getElementById('watchPath').value;
    const status = document.getElementById('watchStatus');

    // Close any previous viewer connection before opening a new one.
    stopWatch();

    try {{
        status.textContent = 'Creating WebRTC connection...';
        watchPc = new RTCPeerConnection({{
            iceServers: [
                {{ urls: 'stun:stun.l.google.com:19302' }}
            ]
        }});

        watchPc.ontrack = (e) => {{
            document.getElementById('remoteVideo').srcObject = e.streams[0];
        }};

        watchPc.addTransceiver('video', {{ direction: 'recvonly' }});
        watchPc.addTransceiver('audio', {{ direction: 'recvonly' }});

        const offer = await watchPc.createOffer();
        await watchPc.setLocalDescription(offer);

        status.textContent = 'Connecting to stream...';
        const response = await fetch(`${{serverUrl}}/${{path}}/whep`, {{
            method: 'POST',
            headers: {{ 'Content-Type': 'application/sdp' }},
            body: offer.sdp
        }});

        if (!response.ok) throw new Error(`Server error: ${{response.status}}`);

        const answer = await response.text();
        await watchPc.setRemoteDescription({{ type: 'answer', sdp: answer }});

        status.textContent = 'Watching: ' + path;
        status.style.color = '#00ff88';
    }} catch (e) {{
        status.textContent = 'Error: ' + e.message;
        status.style.color = '#ff4757';
        console.error(e);
    }}
}}

function stopWatch() {{
    if (watchPc) {{ watchPc.close(); watchPc = null; }}
    document.getElementById('remoteVideo').srcObject = null;
    document.getElementById('watchStatus').textContent = 'Stopped';
    document.getElementById('watchStatus').style.color = '#eee';
}}
</script>
"""

display(HTML(html_content))

## 5. OBS Studio Setup

To publish from OBS Studio 30+:

1. Go to **Settings > Stream**
2. Set **Service** to "WHIP"
3. Set **Server** to: `YOUR_TUNNEL_URL/live/whip`
4. Leave **Bearer Token** empty
5. Click **Start Streaming**

In [None]:
#@title Show OBS Settings { display-mode: "form" }

# Prints the values to enter in OBS Studio's WHIP stream settings.
#
# PUBLIC_URL is undefined when no tunnel cell has run and None when the
# cloudflared cell gave up waiting for a URL; both cases fall back to a
# placeholder instead of printing "None/...".
url = globals().get("PUBLIC_URL") or "YOUR_TUNNEL_URL"
# Use the stream name chosen in an earlier cell when there is one, so the
# publish path matches the printed watch links; default to "live".
obs_stream_name = globals().get("stream_name") or "live"

print("OBS Studio WHIP Settings")
print("=" * 40)
print("Service: WHIP")
print(f"Server:  {url}/{obs_stream_name}/whip")
print("Bearer Token: (leave empty)")
print("=" * 40)

## 6. AI Video Processing

Connect to the RTSP stream for frame-by-frame AI processing.

In [None]:
#@title Install OpenCV { display-mode: "form" }
# Installs the headless OpenCV wheel (per the package name); the processing
# cells that follow import it as cv2. -q keeps pip's output short.
!pip install -q opencv-python-headless

In [None]:
#@title AI Frame Processor Example { display-mode: "form" }
#@markdown Process video frames from the MediaMTX stream

# Reads up to process_frames frames from the local RTSP stream, runs a Canny
# edge detector on each one and shows every 5th result in the cell output.

import cv2
import numpy as np
from IPython.display import display, clear_output
import PIL.Image
import time

stream_name = "live" #@param {type:"string"}
process_frames = 30 #@param {type:"integer"}

# Connect to local RTSP stream
rtsp_url = f"rtsp://localhost:8554/{stream_name}"
print(f"Connecting to: {rtsp_url}")

cap = cv2.VideoCapture(rtsp_url)

if not cap.isOpened():
    cap.release()
    print("Could not connect to stream. Make sure:")
    print("1. MediaMTX is running")
    print("2. Someone is publishing to the stream")
else:
    print(f"Connected! Processing {process_frames} frames...\n")

    # try/finally: release the capture even when processing raises or the
    # cell is interrupted, so the RTSP session is not left open.
    try:
        for i in range(process_frames):
            ret, frame = cap.read()
            if not ret:
                print("Stream ended or error")
                break

            # === YOUR AI PROCESSING HERE ===
            # Example: Convert to grayscale edge detection
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            edges = cv2.Canny(gray, 100, 200)

            # Convert back to color for display
            processed = cv2.cvtColor(edges, cv2.COLOR_GRAY2RGB)
            # ================================

            # Display every 5th frame
            if i % 5 == 0:
                clear_output(wait=True)
                img = PIL.Image.fromarray(processed)
                display(img)
                print(f"Frame {i+1}/{process_frames}")
    finally:
        cap.release()

    print("\nDone!")

In [None]:
#@title YOLO Object Detection Example { display-mode: "form" }
#@markdown Real-time object detection on the stream

!pip install -q ultralytics

from ultralytics import YOLO
import cv2
from IPython.display import display, clear_output
import PIL.Image

stream_name = "live" #@param {type:"string"}
process_frames = 30 #@param {type:"integer"}

# Load YOLO model
model = YOLO('yolov8n.pt')  # nano model for speed

# Connect to stream
cap = cv2.VideoCapture(f"rtsp://localhost:8554/{stream_name}")

if cap.isOpened():
    for i in range(process_frames):
        ret, frame = cap.read()
        if not ret:
            break
        
        # Run YOLO inference
        results = model(frame, verbose=False)
        
        # Draw detections
        annotated = results[0].plot()
        
        # Display
        if i % 3 == 0:
            clear_output(wait=True)
            img = PIL.Image.fromarray(cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB))
            display(img)
            
            # Show detections
            for r in results:
                for box in r.boxes:
                    cls = int(box.cls[0])
                    conf = float(box.conf[0])
                    name = model.names[cls]
                    print(f"Detected: {name} ({conf:.2f})")
    
    cap.release()
else:
    print("Could not connect to stream")

## 7. Monitoring & Debugging

In [None]:
#@title Check Server Status { display-mode: "form" }

# Queries the local MediaMTX API for the list of paths and prints one line per
# stream with its ready state and reader count.

import requests
import json

try:
    # Get paths (streams).
    # The timeout keeps the cell from hanging when the API port accepts the
    # connection but never answers; raise_for_status surfaces HTTP errors
    # instead of trying to parse an error body.
    r = requests.get("http://localhost:9997/v3/paths/list", timeout=5)
    r.raise_for_status()
    data = r.json()

    print("Active Streams:")
    print("=" * 40)

    items = data.get("items") or []
    if items:
        for item in items:
            name = item.get("name", "unknown")
            ready = item.get("ready", False)
            readers = item.get("readers") or []

            status = "LIVE" if ready else "waiting"
            print(f"  /{name}: {status} ({len(readers)} viewers)")
    else:
        print("  No active streams")

    print("=" * 40)
except Exception as e:
    # Deliberately broad: this is a diagnostics cell and should report any
    # failure rather than raise.
    print(f"Error connecting to API: {e}")
    print("Is MediaMTX running?")

In [None]:
#@title View MediaMTX Logs { display-mode: "form" }
# Lists any running MediaMTX process via ps. `grep -v grep` drops the grep
# command itself from the listing, and the echo fires when nothing matched.
# Note: this prints process status only, not the server's log output.
!ps aux | grep mediamtx | grep -v grep || echo "MediaMTX not running"

In [None]:
#@title Restart MediaMTX { display-mode: "form" }
!pkill -f mediamtx 2>/dev/null || true
import time
time.sleep(2)

import subprocess
process = subprocess.Popen(["./mediamtx", "mediamtx.yml"], 
                          stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
time.sleep(3)

if process.poll() is None:
    print("MediaMTX restarted successfully!")
else:
    print("Failed to restart")

## 8. VDO.Ninja P2P Mode (No Tunnel Required!)

As an alternative to MediaMTX + cloudflared, you can use VDO.Ninja's P2P infrastructure directly. This eliminates the need for any tunneling!

**Workflow:**
1. Open the ColaStream publish page in your browser and start publishing
2. Copy the Stream ID shown on that page
3. Paste the Stream ID into the cell below and run it to receive the stream in Colab via VDO.Ninja

In [None]:
#@title Receive VDO.Ninja Stream { display-mode: "form" }
#@markdown Receive video from a VDO.Ninja stream using Selenium

#@markdown 1. First, publish from: https://steveseguin.github.io/colastream/publish.html
#@markdown 2. Copy your Stream ID below

stream_id = "YOUR_STREAM_ID" #@param {type:"string"}
process_frames = 30 #@param {type:"integer"}

# Install dependencies
!pip install -q selenium webdriver-manager opencv-python-headless pillow

import time
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
import cv2
import numpy as np
from PIL import Image
from IPython.display import display, clear_output
import base64

# Setup headless Chrome
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--window-size=1280,720")

print("Starting headless browser...")
driver = webdriver.Chrome(
    service=Service(ChromeDriverManager().install()),
    options=chrome_options
)

# Navigate to VDO.Ninja view URL
vdo_url = f"https://vdo.ninja/?view={stream_id}&autostart"
print(f"Connecting to: {vdo_url}")
driver.get(vdo_url)

# Wait for video to load
print("Waiting for stream...")
time.sleep(8)

# Find video element
try:
    video = driver.find_element("tag name", "video")
    print(f"Video found! Processing {process_frames} frames...\n")
    
    for i in range(process_frames):
        # Capture frame from video using canvas
        frame_data = driver.execute_script("""
            var video = document.querySelector('video');
            if (!video || video.readyState < 2) return null;
            var canvas = document.createElement('canvas');
            canvas.width = video.videoWidth || 640;
            canvas.height = video.videoHeight || 480;
            canvas.getContext('2d').drawImage(video, 0, 0);
            return canvas.toDataURL('image/jpeg', 0.8);
        """)
        
        if frame_data and frame_data.startswith('data:image'):
            # Decode base64 image
            img_data = base64.b64decode(frame_data.split(',')[1])
            img_array = np.frombuffer(img_data, dtype=np.uint8)
            frame = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
            
            if frame is not None:
                # === YOUR AI PROCESSING HERE ===
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                edges = cv2.Canny(gray, 100, 200)
                processed = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
                # ================================
                
                # Display
                if i % 3 == 0:
                    clear_output(wait=True)
                    img = Image.fromarray(cv2.cvtColor(processed, cv2.COLOR_BGR2RGB))
                    display(img)
                    print(f"Frame {i+1}/{process_frames} - {frame.shape[1]}x{frame.shape[0]}")
        
        time.sleep(0.1)  # ~10 fps capture rate
    
    print("\nDone!")

except Exception as e:
    print(f"Error: {e}")
    print("Make sure someone is publishing to the stream ID")

finally:
    driver.quit()