In [5]:
%pip install -q torch transformers datasets scikit-learn pandas numpy

Note: you may need to restart the kernel to use updated packages.


In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class X3DBackbone(nn.Module):
    """Simplified X3D-S style 3D-convolutional feature extractor for video.

    Four ``Conv3d`` + ReLU stages widen the channels 3 -> 24 -> 48 -> 96 -> 192.
    Every stage halves both spatial axes; only the last two stages also halve
    the temporal axis.
    """

    def __init__(self):
        super().__init__()
        # Stage 1 filters every frame independently (temporal kernel of 1).
        self.conv1 = nn.Conv3d(
            in_channels=3, out_channels=24,
            kernel_size=(1, 3, 3), stride=(1, 2, 2), padding=(0, 1, 1),
        )
        # Stage 2 starts mixing neighbouring frames but keeps every time step.
        self.conv2 = nn.Conv3d(
            in_channels=24, out_channels=48,
            kernel_size=3, stride=(1, 2, 2), padding=1,
        )
        # Stages 3 and 4 downsample time as well as space.
        self.conv3 = nn.Conv3d(
            in_channels=48, out_channels=96,
            kernel_size=3, stride=2, padding=1,
        )
        self.conv4 = nn.Conv3d(
            in_channels=96, out_channels=192,
            kernel_size=3, stride=2, padding=1,
        )

    def forward(self, x):
        """Map a clip ``[B, 3, T, W, H]`` to features ``[B, 192, T/4, W/16, H/16]``."""
        feature_map = x
        for stage in (self.conv1, self.conv2, self.conv3, self.conv4):
            feature_map = F.relu(stage(feature_map))
        return feature_map

class ConditionalGroupNorm(nn.Module):
    """Conditional Group Normalization with PSNR conditioning.

    The input is group-normalized, then scaled and shifted per channel by
    ``gamma``/``beta`` values that a small MLP predicts from ``log10`` of the
    requested PSNR.

    Args:
        num_features: Number of channels ``C`` of the tensors to normalize.
        num_groups: Number of groups handed to ``nn.GroupNorm``.
        psnr_floor: Smallest PSNR value fed to ``log10``. Targets at or below
            zero are raised to this floor so they cannot turn into
            ``-inf``/``NaN`` and poison every downstream activation.
    """

    def __init__(self, num_features, num_groups=4, psnr_floor=1e-6):
        super(ConditionalGroupNorm, self).__init__()
        self.num_groups = num_groups
        self.psnr_floor = psnr_floor
        self.group_norm = nn.GroupNorm(num_groups, num_features)

        # Conditioning network for PSNR target
        self.condition_net = nn.Sequential(
            nn.Linear(1, 64),
            nn.GELU(),
            nn.Linear(64, 128),
            nn.GELU(),
            nn.Linear(128, 2 * num_features)  # For gamma and beta
        )

    def forward(self, x, psnr_target):
        """Normalize ``x`` and modulate it with PSNR-dependent scale and shift.

        Args:
            x: Tensor ``[B, C, ...]`` (``[B, C, T, H, W]`` for video features).
            psnr_target: Target PSNR as ``[B, 1]``. A ``[B]`` vector or a
                scalar is also accepted and reshaped to a column.

        Returns:
            Tensor shaped like ``x``: ``gamma * group_norm(x) + beta``.
        """
        normalized = self.group_norm(x)

        # Align the target with the conditioning MLP's dtype and with the
        # features' device, so integer or float64 targets still work.
        mlp_dtype = self.condition_net[0].weight.dtype
        psnr = torch.as_tensor(psnr_target).to(device=x.device, dtype=mlp_dtype)
        if psnr.dim() < 2:
            psnr = psnr.reshape(-1, 1)

        # log10 is undefined for non-positive values; clamp first. Positive
        # targets pass through unchanged.
        log_psnr = torch.log10(psnr.clamp_min(self.psnr_floor))
        gamma, beta = self.condition_net(log_psnr).chunk(2, dim=1)

        # One trailing singleton axis per non-batch, non-channel axis of x so
        # the per-channel parameters broadcast over it.
        affine_shape = (-1, x.size(1)) + (1,) * (x.dim() - 2)
        return gamma.view(affine_shape) * normalized + beta.view(affine_shape)

