# LiveCalc GPU API Server - Google Colab

This notebook runs a FastAPI server on Google Colab with GPU acceleration.
It exposes REST endpoints for submitting projection jobs remotely from VS Code.

**Features:**
- FastAPI REST endpoints: /submit, /status/{job_id}, /results/{job_id}, /health, plus DELETE /job/{job_id} to cancel a job
- ngrok tunnel for public HTTPS access
- Asynchronous job processing in background
- GPU acceleration via Numba CUDA
- Automatic cleanup after 1 hour

**Usage:**
1. Run all cells in order
2. Copy the ngrok URL from output
3. Configure VS Code extension with the URL
4. Submit jobs from VS Code

**Note:** Free-tier Colab sessions run for at most about 12 hours and may disconnect sooner when idle. Use Colab Pro for more stability.

## 1. Setup and Installation

In [None]:
# Install dependencies (-q keeps pip output quiet)
# NOTE(review): cupy-cuda11x presumably targets CUDA 11.x wheels; confirm it
# matches the CUDA version of the current Colab runtime.
!pip install -q fastapi uvicorn pyngrok numba cupy-cuda11x nest-asyncio

In [None]:
# Check GPU availability
from numba import cuda
import cupy as cp

if not cuda.is_available():
    print("❌ CUDA not available! Enable GPU in Runtime > Change runtime type")
else:
    device = cuda.get_current_device()

    # The device name may be reported as bytes or str depending on the Numba
    # release; only decode when it is actually bytes.
    raw_name = device.name
    device_name = raw_name.decode('utf-8') if isinstance(raw_name, bytes) else raw_name

    # Total device memory is queried through the active context;
    # get_memory_info() returns a (free, total) pair in bytes.
    memory_info = cuda.current_context().get_memory_info()
    total_bytes = memory_info[1]

    print(f"✅ GPU Available: {device_name}")
    print(f"   Memory: {total_bytes / 1e9:.2f} GB")
    print(f"   Compute Capability: {device.compute_capability}")

## 2. Upload GPU Engine Code

In [None]:
# Upload numba_engine.py from your local machine
# Option 1: Use Colab file upload
from google.colab import files

print("Upload numba_engine.py file:")
uploaded = files.upload()

# Verify upload: the later import needs a regular file with exactly this name
# in the working directory.
import os
if os.path.isfile('numba_engine.py'):
    print("✅ numba_engine.py uploaded successfully")
else:
    print("❌ numba_engine.py not found. Please upload it.")

In [None]:
# Alternative: Download from GitHub (if you've committed the code)
# !wget https://raw.githubusercontent.com/themitchelli/LiveCalc/main/livecalc-engines/gpu/numba_engine.py

## 3. FastAPI Server Implementation

In [None]:
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Dict, List, Optional
import numpy as np
import time
import uuid
import threading
from datetime import datetime, timedelta

# Import GPU engine
from numba_engine import (
    NumbaGPUEngine, Policy, ProjectionConfig, ExpenseAssumptions,
    Gender, ProductType, UnderwritingClass
)

# Initialize FastAPI app
app = FastAPI(
    title="LiveCalc GPU API",
    description="GPU-accelerated actuarial projection API running on Google Colab",
    version="1.0"
)

# Enable CORS for VS Code extension.
# Credentials are disabled on purpose: a wildcard origin must not be combined
# with credentialed requests, and none of the endpoints in this notebook read
# cookies or authorization headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize GPU engine (one shared instance used by every job)
engine = NumbaGPUEngine()
print(f"✅ GPU Engine initialized: {engine.get_schema()['gpu_model']}")

# Job storage (in-memory): job_id -> job record dict. Contents are lost when
# the notebook kernel restarts.
jobs: Dict[str, Dict] = {}
JOB_TIMEOUT_SECONDS = 15 * 60  # 15 minutes
RESULT_RETENTION_SECONDS = 60 * 60  # 1 hour

In [None]:
# Pydantic models for request/response

class PolicyData(BaseModel):
    """A single policy as submitted in a job request.

    ``gender``, ``product_type`` and ``underwriting_class`` are integer codes;
    ``process_job`` converts them to the engine's ``Gender``, ``ProductType``
    and ``UnderwritingClass`` enums.
    """
    policy_id: int
    age: int
    gender: int  # 0=Male, 1=Female
    sum_assured: float
    premium: float
    term: int
    product_type: int = 0  # 0=Term
    underwriting_class: int = 0  # 0=Standard

