# Nuclio Serverless on Minikube with Hydraa

This notebook demonstrates how to deploy and manage serverless functions on Nuclio using Minikube locally.

## Setup and Initialization

In [4]:
import os
import json
import time
import traceback
from hydraa import proxy, LOCAL, Task, LocalVM
from hydraa.services import ServiceManager
from hydraa_faas.faas_manager.manager import FaasManager

In [6]:
# Create local VM for minikube deployment
# Make sure to run 'minikube start' before executing this cell
vm = LocalVM(launch_type='join', KubeConfigPath='~/.kube/config')

# Initialize proxy and FaaS manager
provider_mgr = proxy([LOCAL])

try:
    faas_mgr = FaasManager(
        proxy_mgr=provider_mgr,  # Fixed parameter name
        vms=[vm],
        asynchronous=False,
        auto_terminate=False  # Keep cluster running for development
    )

    # Start services
    service_mgr = ServiceManager([faas_mgr])
    service_mgr.start_services()
    
    # Wait for initialization
    print("Waiting for Nuclio initialization...")
    time.sleep(30)  # Give time for Nuclio to install

    # Check status
    status = faas_mgr.get_status()
    print(f"FaaS Manager Status: {json.dumps(status, indent=2)}")

except Exception as e:
    print(f"Failed to initialize FaaS manager: {e}")
    traceback.print_exc()

login to: ['local'] succeed
Waiting for Nuclio initialization...
FaaS Manager Status: {
  "manager_id": "faas-mgr-n0tbbnag",
  "initialized": true,
  "active_tasks": 0,
  "total_tasks": 0,
  "providers": {
    "nuclio-local": {
      "active": false,
      "functions_count": 0,
      "deployment_target": "minikube",
      "namespace": "nuclio",
      "registry_configured": false,
      "cluster_status": "ready",
      "queue_size": 0,
      "vm_provider": "local"
    }
  }
}


## 1. Inline Code Deployment

In [7]:
# Simple inline function example
inline_task = Task()
inline_task.provider = 'nuclio'  # Fixed: use 'nuclio' for Nuclio deployment
inline_task.memory = 128
inline_task.vcpus = 0.1

# Add runtime attributes
inline_task.runtime = 'python:3.9'
inline_task.handler = 'main:handler'
inline_task.handler_code = '''
import json

def handler(context, event):
    body = event.body
    if isinstance(body, bytes):
        body = body.decode('utf-8')
    
    data = {}
    if body:
        try:
            data = json.loads(body)
        except json.JSONDecodeError:
            pass
    
    return json.dumps({
        'message': 'Hello from Nuclio!',
        'received': data,
        'timestamp': str(context.event_time)
    })
'''

try:
    # Deploy
    print("Deploying inline function...")
    faas_mgr.submit(inline_task)
    result = inline_task.result()
    print(f"Inline function deployed: {result}")
    
    # Wait a bit for function to be ready
    time.sleep(5)

    # Test - use the nuclio-local provider name
    response = faas_mgr.invoke(inline_task.name, {'test': 'data'}, provider='nuclio-local')
    print(f"Response: {response}")
except Exception as e:
    print(f"Deployment failed: {e}")
    traceback.print_exc()

Deploying inline function...


KeyboardInterrupt: 

## 2. Source Directory Deployment

In [None]:
# First, let's create the directory structure for our functions
import os
os.makedirs('./nuclio_functions/image_classifier', exist_ok=True)
os.makedirs('./nuclio_functions/stream_processor', exist_ok=True)

In [None]:
# Deploy ML inference function from directory
ml_task = Task()
ml_task.provider = 'nuclio'
ml_task.memory = 512
ml_task.vcpus = 0.5

# Add attributes
ml_task.runtime = 'python:3.9'
ml_task.source_path = './nuclio_functions/image_classifier'
ml_task.handler = 'classifier:classify'
ml_task.env_vars = {
    'MODEL_TYPE': 'mobilenet',
    'CONFIDENCE_THRESHOLD': '0.7'
}
ml_task.min_replicas = 1
ml_task.max_replicas = 3

# Add custom build commands for dependencies
ml_task.build_commands = [
    'pip install pillow',
    'pip install requests',
    'pip install numpy'
]

try:
    # Deploy
    print("Deploying ML function...")
    faas_mgr.submit(ml_task)
    result = ml_task.result()
    print(f"ML function deployed: {result}")
    
    time.sleep(5)

    # Test
    test_image = {
        'image_url': 'https://raw.githubusercontent.com/pytorch/hub/master/images/dog.jpg',
        'return_top_k': 5
    }
    response = faas_mgr.invoke(ml_task.name, test_image, provider='nuclio-local')
    print(f"Classification results: {json.dumps(response, indent=2)}")
except Exception as e:
    print(f"ML deployment failed: {e}")
    traceback.print_exc()

In [None]:
# Deploy stream processor from directory
stream_task = Task()
stream_task.provider = 'nuclio'
stream_task.memory = 256
stream_task.vcpus = 0.25