class H264QualityController(nn.Module):
    """Complete RTQC system for H.264 quality control.

    An ``X3DBackbone`` extracts clip features; two Conv3d blocks, each followed
    by a PSNR-conditioned group norm and a ReLU, refine them; the result is
    average-pooled to one vector per clip and classified into ``num_classes``
    QP bins (52 by default, i.e. QP 0-51).
    """

    def __init__(self, num_classes=52):
        super().__init__()

        self.backbone = X3DBackbone()

        # Prediction head: conv -> conditional group norm, twice.
        self.pred_conv1 = nn.Conv3d(
            in_channels=192, out_channels=256, kernel_size=(3, 3, 3), padding=(1, 1, 1)
        )
        self.cgn1 = ConditionalGroupNorm(256)

        self.pred_conv2 = nn.Conv3d(
            in_channels=256, out_channels=512, kernel_size=(3, 3, 3), padding=(1, 1, 1)
        )
        self.cgn2 = ConditionalGroupNorm(512)

        # Collapse (T, H, W) to a single value per channel, then classify.
        self.global_pool = nn.AdaptiveAvgPool3d(1)
        self.classifier = nn.Linear(512, num_classes)

    def forward(self, video_chunk, min_psnr):
        """Return QP logits ``[B, num_classes]``.

        Args:
            video_chunk: Clip tensor ``[B, 3, T, W, H]``.
            min_psnr: PSNR target ``[B, 1]`` passed to both conditional norms.
        """
        x = self.backbone(video_chunk)

        head_blocks = (
            (self.pred_conv1, self.cgn1),
            (self.pred_conv2, self.cgn2),
        )
        for conv, cond_norm in head_blocks:
            x = F.relu(cond_norm(conv(x), min_psnr))

        pooled = torch.flatten(self.global_pool(x), start_dim=1)
        return self.classifier(pooled)


In [1]:
    def encode_h264_psnr(self, video_chunk, qp):
        """Real H.264 encoding and PSNR calculation using OpenCV"""
        print(f"encode_h264_psnr called with QP {qp.item()}")
        try:

In [15]:
    # Sweep the candidate QPs: for each one, call pipeline.encode_h264_psnr on
    # test_video and print the PSNR it returns.
    # NOTE(review): qp_values, pipeline and test_video are not defined in this
    # cell; the recorded run failed with NameError on qp_values, so the cell
    # that defines them has to be executed first.
    for qp in qp_values:
        print(f"Calling encode_h264_psnr with QP {qp}")
        # The QP is wrapped in a 0-dim tensor because encode_h264_psnr calls qp.item().
        psnr_value = pipeline.encode_h264_psnr(test_video, torch.tensor(qp))
        # The "JPEG quality" figure is recomputed here as max(10, 100 - int(qp * 1.8));
        # presumably it mirrors a mapping used inside encode_h264_psnr - TODO confirm.
        print(f"Returned PSNR = {psnr_value:.2f} dB (JPEG quality: {max(10, 100 - int(qp * 1.8))})")

NameError: name 'qp_values' is not defined

In [9]:
import subprocess
import json
import time