class JobSubmitRequest(BaseModel):
    """Body of POST /submit: the policies plus the assumptions to project them with."""
    policies: List[PolicyData]
    scenarios: List[List[float]]  # num_scenarios × 50 years
    mortality_table: List[List[float]]  # 2 × 121
    lapse_table: List[float]  # 50 years
    # process_job reads the keys per_policy_acquisition, per_policy_maintenance,
    # percent_of_premium and claim_expense from this mapping.
    expenses: Dict[str, float]
    # Optional multipliers; process_job looks up mortality_multiplier,
    # lapse_multiplier and expense_multiplier, each falling back to 1.0.
    config: Optional[Dict[str, float]] = None

class JobSubmitResponse(BaseModel):
    """Acknowledgement returned by POST /submit."""
    job_id: str  # UUID4 string generated by submit_job
    status: str  # 'queued' at submission time
    submitted_at: str  # datetime.utcnow().isoformat() taken at submission
    num_policies: int  # len(request.policies)
    num_scenarios: int  # len(request.scenarios)

class JobStatusResponse(BaseModel):
    """Snapshot of a job record returned by GET /status/{job_id}."""
    job_id: str
    status: str  # 'queued', 'running', 'completed', 'failed' or 'cancelled'
    submitted_at: str
    started_at: Optional[str] = None  # set once process_job starts the job
    completed_at: Optional[str] = None  # set on completion, failure or cancellation
    progress: float = 0.0  # 0.0 to 1.0
    error: Optional[str] = None  # str(exception) when the job failed

class JobResultResponse(BaseModel):
    """Projection output returned by GET /results/{job_id} for a completed job."""
    job_id: str
    status: str
    npvs: List[List[float]]  # num_policies × num_scenarios
    statistics: Dict[str, float]  # keys written by process_job: mean, std, min, max, median
    timing: Dict[str, float]  # keys: total_runtime, kernel_time, memory_transfer_time
    gpu_model: str

In [None]:
# Background job processor

def process_job(job_id: str):
    """Process a projection job in the background.

    Looks up the job record in ``jobs``, converts the stored request into
    engine inputs, runs the projection on the shared ``engine`` and writes the
    outcome back onto the job record (``status``, ``progress``, ``result`` or
    ``error``).

    Args:
        job_id: Key of the job record in the module-level ``jobs`` dict.

    Returns:
        None. Failures are recorded on the job record rather than raised.
    """
    # Look the record up outside the try block so the failure handler below
    # never touches an unbound ``job``.
    job = jobs.get(job_id)
    if job is None:
        print(f"Job {job_id} failed: job record not found")
        return

    if job['status'] == 'cancelled':
        # Cancelled while still queued: do not start the projection.
        return

    try:
        job['status'] = 'running'
        job['started_at'] = datetime.utcnow().isoformat()

        request = job['request']

        # Parse input data
        policies = [
            Policy(
                policy_id=p['policy_id'],
                age=p['age'],
                gender=Gender(p['gender']),
                sum_assured=p['sum_assured'],
                premium=p['premium'],
                term=p['term'],
                product_type=ProductType(p.get('product_type', 0)),
                underwriting_class=UnderwritingClass(p.get('underwriting_class', 0))
            )
            for p in request['policies']
        ]

        scenarios = np.array(request['scenarios'], dtype=np.float64)
        mortality_table = np.array(request['mortality_table'], dtype=np.float64)
        lapse_table = np.array(request['lapse_table'], dtype=np.float64)

        exp = request['expenses']
        expenses = ExpenseAssumptions(
            per_policy_acquisition=exp['per_policy_acquisition'],
            per_policy_maintenance=exp['per_policy_maintenance'],
            percent_of_premium=exp['percent_of_premium'],
            claim_expense=exp['claim_expense']
        )

        # An omitted config is stored as None with the key still present, so
        # the default argument of dict.get() would never apply; fall back
        # to an empty mapping explicitly.
        config_data = request.get('config') or {}
        config = ProjectionConfig(
            detailed_cashflows=False,
            mortality_multiplier=config_data.get('mortality_multiplier', 1.0),
            lapse_multiplier=config_data.get('lapse_multiplier', 1.0),
            expense_multiplier=config_data.get('expense_multiplier', 1.0)
        )

        # Run projection
        job['progress'] = 0.5
        result = engine.project(policies, scenarios, mortality_table, lapse_table, expenses, config)

        if job['status'] == 'cancelled':
            # Cancelled while the projection was running: keep the cancelled
            # state and discard the output.
            return

        npvs = result.npvs

        # Store the result before flipping the status so that a client which
        # observes 'completed' always finds a populated result.
        job['result'] = {
            'npvs': npvs.tolist(),
            'statistics': {
                'mean': float(np.mean(npvs)),
                'std': float(np.std(npvs)),
                'min': float(np.min(npvs)),
                'max': float(np.max(npvs)),
                'median': float(np.median(npvs))
            },
            'timing': {
                'total_runtime': result.total_runtime,
                'kernel_time': result.kernel_time,
                'memory_transfer_time': result.memory_transfer_time
            },
            'gpu_model': engine.get_schema()['gpu_model']
        }
        job['completed_at'] = datetime.utcnow().isoformat()
        job['progress'] = 1.0
        job['status'] = 'completed'

    except Exception as e:
        # Top-level boundary for the background task: record the failure on
        # the job instead of letting it disappear inside the worker.
        if job['status'] != 'cancelled':
            job['status'] = 'failed'
            job['completed_at'] = datetime.utcnow().isoformat()
            job['error'] = str(e)
        print(f"Job {job_id} failed: {e}")