# Add attributes
stream_task.runtime = 'python:3.9'
stream_task.source_path = './nuclio_functions/stream_processor'
stream_task.handler = 'processor:process_stream'
stream_task.build_commands = ['pip install uuid']

try:
    # Deploy
    print("Deploying stream processor...")
    faas_mgr.submit(stream_task)
    result = stream_task.result()
    print(f"Stream processor deployed: {result}")
    
    time.sleep(5)

    # Test
    test_stream = {
        'events': [
            {'id': 1, 'type': 'click', 'timestamp': '2024-01-01T10:00:00Z'},
            {'id': 2, 'type': 'view', 'timestamp': '2024-01-01T10:01:00Z'},
            {'id': 3, 'type': 'purchase', 'amount': 99.99, 'timestamp': '2024-01-01T10:02:00Z'}
        ]
    }
    response = faas_mgr.invoke(stream_task.name, test_stream, provider='nuclio-local')
    print(f"Stream processing result: {json.dumps(response, indent=2)}")
except Exception as e:
    print(f"Stream processor deployment failed: {e}")
    traceback.print_exc()

## 3. Container Image Deployment

In [None]:
# Deploy from pre-built container image
container_task = Task()
container_task.provider = 'nuclio'
container_task.memory = 1024
container_task.vcpus = 1
container_task.image = 'python:3.9-slim'  # Use a real image

# Add attributes
container_task.handler = 'main:handler'
container_task.runtime = 'python:3.9'
container_task.handler_code = '''
import json
import sys

def handler(context, event):
    return json.dumps({
        'message': 'Hello from container!',
        'python_version': sys.version
    })
'''

# Uncomment to deploy
# try:
#     print("Deploying container function...")
#     faas_mgr.submit(container_task)
#     result = container_task.result()
#     print(f"Container function deployed: {result}")
# except Exception as e:
#     print(f"Container deployment failed: {e}")
#     traceback.print_exc()

## Advanced Features

In [None]:
# Auto-scaling configuration
scaling_task = Task()
scaling_task.provider = 'nuclio'
scaling_task.memory = 256
scaling_task.vcpus = 0.25

# Add attributes
scaling_task.runtime = 'python:3.9'
scaling_task.handler = 'handler:process'
scaling_task.handler_code = '''
import json
import time

def process(context, event):
    # Simulate some processing
    time.sleep(0.1)
    return json.dumps({
        'processed': True,
        'instance': context.worker_id
    })
'''
scaling_task.min_replicas = 0  # Scale to zero
scaling_task.max_replicas = 5

try:
    # Deploy with auto-scaling
    print("Deploying auto-scaling function...")
    faas_mgr.submit(scaling_task)
    result = scaling_task.result()
    print(f"Auto-scaling function deployed: {result}")
except Exception as e:
    print(f"Auto-scaling function deployment failed: {e}")
    traceback.print_exc()

In [None]:
# Batch deployment
workers = []
for i in range(3):
    task = Task()
    task.provider = 'nuclio'
    task.memory = 256
    task.vcpus = 0.25
    
    # Add attributes
    task.runtime = 'python:3.9'
    task.handler = 'worker:process'
    task.handler_code = f'''
import json

def process(context, event):
    return json.dumps({{
        'worker_id': {i},
        'message': 'Processing complete'
    }})
'''
    task.env_vars = {'WORKER_ID': str(i)}
    workers.append(task)

try:
    # Deploy workers
    print("Deploying batch of workers...")
    faas_mgr.submit(workers)
    for i, task in enumerate(workers):
        result = task.result()
        print(f"Worker {i} deployed: {task.name}")
except Exception as e:
    print(f"Batch deployment failed: {e}")
    traceback.print_exc()

In [None]:
# List and manage functions
try:
    functions = faas_mgr.list_functions()
    print("Deployed functions:")
    for provider, funcs in functions.items():
        print(f"\nProvider: {provider}")
        for func in funcs:
            print(f"  - {func['name']} (state: {func.get('state', 'unknown')}, replicas: {func.get('replicas', 0)})")

    # Get manager status
    status = faas_mgr.get_status()
    print(f"\nManager status: {json.dumps(status, indent=2)}")
    
    # Get provider health
    health = faas_mgr.get_provider_health()
    print(f"\nProvider health: {json.dumps(health, indent=2)}")
except Exception as e:
    print(f"Failed to list functions: {e}")
    traceback.print_exc()

## Cleanup

In [None]:
# Delete specific function
# try:
#     if 'inline_task' in locals() and hasattr(inline_task, 'name'):
#         faas_mgr.delete_function(inline_task.name, provider='nuclio')
#         print(f"Deleted function: {inline_task.name}")
# except Exception as e:
#     print(f"Failed to delete function: {e}")

# Shutdown manager (keeps minikube running if auto_terminate=False)
# try:
#     faas_mgr.shutdown()
#     print("FaaS manager shutdown complete")
# except Exception as e:
#     print(f"Failed to shutdown manager: {e}")

# To fully cleanup minikube cluster, run:
# !minikube delete