# Ooblex CI/CD Test Suite

Automated tests for Ooblex running on Google Colab GPU.

This notebook can be used for:
- Automated runs from GitHub Actions with `colab-runner`
- Manual testing before releases
- Verifying GPU functionality

**Exit code 0 = All tests passed, non-zero = Failures**

In [None]:
# System info
# Print the GPU status plus the Python and pip versions of this runtime.
!nvidia-smi
!python --version
!pip --version

In [None]:
# Install test dependencies
# (-q asks pip for quiet output so the cell log stays short.)
!pip install -q pytest pytest-asyncio mediapipe pyngrok aiohttp aiortc

In [None]:
# Clone Ooblex
# Clone into /content/ooblex. If the clone fails (for example because the
# directory already exists from an earlier run), fall back to pulling the
# existing checkout. The clone's error output is discarded via 2>/dev/null.
!git clone https://github.com/ooblex/ooblex.git /content/ooblex 2>/dev/null || git -C /content/ooblex pull
# Show the five most recent commits of the checkout.
!cd /content/ooblex && git log --oneline -5

In [None]:
%%writefile /content/test_ooblex_colab.py
"""
Ooblex End-to-End Tests for Google Colab

Tests the full pipeline:
1. Image processing effects
2. GPU acceleration
3. MJPEG encoding
4. Server endpoints
5. Frame processing pipeline
"""

import pytest
import numpy as np
import cv2
import sys
import time
import threading
import requests
import base64

sys.path.insert(0, '/content/ooblex/colab')


class TestEnvironment:
    """Sanity checks for the packages the test suite relies on."""

    def test_opencv_import(self):
        """OpenCV can be imported and exposes a version."""
        import cv2 as opencv
        version = opencv.__version__
        assert version is not None

    def test_numpy_import(self):
        """NumPy can be imported and exposes a version."""
        import numpy as numpy_module
        version = numpy_module.__version__
        assert version is not None

    def test_torch_import(self):
        """PyTorch can be imported and exposes a version."""
        import torch
        version = torch.__version__
        assert version is not None

    def test_cuda_available(self):
        """Report whether CUDA is usable.

        No assertion is made on the result, so a CPU-only runtime does not
        fail here; the test only verifies that the query itself runs.
        """
        import torch
        has_cuda = torch.cuda.is_available()
        print(f"CUDA available: {has_cuda}")
        if not has_cuda:
            return
        print(f"GPU: {torch.cuda.get_device_name(0)}")

    def test_mediapipe_import(self):
        """MediaPipe can be imported and exposes a version."""
        import mediapipe as mp
        version = mp.__version__
        assert version is not None


class TestEffectsProcessor:
    """Exercise the individual effects offered by ``EffectsProcessor``."""

    @pytest.fixture
    def processor(self):
        """Provide a newly constructed ``EffectsProcessor``."""
        from ooblex_demo import EffectsProcessor
        return EffectsProcessor()

    @pytest.fixture
    def test_image(self):
        """Build a 640x480 black frame with a square, a circle and some text."""
        canvas = np.zeros((480, 640, 3), dtype=np.uint8)
        green, blue, white = (0, 255, 0), (255, 0, 0), (255, 255, 255)
        cv2.rectangle(canvas, (100, 100), (200, 200), green, -1)  # filled green square
        cv2.circle(canvas, (400, 240), 80, blue, -1)  # filled blue circle
        cv2.putText(canvas, "TEST", (250, 400), cv2.FONT_HERSHEY_SIMPLEX, 2, white, 3)
        return canvas

    @staticmethod
    def _apply(processor, image, effect):
        """Run *effect* on *image*, assert the shape is unchanged, return the output."""
        output = processor.process(image, effect)
        assert output.shape == image.shape
        return output

    def test_get_available_effects(self, processor):
        """The effect listing has at least ten entries, including the core ones."""
        available = processor.get_available_effects()
        assert len(available) >= 10
        for required in ("none", "face_detection", "edge_detection"):
            assert required in available

    def test_effect_none(self, processor, test_image):
        """The "none" effect hands back an identical frame."""
        output = self._apply(processor, test_image, "none")
        assert np.array_equal(output, test_image)

    def test_effect_grayscale(self, processor, test_image):
        """Grayscale output keeps three channels that all carry the same values."""
        output = self._apply(processor, test_image, "grayscale")
        first, second, third = (output[:, :, idx] for idx in range(3))
        assert np.allclose(first, second)
        assert np.allclose(second, third)

    def test_effect_edge_detection(self, processor, test_image):
        """Edge detection yields a darker frame (on average) than its input."""
        output = self._apply(processor, test_image, "edge_detection")
        assert test_image.mean() > output.mean()

    def test_effect_cartoon(self, processor, test_image):
        """The cartoon effect preserves the frame shape."""
        self._apply(processor, test_image, "cartoon")

    def test_effect_sepia(self, processor, test_image):
        """The sepia effect preserves the frame shape."""
        self._apply(processor, test_image, "sepia")

    def test_effect_blur(self, processor, test_image):
        """The blur effect preserves the frame shape."""
        self._apply(processor, test_image, "blur")

    def test_effect_pixelate(self, processor, test_image):
        """The pixelate effect preserves the frame shape."""
        self._apply(processor, test_image, "pixelate")

    def test_effect_invert(self, processor, test_image):
        """Applying invert twice restores the original frame."""
        inverted = self._apply(processor, test_image, "invert")
        restored = processor.process(inverted, "invert")
        assert np.array_equal(restored, test_image)

    def test_effect_mirror(self, processor, test_image):
        """Applying mirror twice restores the original frame."""
        flipped = self._apply(processor, test_image, "mirror")
        restored = processor.process(flipped, "mirror")
        assert np.array_equal(restored, test_image)

    def test_effect_emboss(self, processor, test_image):
        """The emboss effect preserves the frame shape."""
        self._apply(processor, test_image, "emboss")

    def test_effect_sketch(self, processor, test_image):
        """The sketch effect preserves the frame shape."""
        self._apply(processor, test_image, "sketch")

    def test_unknown_effect_returns_original(self, processor, test_image):
        """An unrecognised effect name leaves the frame untouched."""
        output = processor.process(test_image, "nonexistent_effect_xyz")
        assert np.array_equal(output, test_image)