In [None]:
# API Endpoints

@app.get("/")
async def root():
    """Return a short service banner including the GPU model in use."""
    gpu_name = engine.get_schema()['gpu_model']
    banner = dict(
        service="LiveCalc GPU API",
        version="1.0",
        status="running",
        gpu=gpu_name,
    )
    return banner

@app.get("/health")
async def health():
    """Report GPU details and how many jobs are tracked or still in flight."""
    gpu_info = engine.get_schema()

    report = {"status": "healthy"}
    # Copy the GPU fields straight from the engine schema.
    for field in ('gpu_model', 'gpu_memory_gb', 'compute_capability'):
        report[field] = gpu_info[field]

    # A job counts as active until it leaves the queued/running states.
    in_flight = [
        record for record in jobs.values()
        if record['status'] == 'queued' or record['status'] == 'running'
    ]
    report["active_jobs"] = len(in_flight)
    report["total_jobs"] = len(jobs)
    return report

def _purge_expired_jobs() -> None:
    """Drop finished jobs whose completion is older than RESULT_RETENTION_SECONDS."""
    cutoff = datetime.utcnow() - timedelta(seconds=RESULT_RETENTION_SECONDS)
    # Iterate over a copy because entries are removed while scanning.
    for stale_id, stale_job in list(jobs.items()):
        finished_at = stale_job.get('completed_at')
        # Queued and running jobs have no completion time and are never purged.
        if finished_at is not None and datetime.fromisoformat(finished_at) < cutoff:
            jobs.pop(stale_id, None)


@app.post("/submit", response_model=JobSubmitResponse)
async def submit_job(request: JobSubmitRequest, background_tasks: BackgroundTasks):
    """Submit a new projection job.

    Stores a job record in ``jobs`` and schedules ``process_job`` to run
    after the response has been sent.

    Args:
        request: Validated job payload (policies, scenarios, assumptions).
        background_tasks: FastAPI hook used to schedule the processing.

    Returns:
        JobSubmitResponse with the new job id and the submitted sizes.

    Raises:
        HTTPException: 400 when the request has no policies or no scenarios.
    """
    # Reject empty work up front; statistics over an empty NPV array would
    # otherwise only fail later inside the background task.
    if not request.policies or not request.scenarios:
        raise HTTPException(
            status_code=400,
            detail="At least one policy and one scenario are required"
        )

    # Keep the in-memory store bounded by dropping results past retention.
    _purge_expired_jobs()

    job_id = str(uuid.uuid4())

    # pydantic v2 renamed .dict() to .model_dump(); support both versions.
    payload = request.model_dump() if hasattr(request, 'model_dump') else request.dict()

    # Create job record
    job = {
        'job_id': job_id,
        'status': 'queued',
        'submitted_at': datetime.utcnow().isoformat(),
        'started_at': None,
        'completed_at': None,
        'progress': 0.0,
        'request': payload,
        'result': None,
        'error': None
    }
    jobs[job_id] = job

    # Schedule background processing
    background_tasks.add_task(process_job, job_id)

    return JobSubmitResponse(
        job_id=job_id,
        status='queued',
        submitted_at=job['submitted_at'],
        num_policies=len(request.policies),
        num_scenarios=len(request.scenarios)
    )