class RealTimeH264Controller:
    """Integration with FFmpeg H.264 encoder for live streaming.

    Wraps a trained ``H264QualityController``: for every chunk of frames the
    model predicts a QP for the requested PSNR target, and the chunk is then
    encoded by ``ffmpeg``/libx264 in constant-QP mode.

    ``collect_video_chunk``, ``stream_chunk``, ``calculate_chunk_psnr``,
    ``calculate_bitrate`` and ``chunk_to_raw_bytes`` are called on ``self`` and
    must be provided by the instance; they are not defined in this section.
    """

    def __init__(self, model_path, device='cuda'):
        """Load trained weights and put the model in eval mode on ``device``.

        Args:
            model_path: Path of a ``state_dict`` checkpoint for
                ``H264QualityController``.
            device: Device the model and its inputs are placed on.
        """
        self.model = H264QualityController()
        # map_location lets a checkpoint saved on a GPU be restored on whatever
        # device was requested (for example a CPU-only machine).
        # NOTE(review): torch.load unpickles the file. Only load trusted
        # checkpoints, or pass weights_only=True where the installed torch
        # version supports it.
        state_dict = torch.load(model_path, map_location=device)
        self.model.load_state_dict(state_dict)
        self.model.to(device).eval()
        self.device = device

    def encode_video_stream(self, input_stream, output_stream, target_psnr=35.0):
        """Process live video stream with adaptive QP control.

        Loops until ``collect_video_chunk`` returns ``None``. For each chunk it
        predicts a QP, lowers it by one (never below 0), encodes, streams, and
        prints the QP, measured PSNR and bitrate.

        Args:
            input_stream: Source handed to ``collect_video_chunk``.
            output_stream: Sink handed to ``stream_chunk``.
            target_psnr: PSNR target the model is conditioned on.
        """
        chunk_duration = 0.32  # seconds (8 frames at 25fps)

        # The target is the same for every chunk, so build its tensor once.
        psnr_tensor = torch.tensor([[float(target_psnr)]], device=self.device)

        while True:
            # Collect video chunk (8 consecutive frames)
            chunk = self.collect_video_chunk(input_stream, chunk_duration)
            if chunk is None:
                break

            # Predict optimal QP
            with torch.no_grad():
                video_tensor = self.preprocess_chunk(chunk)
                qp_logits = self.model(video_tensor, psnr_tensor)
                predicted_qp = torch.argmax(qp_logits, dim=1).item()

            # Conservative adjustment: one QP step lower than predicted,
            # floored at 0.
            adjusted_qp = max(0, predicted_qp - 1)

            # Encode chunk with predicted QP
            encoded_chunk = self.encode_chunk_with_qp(chunk, adjusted_qp)

            # Stream encoded chunk
            self.stream_chunk(encoded_chunk, output_stream)

            # Log performance metrics
            actual_psnr = self.calculate_chunk_psnr(chunk, encoded_chunk)
            bitrate = self.calculate_bitrate(encoded_chunk)

            print(f"QP: {adjusted_qp}, PSNR: {actual_psnr:.2f}, "
                  f"Bitrate: {bitrate:.0f} kbps")

    def encode_chunk_with_qp(self, chunk, qp, frame_size=(176, 144), fps=25):
        """Encode video chunk using FFmpeg with specified QP.

        Args:
            chunk: Frames to encode; converted by ``chunk_to_raw_bytes``.
            qp: Constant quantization parameter. Anything ``int()`` accepts,
                including a 0-dim tensor.
            frame_size: ``(width, height)`` of the raw frames.
            fps: Frame rate of the raw input.

        Returns:
            The raw H.264 bitstream as ``bytes``.

        Raises:
            RuntimeError: If ffmpeg exits with a non-zero status.
        """
        width, height = frame_size

        # Prepare FFmpeg command with constant QP
        cmd = [
            'ffmpeg', '-y', '-f', 'rawvideo', '-pix_fmt', 'yuv420p',
            '-s', f'{width}x{height}', '-r', str(fps), '-i', '-',  # Input from stdin
            '-c:v', 'libx264', '-qp', str(int(qp)),
            '-g', '8', '-keyint_min', '8',  # GOP size = 8
            '-f', 'h264', '-'  # Output to stdout
        ]

        # Serialize the frames before spawning ffmpeg, so a failure here cannot
        # leave a child process blocked on stdin.
        raw_data = self.chunk_to_raw_bytes(chunk)

        result = subprocess.run(
            cmd, input=raw_data,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        )
        if result.returncode != 0:
            # Surface the failure instead of returning an empty or partial
            # bitstream. Only the tail of stderr is kept to bound the message.
            detail = result.stderr.decode('utf-8', errors='replace').strip()
            raise RuntimeError(
                f"ffmpeg exited with status {result.returncode}: {detail[-500:]}"
            )

        return result.stdout

    def preprocess_chunk(self, chunk):
        """Convert video chunk to tensor format.

        Args:
            chunk: numpy array ``[T, H, W, C]`` with values on a 0-255 scale.

        Returns:
            Float tensor ``[1, C, T, H, W]`` scaled to ``[0, 1]`` on
            ``self.device``.
        """
        tensor = torch.from_numpy(chunk).float()
        tensor = tensor.permute(3, 0, 1, 2).unsqueeze(0)  # Add batch dimension
        tensor = tensor / 255.0  # Normalize to [0, 1]
        return tensor.to(self.device)
