# 🚀 PaniniFS Compression Worker (Colab Pro)

**Purpose**: GPU-accelerated semantic compression of file chunks

**Pipeline**:
1. Receive webhook from GitHub Actions
2. Fetch chunk from GitHub repository
3. Apply semantic compression (GPU-accelerated)
4. Upload compressed result to Google One
5. Send completion callback to GitHub

**Requirements**:
- Colab Pro (GPU access)
- GitHub Personal Access Token (secret)
- Google One storage (mounted via Google Drive)

---

## 🔧 Setup & Configuration

In [None]:
# Install dependencies
!pip install -q requests PyGithub google-auth google-auth-oauthlib google-auth-httplib2
!pip install -q pillow numpy torch torchvision
!pip install -q flask pyngrok  # For webhook server

print("✅ Dependencies installed")

In [None]:
# Mount Google Drive (for Google One storage)
from google.colab import drive
drive.mount('/content/drive')

# Create compression output directory
import os
OUTPUT_DIR = '/content/drive/MyDrive/PaniniFS/compressed_chunks'
os.makedirs(OUTPUT_DIR, exist_ok=True)

print(f"✅ Google Drive mounted: {OUTPUT_DIR}")

In [None]:
# Configuration (use Colab secrets)
from google.colab import userdata

try:
    GITHUB_TOKEN = userdata.get('GITHUB_TOKEN')
    GITHUB_REPO = userdata.get('GITHUB_REPO')  # Format: owner/repo
    print("✅ GitHub credentials loaded from secrets")
except Exception:
    # userdata.get raises when a secret is missing or notebook access is not granted
    print("⚠️ GitHub credentials not found in Colab secrets")
    print("Please add GITHUB_TOKEN and GITHUB_REPO to Colab secrets")
    GITHUB_TOKEN = None
    GITHUB_REPO = None

# GPU Check
import torch
HAS_GPU = torch.cuda.is_available()
GPU_NAME = torch.cuda.get_device_name(0) if HAS_GPU else "None"
print(f"🎮 GPU Available: {HAS_GPU} ({GPU_NAME})")

## 📦 Chunk Fetcher

In [None]:
import requests
import json
import base64
from pathlib import Path
from typing import Dict, Any

class ChunkFetcher:
    """Fetch chunks from GitHub repository"""
    
    def __init__(self, token: str, repo: str):
        self.token = token
        self.repo = repo
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json'
        })
    
    def fetch_chunk(self, chunk_path: str, output_dir: str = '/tmp') -> Dict[str, Any]:
        """Fetch chunk directory from GitHub"""
        print(f"📥 Fetching chunk: {chunk_path}")
        
        # Get directory contents
        api_url = f"https://api.github.com/repos/{self.repo}/contents/{chunk_path}"
        response = self.session.get(api_url)
        response.raise_for_status()
        
        files = response.json()
        
        # Create local directory
        local_dir = Path(output_dir) / Path(chunk_path).name
        local_dir.mkdir(parents=True, exist_ok=True)
        
        # Download each file
        chunk_data = {'path': str(local_dir), 'files': {}}
        
        for file_info in files:
            # Skip subdirectories and other entries that have no direct download URL
            if file_info.get('type') != 'file' or not file_info.get('download_url'):
                continue
            
            file_name = file_info['name']
            file_url = file_info['download_url']
            
            # Download file
            file_response = self.session.get(file_url)
            file_response.raise_for_status()
            
            local_file = local_dir / file_name
            local_file.write_bytes(file_response.content)
            
            chunk_data['files'][file_name] = str(local_file)
            print(f"  ✅ {file_name} ({len(file_response.content)} bytes)")
        
        # Load metadata
        if 'metadata.json' in chunk_data['files']:
            with open(chunk_data['files']['metadata.json']) as f:
                chunk_data['metadata'] = json.load(f)
        
        print(f"✅ Chunk fetched: {local_dir}")
        return chunk_data

print("✅ ChunkFetcher class defined")
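
Taken together, the fetcher and the compressor imply a chunk layout: a directory holding a file named `content` plus an optional `metadata.json` whose `pattern_type`, `chunk_id`, and `original_hash` keys are read later. The sketch below builds such a chunk locally so the later cells can be exercised without GitHub. The helper name `make_test_chunk` is hypothetical, and using SHA-256 for `original_hash` is an assumption, since the notebook does not say how that hash is produced.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def make_test_chunk(root: str, chunk_id: int, payload: bytes,
                    pattern_type: str = "generic") -> dict:
    """Write a chunk directory and return a dict shaped like fetch_chunk's result."""
    chunk_dir = Path(root) / f"chunk_{chunk_id:04d}"
    chunk_dir.mkdir(parents=True, exist_ok=True)

    # The compressor looks up the payload under the file name 'content'
    content_file = chunk_dir / "content"
    content_file.write_bytes(payload)

    metadata = {
        "chunk_id": chunk_id,
        "pattern_type": pattern_type,
        # Assumption: SHA-256 hex digest; the real pipeline may use another hash
        "original_hash": hashlib.sha256(payload).hexdigest(),
    }
    metadata_file = chunk_dir / "metadata.json"
    metadata_file.write_text(json.dumps(metadata, indent=2))

    return {
        "path": str(chunk_dir),
        "files": {"content": str(content_file), "metadata.json": str(metadata_file)},
        "metadata": metadata,
    }


with tempfile.TemporaryDirectory() as tmp:
    chunk_data = make_test_chunk(tmp, 0, b"hello " * 100, pattern_type="text")
    print(sorted(chunk_data["files"]))
```

A dict built this way can be passed straight to `SemanticCompressor.compress_chunk` while the temporary directory still exists.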

## 🧠 GPU-Accelerated Compressor

In [None]:
import torch
import torch.nn as nn
import numpy as np
from PIL import Image
import gzip
import hashlib

class SemanticCompressor:
    """GPU-accelerated semantic compression"""
    
    def __init__(self, use_gpu: bool = True):
        self.device = torch.device('cuda' if use_gpu and torch.cuda.is_available() else 'cpu')
        print(f"🎮 Compressor using device: {self.device}")
    
    def compress_chunk(self, chunk_data: Dict[str, Any]) -> Dict[str, Any]:
        """Compress chunk based on pattern type"""
        metadata = chunk_data.get('metadata', {})
        pattern_type = metadata.get('pattern_type', 'generic')
        
        print(f"🔧 Compressing chunk (pattern: {pattern_type})...")
        
        # Get chunk content file
        content_file = chunk_data['files'].get('content', None)
        if not content_file:
            raise ValueError("No content file found in chunk")
        
        # Compression strategy based on pattern type
        if pattern_type == 'image':
            result = self._compress_image(content_file, metadata)
        elif pattern_type == 'text':
            result = self._compress_text(content_file, metadata)
        else:
            result = self._compress_generic(content_file, metadata)
        
        # Add compression stats (guard against empty source files)
        original_size = Path(content_file).stat().st_size
        compressed_size = len(result['compressed_data'])
        ratio = compressed_size / original_size if original_size else 0.0
        result['compression_stats'] = {
            'original_size': original_size,
            'compressed_size': compressed_size,
            'ratio': ratio,
            'savings_percent': (1 - ratio) * 100 if original_size else 0.0
        }
        
        print(f"  ✅ Compressed: {original_size} → {compressed_size} bytes ({result['compression_stats']['savings_percent']:.1f}% savings)")
        
        return result
    
    def _compress_image(self, content_file: str, metadata: Dict) -> Dict:
        """Image compression (gzip baseline; the GPU tensor is staged for future semantic analysis)"""
        # Load image
        img = Image.open(content_file)
        img_array = np.array(img)
        
        # Convert to tensor and move to the selected device (not used by the baseline yet)
        tensor = torch.from_numpy(img_array).float().to(self.device)
        
        # Semantic analysis (placeholder for actual semantic compression)
        # TODO: Implement actual semantic compression algorithm
        
        # For now: gzip over the raw decoded pixel bytes (this is not PNG encoding).
        # The result can be larger than a source file that is already compressed (e.g. JPEG).
        compressed = gzip.compress(img_array.tobytes(), compresslevel=9)
        
        return {
            'compressed_data': compressed,
            'compression_method': 'semantic_image_v1',
            'reconstruction_recipe': {
                'method': 'gzip_decompress',
                'shape': img_array.shape,
                'dtype': str(img_array.dtype),
                'format': img.format
            }
        }
    
    def _compress_text(self, content_file: str, metadata: Dict) -> Dict:
        """Text compression (gzip baseline; no GPU work yet)"""
        with open(content_file, 'rb') as f:
            data = f.read()
        
        # Use gzip for now (can be enhanced with semantic analysis)
        compressed = gzip.compress(data, compresslevel=9)
        
        return {
            'compressed_data': compressed,
            'compression_method': 'semantic_text_v1',
            'reconstruction_recipe': {
                'method': 'gzip_decompress',
                'encoding': 'utf-8'
            }
        }
    
    def _compress_generic(self, content_file: str, metadata: Dict) -> Dict:
        """Generic compression for unknown patterns"""
        with open(content_file, 'rb') as f:
            data = f.read()
        
        compressed = gzip.compress(data, compresslevel=9)
        
        return {
            'compressed_data': compressed,
            'compression_method': 'generic_gzip_v1',
            'reconstruction_recipe': {
                'method': 'gzip_decompress'
            }
        }

print("✅ SemanticCompressor class defined")
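
Because every branch currently falls back to gzip, the reported stats reduce to arithmetic on two byte counts. The standard-library sketch below shows that calculation together with the matching `gzip_decompress` step; `gzip_stats` is an illustrative helper and not part of the notebook.

```python
import gzip


def gzip_stats(data: bytes, level: int = 9) -> dict:
    """Compress bytes with gzip and report the same stats the worker records."""
    compressed = gzip.compress(data, compresslevel=level)
    original_size = len(data)
    # Avoid dividing by zero for empty inputs
    ratio = len(compressed) / original_size if original_size else 0.0
    return {
        "compressed_data": compressed,
        "original_size": original_size,
        "compressed_size": len(compressed),
        "ratio": ratio,
        "savings_percent": (1 - ratio) * 100 if original_size else 0.0,
    }


sample = b"PaniniFS " * 1000
stats = gzip_stats(sample)

# Round trip: decompressing must give back the exact input
assert gzip.decompress(stats["compressed_data"]) == sample
print(f"{stats['original_size']} -> {stats['compressed_size']} bytes "
      f"({stats['savings_percent']:.1f}% savings)")
```

Highly repetitive input like this sample shrinks a lot. Small or already-compressed inputs can come out larger than they went in, because gzip adds a header and trailer, so `savings_percent` may be negative.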

## ☁️ Google One Uploader

In [None]:
import shutil
from datetime import datetime

class GoogleOneUploader:
    """Upload compressed chunks to Google One (via Google Drive)"""
    
    def __init__(self, output_dir: str = OUTPUT_DIR):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
    
    def upload(self, compressed_result: Dict, chunk_metadata: Dict) -> Dict:
        """Upload compressed chunk to Google One"""
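        chunk_id = chunk_metadata.get('chunk_id', 'unknown')
        chunk_hash = chunk_metadata.get('original_hash', 'unknown')
        
        print(f"☁️ Uploading chunk {chunk_id} to Google One...")
        
        # Create chunk directory (zero-pad integer IDs; use the raw value otherwise,
        # since the ':04d' format raises ValueError for the 'unknown' fallback string)
        chunk_label = f"{chunk_id:04d}" if isinstance(chunk_id, int) else str(chunk_id)
        chunk_dir = self.output_dir / f"chunk_{chunk_label}"
        chunk_dir.mkdir(exist_ok=True)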
        
        # Save compressed data
        compressed_file = chunk_dir / 'compressed.bin'
        compressed_file.write_bytes(compressed_result['compressed_data'])
        
        # Save reconstruction recipe
        recipe_file = chunk_dir / 'recipe.json'
        recipe = {
            'chunk_id': chunk_id,
            'original_hash': chunk_hash,
            'compression_method': compressed_result['compression_method'],
            'reconstruction': compressed_result['reconstruction_recipe'],
            'stats': compressed_result['compression_stats'],
            'uploaded_at': datetime.utcnow().isoformat() + 'Z',
            'worker': 'colab_pro_gpu'
        }
        recipe_file.write_text(json.dumps(recipe, indent=2))
        
        # Calculate upload hash
        upload_hash = hashlib.sha256(compressed_result['compressed_data']).hexdigest()
        
        print(f"  ✅ Uploaded to: {chunk_dir}")
        print(f"  📦 Compressed size: {len(compressed_result['compressed_data'])} bytes")
        print(f"  🔐 Upload hash: {upload_hash[:16]}...")
        
        return {
            'upload_path': str(chunk_dir),
            'compressed_file': str(compressed_file),
            'recipe_file': str(recipe_file),
            'upload_hash': upload_hash,
            'stats': compressed_result['compression_stats']
        }

print("✅ GoogleOneUploader class defined")

## 🔄 GitHub Callback

In [None]:
class GitHubCallback:
    """Send completion callback to GitHub"""
    
    def __init__(self, token: str, repo: str):
        self.token = token
        self.repo = repo
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json'
        })
    
    def send_completion(self, chunk_path: str, upload_result: Dict) -> bool:
        """Send completion status back to GitHub"""
        print("📤 Sending completion callback to GitHub...")
        
        # Update chunk metadata in repository via GitHub API
        # For now, create an issue comment or use repository dispatch
        
        # Option 1: Repository dispatch event
        dispatch_url = f"https://api.github.com/repos/{self.repo}/dispatches"
        payload = {
            'event_type': 'chunk_compressed',
            'client_payload': {
                'chunk_path': chunk_path,
                'status': 'completed',
                'upload_path': upload_result['upload_path'],
                'upload_hash': upload_result['upload_hash'],
                'stats': upload_result['stats']
            }
        }
        
        try:
            response = self.session.post(dispatch_url, json=payload)
            response.raise_for_status()
            print("  ✅ Callback sent successfully")
            return True
        except Exception as e:
            print(f"  ❌ Callback failed: {e}")
            return False

print("✅ GitHubCallback class defined")

## 🚀 Main Worker Function

In [None]:
def process_chunk(chunk_info: Dict) -> Dict:
    """Main processing function"""
    try:
        print("="*60)
        print("🚀 Starting chunk compression...")
        print("="*60)
        
        # Initialize components
        fetcher = ChunkFetcher(GITHUB_TOKEN, GITHUB_REPO)
        compressor = SemanticCompressor(use_gpu=HAS_GPU)
        uploader = GoogleOneUploader()
        callback = GitHubCallback(GITHUB_TOKEN, GITHUB_REPO)
        
        # 1. Fetch chunk
        chunk_path = chunk_info['chunk_path']
        chunk_data = fetcher.fetch_chunk(chunk_path)
        
        # 2. Compress
        compressed_result = compressor.compress_chunk(chunk_data)
        
        # 3. Upload to Google One
        upload_result = uploader.upload(compressed_result, chunk_data.get('metadata', {}))
        
        # 4. Send callback
        callback.send_completion(chunk_path, upload_result)
        
        print("="*60)
        print("✅ Chunk processing completed successfully!")
        print("="*60)
        
        return {
            'status': 'success',
            'chunk_path': chunk_path,
            'upload_result': upload_result
        }
        
    except Exception as e:
        print("="*60)
        print(f"❌ Error processing chunk: {e}")
        print("="*60)
        import traceback
        traceback.print_exc()
        return {
            'status': 'error',
            'error': str(e)
        }

print("✅ Main worker function defined")

## 🌐 Webhook Server (Optional)

In [None]:
from flask import Flask, request, jsonify
from threading import Thread
from pyngrok import ngrok

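app = Flask(__name__)
processing_queue = []

def _process_and_dequeue(chunk_info):
    """Process a chunk, then drop it from the pending queue"""
    try:
        process_chunk(chunk_info)
    finally:
        # Without this, queue_length in /status would only ever grow
        processing_queue.remove(chunk_info)

@app.route('/webhook', methods=['POST'])
def webhook():
    """Receive webhook from GitHub Actions"""
    # Tolerate missing or non-JSON bodies instead of failing on None
    data = request.get_json(silent=True) or {}
    print(f"📨 Received webhook: {data.get('action')}")
    
    if data.get('action') == 'compress_chunk':
        chunk_info = data.get('chunk_info', {})
        processing_queue.append(chunk_info)
        
        # Process immediately in background
        Thread(target=_process_and_dequeue, args=(chunk_info,)).start()
        
        return jsonify({
            'status': 'accepted',
            'message': 'Chunk queued for processing'
        }), 202
    
    return jsonify({'status': 'ok'}), 200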

@app.route('/status', methods=['GET'])
def status():
    """Get worker status"""
    return jsonify({
        'status': 'ready',
        'gpu': GPU_NAME,
        'queue_length': len(processing_queue)
    })

def start_webhook_server():
    """Start Flask server with ngrok tunnel"""
    # Start ngrok tunnel; connect() returns a tunnel object, so read its public_url
    public_url = ngrok.connect(5000).public_url
    print(f"\n{'='*60}")
    print("🌐 Webhook server started!")
    print(f"{'='*60}")
    print(f"Public URL: {public_url}")
    print(f"Webhook endpoint: {public_url}/webhook")
    print(f"Status endpoint: {public_url}/status")
    print("\n⚠️ Add this URL to GitHub secrets as COLAB_WEBHOOK_URL")
    print(f"{'='*60}\n")
    
    # Start Flask
    app.run(port=5000)

print("✅ Webhook server defined")
print("\nTo start webhook server, run: start_webhook_server()")

## 🧪 Test Manual Processing

In [None]:
# Test with manual chunk info
test_chunk_info = {
    'chunk_path': 'pending_compression/test_image/chunk_0000',
    'pattern_type': 'image',
    'chunk_id': 0
}

# Uncomment to test:
# result = process_chunk(test_chunk_info)
# print(json.dumps(result, indent=2))

print("✅ Ready for manual testing")
print("Uncomment the lines above to test with a chunk")

## 🎯 Start Webhook Server

**Run this cell to start the webhook server and wait for GitHub Actions to dispatch chunks**

In [None]:
# Start webhook server
# This will run indefinitely and process chunks as they arrive

if GITHUB_TOKEN and GITHUB_REPO:
    print("🚀 Starting compression worker with webhook server...")
    start_webhook_server()
else:
    print("❌ Cannot start: GitHub credentials not configured")
    print("Please add GITHUB_TOKEN and GITHUB_REPO to Colab secrets")

---

## 📝 Notes

**Setup Steps**:
1. Add secrets to Colab:
   - `GITHUB_TOKEN`: Personal access token with repo access
   - `GITHUB_REPO`: Format `owner/repo`
2. Run all cells above
3. Copy the ngrok webhook URL
4. Add URL to GitHub repository secrets as `COLAB_WEBHOOK_URL`
5. Notebook will process chunks automatically when GitHub Actions dispatches them
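
Step 5 depends on the sender posting a JSON body that the `/webhook` route recognizes: an `action` of `compress_chunk` and a `chunk_info` object containing `chunk_path`. The sketch below shows one way the sending side could look, using only the standard library. `build_webhook_request` is a hypothetical helper, and reading the URL from a `COLAB_WEBHOOK_URL` environment variable is an assumption about how the workflow exposes the secret.

```python
import json
import os
import urllib.request


def build_webhook_request(base_url: str, chunk_path: str) -> urllib.request.Request:
    """Build the POST request that asks the worker to compress one chunk."""
    url = base_url.rstrip("/")
    # Accept either the bare tunnel URL or one that already ends in /webhook
    if not url.endswith("/webhook"):
        url += "/webhook"
    payload = {"action": "compress_chunk", "chunk_info": {"chunk_path": chunk_path}}
    return urllib.request.Request(
        url=url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


base_url = os.environ.get("COLAB_WEBHOOK_URL")
if base_url:
    request_obj = build_webhook_request(
        base_url, "pending_compression/test_image/chunk_0000"
    )
    with urllib.request.urlopen(request_obj, timeout=30) as response:
        # The worker answers 202 when it accepts a chunk
        print(response.status)
else:
    print("COLAB_WEBHOOK_URL is not set; nothing sent")
```

The ngrok URL changes whenever the tunnel is restarted, so the stored secret has to be updated after each new notebook session.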

**GPU Optimization**:
- This notebook selects a GPU device when available (Colab Pro)
- The current implementation uses gzip (level 9) as a baseline, which runs on the CPU
- Semantic compression algorithms can later be enhanced with deep learning models that use the GPU

**Google One Storage**:
- Compressed chunks are saved to Google Drive, whose storage counts against the Google One quota
- Path: `/content/drive/MyDrive/PaniniFS/compressed_chunks/`
- Each chunk has: `compressed.bin` + `recipe.json`
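
Every recipe currently records `gzip_decompress` as its method, so restoring a chunk amounts to reading `recipe.json` and inflating `compressed.bin`. The sketch below does that under those assumptions; `restore_chunk` is an illustrative name. For image chunks the result is the raw pixel buffer described by `shape` and `dtype` in the recipe, not the original image file.

```python
import gzip
import json
from pathlib import Path


def restore_chunk(chunk_dir: str) -> bytes:
    """Rebuild the stored bytes of one chunk from compressed.bin and recipe.json."""
    chunk_path = Path(chunk_dir)
    recipe = json.loads((chunk_path / "recipe.json").read_text())

    # The uploader stores the reconstruction recipe under the 'reconstruction' key
    method = recipe["reconstruction"]["method"]
    if method != "gzip_decompress":
        raise ValueError(f"Unsupported reconstruction method: {method}")

    return gzip.decompress((chunk_path / "compressed.bin").read_bytes())
```

A call such as `restore_chunk('/content/drive/MyDrive/PaniniFS/compressed_chunks/chunk_0000')` returns the decompressed bytes. Checking them against `original_hash` would require knowing which hash function produced that value, which the notebook does not state.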

**Monitoring**:
- Check webhook status: `{ngrok_url}/status`
- View processing logs in notebook output
- GitHub Actions receives callbacks after completion
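
The `/status` route returns a JSON object with `status`, `gpu`, and `queue_length`. The sketch below polls it with the standard library; `fetch_status` and `describe_status` are hypothetical helpers, and the sample payload in the last line uses made-up values.

```python
import json
import urllib.request


def describe_status(payload: dict) -> str:
    """Format the /status response as a one-line summary."""
    return (
        f"worker={payload.get('status', 'unknown')} "
        f"gpu={payload.get('gpu', 'unknown')} "
        f"queued={payload.get('queue_length', 0)}"
    )


def fetch_status(base_url: str, timeout: float = 10.0) -> dict:
    """GET {base_url}/status and decode the JSON body."""
    url = base_url.rstrip("/") + "/status"
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Sample payload with illustrative values, shaped like the route's response
print(describe_status({"status": "ready", "gpu": "None", "queue_length": 0}))
```

Against a running worker, `describe_status(fetch_status(ngrok_url))` gives the same summary for the live tunnel, where `ngrok_url` stands for the public URL printed at startup.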

---

**Last Updated**: November 12, 2025  
**Version**: 1.0  
**Architecture**: See `docs/architecture/ASYNC_SEMANTIC_COMPRESSION_PIPELINE.md`