class TestOoblexDemo:
    """Checks for ``OoblexDemo``: encoding, processing, effect switching, frame intake."""

    @pytest.fixture
    def demo(self):
        """Build an ``OoblexDemo`` whose config uses port 8888 for testing."""
        from ooblex_demo import OoblexDemo, DemoConfig
        return OoblexDemo(DemoConfig(port=8888))

    @pytest.fixture
    def test_image(self):
        """Build a 640x480 black frame containing one filled green square."""
        canvas = np.zeros((480, 640, 3), dtype=np.uint8)
        cv2.rectangle(canvas, (100, 100), (200, 200), (0, 255, 0), -1)
        return canvas

    def test_encode_frame(self, demo, test_image):
        """``_encode_frame`` returns non-empty data starting with the JPEG marker."""
        jpeg = demo._encode_frame(test_image)
        assert len(jpeg) > 0
        assert jpeg[:2] == b'\xff\xd8'  # JPEG magic bytes

    def test_process_frame(self, demo, test_image):
        """``process_frame`` with the grayscale effect selected yields JPEG data."""
        demo.current_effect = "grayscale"
        jpeg = demo.process_frame(test_image)
        assert len(jpeg) > 0
        assert jpeg[:2] == b'\xff\xd8'  # JPEG magic bytes

    def test_set_effect(self, demo):
        """``set_effect`` updates ``current_effect`` on every call."""
        for name in ("cartoon", "edge_detection"):
            demo.set_effect(name)
            assert demo.current_effect == name

    def test_receive_frame_base64(self, demo):
        """A base64 JPEG data URL passed to ``receive_frame`` populates both frames."""
        # Encode a small synthetic frame as JPEG, then wrap it in a data URL.
        frame = np.zeros((100, 100, 3), dtype=np.uint8)
        cv2.rectangle(frame, (10, 10), (90, 90), (255, 0, 0), -1)
        encoded = cv2.imencode('.jpg', frame)[1]
        payload = base64.b64encode(encoded).decode()

        demo.receive_frame(f"data:image/jpeg;base64,{payload}")

        # Both the raw and the processed frame must have been stored.
        assert demo.latest_frame is not None
        assert demo.processed_frame is not None


class TestGPUProcessing:
    """Smoke tests for the face-oriented effects, including the MediaPipe ones."""

    @pytest.fixture
    def processor(self):
        """Provide a newly constructed ``EffectsProcessor``."""
        from ooblex_demo import EffectsProcessor
        return EffectsProcessor()

    @pytest.fixture
    def face_image(self):
        """Draw a crude face (oval, eyes, nose, mouth) on a light gray frame."""
        canvas = np.full((480, 640, 3), 200, dtype=np.uint8)  # light gray background
        cv2.ellipse(canvas, (320, 240), (100, 130), 0, 0, 360, (180, 150, 140), -1)  # face oval
        for eye_centre in ((280, 200), (360, 200)):
            cv2.circle(canvas, eye_centre, 15, (50, 50, 50), -1)
        cv2.line(canvas, (320, 220), (320, 260), (150, 120, 110), 3)  # nose
        cv2.ellipse(canvas, (320, 300), (40, 20), 0, 0, 180, (100, 80, 80), 2)  # mouth
        return canvas

    def test_face_detection_runs(self, processor, face_image):
        """Face detection completes and preserves the frame shape."""
        output = processor.process(face_image, "face_detection")
        assert output.shape == face_image.shape

    def test_face_mesh_runs(self, processor, face_image):
        """Face mesh preserves the frame shape; skipped when the effect is not listed."""
        if "face_mesh" not in processor.get_available_effects():
            pytest.skip("MediaPipe face mesh not available")
        output = processor.process(face_image, "face_mesh")
        assert output.shape == face_image.shape

    def test_selfie_segment_runs(self, processor, face_image):
        """Selfie segmentation preserves the frame shape; skipped when not listed."""
        if "selfie_segment" not in processor.get_available_effects():
            pytest.skip("MediaPipe selfie segmentation not available")
        output = processor.process(face_image, "selfie_segment")
        assert output.shape == face_image.shape