@app.get("/status/{job_id}", response_model=JobStatusResponse)
async def get_job_status(job_id: str):
    """Return the lifecycle state and progress of one job (404 if unknown)."""
    try:
        record = jobs[job_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Job not found") from None

    # Required fields are indexed directly; optional ones default to None.
    fields = {
        'job_id': job_id,
        'status': record['status'],
        'submitted_at': record['submitted_at'],
        'started_at': record.get('started_at'),
        'completed_at': record.get('completed_at'),
        'progress': record['progress'],
        'error': record.get('error'),
    }
    return JobStatusResponse(**fields)

@app.get("/results/{job_id}", response_model=JobResultResponse)
async def get_job_results(job_id: str):
    """Return the stored projection output of a completed job.

    Responds with 404 for an unknown job id and 400 while the job is in any
    state other than 'completed'.
    """
    record = jobs.get(job_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Job not found")

    state = record['status']
    if state != 'completed':
        raise HTTPException(
            status_code=400,
            detail=f"Job not completed yet. Status: {state}"
        )

    # Forward the stored result fields unchanged.
    output = record['result']
    result_fields = {
        key: output[key]
        for key in ('npvs', 'statistics', 'timing', 'gpu_model')
    }
    return JobResultResponse(job_id=job_id, status=state, **result_fields)

@app.delete("/job/{job_id}")
async def cancel_job(job_id: str):
    """Mark a queued or running job as cancelled; report when it is too late."""
    record = jobs.get(job_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Job not found")

    current = record['status']
    # Guard clause: anything already finished cannot be cancelled.
    if current not in ('queued', 'running'):
        return {"message": f"Job cannot be cancelled (status: {current})", "job_id": job_id}

    record['status'] = 'cancelled'
    record['completed_at'] = datetime.utcnow().isoformat()
    return {"message": "Job cancelled", "job_id": job_id}

# Confirmation that the endpoint cell ran to completion.
print("✅ API endpoints defined")

## 4. Start ngrok Tunnel

In [None]:
from pyngrok import ngrok
import nest_asyncio

# Allow nested event loops (required for Colab)
nest_asyncio.apply()

# Optional: Set ngrok auth token for persistent URLs (requires free ngrok account)
# Get token from: https://dashboard.ngrok.com/get-started/your-authtoken
# NOTE(review): recent ngrok agents may refuse to open any tunnel without an
# authtoken — confirm whether this step is still optional.
# ngrok.set_auth_token("YOUR_AUTH_TOKEN_HERE")

# Start ngrok tunnel.
# ngrok.connect() returns an NgrokTunnel object on current pyngrok releases;
# keep only the URL string so the messages below, and later cells that build
# request URLs from public_url, receive a plain "https://..." value.
tunnel = ngrok.connect(8000)
public_url = getattr(tunnel, "public_url", tunnel)

print("\n" + "="*80)
print("🚀 LiveCalc GPU API Server Running")
print("="*80)
print(f"\n📡 Public URL: {public_url}")
print(f"\n🔧 Configure VS Code with this URL:")
print(f"   livecalc.colabApiUrl = \"{public_url}\"")
print("\n" + "="*80)
print("\nEndpoints:")
print(f"  GET  {public_url}/              - Root")
print(f"  GET  {public_url}/health        - Health check")
print(f"  POST {public_url}/submit        - Submit job")
print(f"  GET  {public_url}/status/{{id}}  - Job status")
print(f"  GET  {public_url}/results/{{id}} - Job results")
print("\n" + "="*80)

## 5. Start FastAPI Server

In [None]:
import uvicorn
import threading

# Run server in background thread
def run_server():
    """Serve the FastAPI app on port 8000; blocks until the server exits."""
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")

# Daemon thread: it must not keep the interpreter alive once the notebook
# kernel shuts down.
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()

print("✅ Server started in background thread")
print("\n⚠️  Keep this notebook running to maintain the API server")
print("⚠️  Colab free tier may disconnect after 12 hours of inactivity")

## 6. Test the API

In [None]:
import requests
import time

# Get base URL.
# public_url may be an NgrokTunnel object rather than a string; its str() form
# is not a URL, so prefer the public_url attribute when it exists.
base_url = getattr(public_url, "public_url", None) or str(public_url)

# Fail fast instead of hanging the cell when the tunnel or server is down.
REQUEST_TIMEOUT_SECONDS = 30

# Test health endpoint
print("Testing /health endpoint...")
response = requests.get(f"{base_url}/health", timeout=REQUEST_TIMEOUT_SECONDS)
print(f"Status: {response.status_code}")
print(f"Response: {response.json()}")

# Test job submission with sample data
print("\nTesting job submission...")

sample_job = {
    "policies": [
        {
            "policy_id": 1,
            "age": 30,
            "gender": 0,
            "sum_assured": 100000.0,
            "premium": 500.0,
            "term": 20,
            "product_type": 0,
            "underwriting_class": 0
        }
    ],
    "scenarios": [[0.03] * 50 for _ in range(10)],  # 10 scenarios, 3% constant
    "mortality_table": [[i/1000 for i in range(121)] for _ in range(2)],  # Simple mortality
    "lapse_table": [0.05] * 50,  # 5% constant lapse
    "expenses": {
        "per_policy_acquisition": 100.0,
        "per_policy_maintenance": 10.0,
        "percent_of_premium": 0.05,
        "claim_expense": 50.0
    }
}

response = requests.post(f"{base_url}/submit", json=sample_job, timeout=REQUEST_TIMEOUT_SECONDS)
print(f"Status: {response.status_code}")
# Surface a rejected submission here rather than as a KeyError on 'job_id'.
response.raise_for_status()
job_data = response.json()
print(f"Job ID: {job_data['job_id']}")

# Poll for completion (once per second, up to 30 attempts)
job_id = job_data['job_id']
print(f"\nPolling job status...")
for i in range(30):
    time.sleep(1)
    response = requests.get(f"{base_url}/status/{job_id}", timeout=REQUEST_TIMEOUT_SECONDS)
    status_data = response.json()
    print(f"  [{i+1}s] Status: {status_data['status']}, Progress: {status_data['progress']:.0%}")

    if status_data['status'] == 'completed':
        # Get results
        response = requests.get(f"{base_url}/results/{job_id}", timeout=REQUEST_TIMEOUT_SECONDS)
        results = response.json()
        print(f"\n✅ Job completed!")
        print(f"   Mean NPV: ${results['statistics']['mean']:,.2f}")
        print(f"   Runtime: {results['timing']['total_runtime']:.3f}s")
        print(f"   GPU: {results['gpu_model']}")
        break
    elif status_data['status'] == 'failed':
        print(f"\n❌ Job failed: {status_data['error']}")
        break
else:
    # Runs only when the loop finished without hitting a break.
    print("\n⚠️  Timeout waiting for job completion")

## 7. Monitor Server (Keep Running)

This cell keeps the notebook alive and displays server status.
Run this cell and leave it running to maintain the API server.

In [None]:
import time
from IPython.display import clear_output

print("\n" + "="*80)
print("🟢 Server Running - Keep this cell executing")
print("="*80)
print(f"\nPublic URL: {public_url}")
print(f"\nPress Stop button to shutdown server")
print("="*80 + "\n")

try:
    while True:
        # Snapshot the records first: the server thread adds jobs while this
        # loop runs, and iterating the live dict could raise
        # "dictionary changed size during iteration".
        snapshot = list(jobs.values())

        # Display current stats
        active_jobs = sum(1 for j in snapshot if j['status'] in ['queued', 'running'])
        completed_jobs = sum(1 for j in snapshot if j['status'] == 'completed')
        failed_jobs = sum(1 for j in snapshot if j['status'] == 'failed')

        # "\r" with end="" rewrites the same output line on every refresh.
        print(f"\rActive: {active_jobs} | Completed: {completed_jobs} | Failed: {failed_jobs} | Total: {len(snapshot)}", end="")
        time.sleep(5)

except KeyboardInterrupt:
    print("\n\n🛑 Server stopped by user")
    # ngrok.disconnect() expects the tunnel's URL string; public_url may be an
    # NgrokTunnel object, so unwrap it when needed.
    ngrok.disconnect(getattr(public_url, "public_url", public_url))
    print("✅ Cleanup complete")