class TestPerformance:
    """Test processing performance."""

    @pytest.fixture
    def processor(self):
        """Provide a newly constructed ``EffectsProcessor``."""
        from ooblex_demo import EffectsProcessor
        return EffectsProcessor()

    @pytest.fixture
    def test_image(self):
        """Return a 640x480 frame of random noise."""
        # randint's upper bound is exclusive, so 256 is required to cover the
        # full uint8 range (0..255).
        return np.random.randint(0, 256, (480, 640, 3), dtype=np.uint8)

    def test_processing_speed(self, processor, test_image):
        """Test that effects run at reasonable speed.

        Each effect is applied ``iterations`` times to the same frame and the
        resulting throughput must exceed ``min_fps`` frames per second.
        """
        effects_to_test = ["none", "grayscale", "edge_detection", "blur"]
        iterations = 10
        min_fps = 5

        for effect in effects_to_test:
            # Untimed warm-up call so any one-off setup cost on first use is
            # not counted against the steady-state frame rate.
            processor.process(test_image, effect)

            # perf_counter is monotonic and high resolution, unlike time.time(),
            # which can jump when the system clock is adjusted.
            start = time.perf_counter()
            for _ in range(iterations):
                processor.process(test_image, effect)
            elapsed = time.perf_counter() - start

            # Guard against a zero interval on very fast effects.
            fps = iterations / elapsed if elapsed > 0 else float("inf")
            print(f"{effect}: {fps:.1f} FPS")
            assert fps > min_fps, f"{effect} too slow: {fps:.1f} FPS"


class TestIntegration:
    """Integration tests for the full pipeline."""

    def test_full_pipeline(self):
        """Test complete frame processing pipeline.

        Drives one ``OoblexDemo`` instance (configured with port 8889) through
        several effects and checks that every processed frame comes back as
        non-empty JPEG data.
        """
        from ooblex_demo import OoblexDemo, DemoConfig

        demo = OoblexDemo(DemoConfig(port=8889))

        # Random-noise test frame; randint's upper bound is exclusive, so 256
        # is required to cover the full uint8 range.
        frame = np.random.randint(0, 256, (480, 640, 3), dtype=np.uint8)

        # Process through each effect
        for effect in ["none", "grayscale", "cartoon", "edge_detection"]:
            demo.set_effect(effect)
            result = demo.process_frame(frame)
            assert len(result) > 0, f"Empty result for effect: {effect}"
            # Same JPEG start-of-image check that test_process_frame applies.
            assert result[:2] == b'\xff\xd8', f"Non-JPEG result for effect: {effect}"

        print("Full pipeline test passed!")


if __name__ == "__main__":
    # pytest.main() returns the run's exit status rather than exiting itself;
    # pass it to sys.exit() so that running this file as a script exits
    # non-zero when any test fails.
    sys.exit(pytest.main([__file__, "-v", "--tb=short"]))

In [None]:
# Run the test suite
!cd /content && python -m pytest test_ooblex_colab.py -v --tb=short

In [None]:
# Also run the built-in tests
import sys
sys.path.insert(0, '/content/ooblex/colab')
from ooblex_demo import run_tests

exit_code = run_tests()

# NOTE(review): assumes run_tests() returns a process-style status (0 or a
# falsy value on success, non-zero on failure), as the variable name suggests;
# confirm against ooblex_demo. Without this check the result was silently
# discarded and the cell succeeded even when the built-in tests failed.
if exit_code:
    raise RuntimeError(f"Built-in Ooblex tests failed (exit code {exit_code})")

In [None]:
# Performance benchmark
# Times every available effect on a random 720p frame and prints the
# throughput (FPS) and per-frame latency for each one.
import sys  # imported here so the cell does not depend on an earlier cell
import time

import numpy as np

sys.path.insert(0, '/content/ooblex/colab')
from ooblex_demo import EffectsProcessor

processor = EffectsProcessor()
# randint's upper bound is exclusive, so 256 covers the full uint8 range.
test_img = np.random.randint(0, 256, (720, 1280, 3), dtype=np.uint8)  # 720p
iterations = 20  # timed calls per effect

print("Performance Benchmark (720p frames)")
print("=" * 40)

for effect in processor.get_available_effects():
    # Warmup (untimed) so one-off setup cost is excluded from the measurement
    processor.process(test_img, effect)

    # Benchmark with perf_counter: monotonic and high resolution, unlike
    # time.time(), which can jump when the system clock is adjusted.
    start = time.perf_counter()
    for _ in range(iterations):
        processor.process(test_img, effect)
    elapsed = time.perf_counter() - start

    # Guard against a zero interval on very fast effects.
    fps = iterations / elapsed if elapsed > 0 else float("inf")
    ms_per_frame = (elapsed / iterations) * 1000
    print(f"{effect:20s}: {fps:6.1f} FPS ({ms_per_frame:6.1f} ms/frame)")

## Test Summary

If all tests pass, the Colab environment is correctly set up for Ooblex.