# FUSED Framework: Model Serving Example

This notebook demonstrates how to export and serve models created with the FUSED framework for deployment in production environments.

## Setup

First, let's import the necessary libraries:

In [None]:
import torch
import torch.nn as nn
import numpy as np
import json
import os
import requests
from torch.utils.data import TensorDataset, DataLoader

# Import FUSED utilities
from fused.utils.serving import ModelExporter, ModelServer, load_model
from fused.models import SequentialEncoder

# Seed both random number sources used in this notebook (NumPy and PyTorch)
# with the same value so repeated runs draw the same numbers.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

## Create a Simple Model

Let's create a simple FUSED model for time series classification that we'll export:

In [None]:
class TimeSeriesClassifier(nn.Module):
    """Time series classifier: a ``SequentialEncoder`` followed by a linear head.

    Args:
        input_dim: Feature width handed to the encoder as ``input_dim``.
        hidden_dim: Used for the encoder's ``hidden_dim`` and ``output_dim``
            and as the input width of the classification layer.
        num_classes: Number of logits produced per sample.
    """

    def __init__(self, input_dim=5, hidden_dim=32, num_classes=2):
        super().__init__()

        # Constructor arguments, kept on the instance as a plain dict.
        # NOTE(review): presumably what the export utilities serialize -- confirm.
        self.config = {
            "input_dim": input_dim,
            "hidden_dim": hidden_dim,
            "num_classes": num_classes
        }

        self.encoder = SequentialEncoder(
            input_dim=input_dim,
            hidden_dim=hidden_dim,
            output_dim=hidden_dim,
            bidirectional=True,
            encoder_type="lstm"
        )

        self.classifier = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Encode ``x`` and classify the encoding.

        Args:
            x: The encoder input itself, or a dict wrapping it. For a dict,
                the ``"features"`` entry is used when present; otherwise the
                first value in iteration order is used.

        Returns:
            Dict with ``"logits"`` (classifier output) and ``"embeddings"``
            (encoder output).
        """
        if isinstance(x, dict):
            x = x["features"] if "features" in x else next(iter(x.values()))

        encoded = self.encoder(x)
        logits = self.classifier(encoded)
        return {"logits": logits, "embeddings": encoded}

    def predict(self, x):
        """Return predicted classes and class probabilities for ``x``.

        Runs in eval mode with gradient tracking disabled, then puts the
        module back into whichever mode (train or eval) it was in before the
        call, so calling this in the middle of training does not silently
        leave the model in eval mode.

        Returns:
            Dict with ``"class"`` (argmax over dim 1) and ``"probabilities"``
            (softmax of the logits over dim 1).
        """
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                logits = self(x)["logits"]
                probs = torch.softmax(logits, dim=1)
                preds = torch.argmax(probs, dim=1)
        finally:
            # Restore the caller's mode even if the forward pass raised.
            self.train(was_training)
        return {"class": preds, "probabilities": probs}

## Generate Synthetic Data

Let's create some synthetic time series data for our example:

In [None]:
def generate_synthetic_data(n_samples=100, seq_length=20, input_dim=5):
    """Generate a toy two-class time series dataset.

    Every sample starts as standard-normal noise. A sine wave covering
    ``0 .. 4*pi`` across the sequence is then added to feature 0 for class 0
    and subtracted from feature 0 for class 1; the other features stay noise.

    Args:
        n_samples: Number of sequences to generate.
        seq_length: Number of time steps per sequence.
        input_dim: Number of features per time step.

    Returns:
        Tuple ``(X, labels)`` where ``X`` has shape
        ``(n_samples, seq_length, input_dim)`` and ``labels`` has shape
        ``(n_samples,)`` with values drawn from ``{0, 1}``.
    """
    # Noise is drawn before the labels; keep this order so a fixed seed
    # always yields the same dataset.
    X = torch.randn(n_samples, seq_length, input_dim)

    # Shape (1, seq_length, 1): broadcasts over samples, targets one feature.
    pattern = torch.sin(torch.linspace(0, 4 * np.pi, seq_length)).view(1, seq_length, 1)

    labels = torch.randint(0, 2, (n_samples,))

    # +1 for class 0 (add the wave), -1 for class 1 (subtract it).
    signs = (1 - 2 * labels).to(X.dtype).view(n_samples, 1, 1)

    # Single batched in-place update. Adding the 3-D pattern into a 2-D
    # per-sample slice is rejected by PyTorch, because an in-place op may not
    # change the shape of its output through broadcasting.
    X[:, :, 0:1] += signs * pattern

    return X, labels

# Build the full synthetic dataset: 100 sequences, 20 steps, 5 features
X, y = generate_synthetic_data(100, 20, 5)
print(f"X shape: {X.shape}, y shape: {y.shape}")

# The first 80 samples are for training; everything after is held out
train_size = 80
X_train, y_train = X[:train_size], y[:train_size]
X_test, y_test = X[train_size:], y[train_size:]

# Wrap each split in a dataset and a loader; only the training loader shuffles
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=16)

## Train the Model

Let's quickly train our model on the synthetic data:

In [None]:
# Loss for integer class labels
criterion = nn.CrossEntropyLoss()

# Classifier sized from the feature axis (last dimension) of the data
model = TimeSeriesClassifier(
    input_dim=X.size(2),
    hidden_dim=32,
    num_classes=2,
)

# Adam over every model parameter
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Training loop
def train(model, train_loader, criterion, optimizer, num_epochs=10):
    """Train ``model`` in place, printing loss and accuracy after each epoch.

    Args:
        model: Module whose forward pass returns a dict containing ``"logits"``.
        train_loader: Iterable of ``(inputs, labels)`` batches. It is iterated
            once per epoch, so it must be re-iterable (a ``DataLoader`` or a
            plain list of batches both work).
        criterion: Loss callable invoked as ``criterion(logits, labels)``.
            The per-batch loss is weighted by batch size, which assumes the
            criterion returns a batch mean.
        optimizer: Optimizer whose ``zero_grad()`` and ``step()`` are called
            once per batch.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        The same ``model`` object that was passed in.
    """
    model.train()
    for epoch in range(num_epochs):
        loss_sum = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            # Forward pass
            optimizer.zero_grad()
            logits = model(inputs)["logits"]
            loss = criterion(logits, labels)

            # Backward pass
            loss.backward()
            optimizer.step()

            # Track metrics
            loss_sum += loss.item() * inputs.size(0)
            total += labels.size(0)
            correct += (logits.argmax(dim=1) == labels).sum().item()

        # Normalise by the samples actually seen rather than by
        # len(train_loader.dataset): the two differ when the loader drops the
        # last batch or samples a subset, and plain iterables have no dataset.
        if total:
            epoch_loss = loss_sum / total
            accuracy = correct / total
        else:
            # Nothing was iterated; report NaN instead of dividing by zero.
            epoch_loss = accuracy = float("nan")
        print(f"Epoch {epoch+1}/{num_epochs} - Loss: {epoch_loss:.4f} - Accuracy: {accuracy:.4f}")

    return model

# Five passes over the training loader; the name is rebound to train()'s return value
model = train(
    model=model,
    train_loader=train_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=5,
)

# Evaluate on test set
def evaluate(model, test_loader):
    """Compute and print classification accuracy over ``test_loader``.

    The model is switched to eval mode (and left there) and gradients are
    disabled while scoring.

    Args:
        model: Module whose forward pass returns a dict containing ``"logits"``.
        test_loader: Iterable of ``(inputs, labels)`` batches.

    Returns:
        Fraction of correctly classified samples, or NaN when the loader
        yields no samples.
    """
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            predicted = model(inputs)["logits"].argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # An empty loader has no defined accuracy; avoid ZeroDivisionError.
    accuracy = correct / total if total else float("nan")
    print(f"Test Accuracy: {accuracy:.4f}")
    return accuracy

# Score the trained model on the held-out loader and keep the accuracy
test_accuracy = evaluate(model=model, test_loader=test_loader)

## Export the Model

Now, let's use the `ModelExporter` to save our model in different formats:

In [None]:
# Create export directory (no error if it already exists)
export_dir = "./exported_models"
os.makedirs(export_dir, exist_ok=True)

# Create model exporter
exporter = ModelExporter(model, save_dir=export_dir)

# Export in PyTorch format
pt_path = exporter.export_pytorch(filename="time_series_classifier.pt")
print(f"PyTorch model exported to: {pt_path}")

# Export model configuration
config_path = exporter.export_config(filename="model_config.json")
print(f"Model configuration exported to: {config_path}")

# Example inputs for the TorchScript and ONNX exports: the feature tensor of
# the first test batch (element 0 of the (inputs, labels) pair)
example_batch = next(iter(test_loader))[0]

# Export in TorchScript format. Best effort: a failure is reported and the
# notebook carries on with the remaining formats.
try:
    script_path = exporter.export_torchscript(
        filename="time_series_classifier_script.pt",
        example_inputs=example_batch,
        method="trace"
    )
    print(f"TorchScript model exported to: {script_path}")
except Exception as e:
    print(f"TorchScript export failed: {e}")

# Export in ONNX format (if available). A missing optional dependency is
# reported as "skipped"; a real export error is reported as "failed" so the
# two cases are not confused.
try:
    import onnx  # noqa: F401  (imported only to check availability)
    import onnxruntime  # noqa: F401
except ImportError as e:
    print(f"ONNX export skipped: {e}")
else:
    try:
        onnx_path = exporter.export_onnx(
            filename="time_series_classifier.onnx",
            example_inputs=example_batch,
            input_names=["input"],
            output_names=["logits", "embeddings"]
        )
        print(f"ONNX model exported to: {onnx_path}")
    except Exception as e:
        print(f"ONNX export failed: {e}")

## Load and Test the Exported Model

Let's load the exported model and verify that it works correctly:

In [None]:
# Restore the classifier from the PyTorch export and show what came back
loaded_model = load_model(pt_path)
print(f"Loaded model: {type(loaded_model)}")

# Score the same five held-out sequences with both models
example_input = X_test[:5]

# Put both models in eval mode, then run both forward passes without gradients
model.eval()
loaded_model.eval()
with torch.no_grad():
    original_output = model(example_input)
    loaded_output = loaded_model(example_input)

original_preds = original_output["logits"].argmax(dim=1)
loaded_preds = loaded_output["logits"].argmax(dim=1)

# Report both prediction vectors and whether they agree element-wise
print(f"Original model predictions: {original_preds}")
print(f"Loaded model predictions: {loaded_preds}")
print(f"Predictions match: {(original_preds == loaded_preds).all().item()}")

## Create a Model Server

Now, let's create a server to serve our model via HTTP:

In [None]:
# Build a ModelServer around the PyTorch export, running on the CPU
server = ModelServer(
    device="cpu",
    model_path=pt_path,
)

# Define preprocessing and postprocessing functions
def preprocess(data):
    """Unwrap a ``{"data": ...}`` request payload for the model.

    A list payload is converted to a ``float32`` tensor; any other payload is
    returned as it is. Input that is not a dict with a ``"data"`` key passes
    through untouched.
    """
    if not isinstance(data, dict) or "data" not in data:
        return data

    payload = data["data"]
    if isinstance(payload, list):
        # Nested JSON arrays become a float tensor
        return torch.tensor(payload, dtype=torch.float32)
    return payload

def postprocess(output):
    """Convert a model output mapping into JSON-serializable values.

    Tensor values become (nested) Python lists, or a plain scalar for a 0-d
    tensor; every other value is passed through unchanged.

    Args:
        output: Mapping from output name to value, as returned by the model.

    Returns:
        A new dict with the same keys.
    """
    # detach() first: converting through .numpy() raises for tensors that
    # still require grad. Tensor.tolist() also avoids the NumPy round trip.
    return {
        key: value.detach().cpu().tolist() if isinstance(value, torch.Tensor) else value
        for key, value in output.items()
    }

# Configure server
server.set_preprocessing(preprocess)
server.set_postprocessing(postprocess)

# Start server in a separate thread
import threading
import time

def run_server():
    """Run the HTTP server; called on the background thread."""
    # Bind to loopback only: every client in this notebook talks to
    # localhost, and binding 0.0.0.0 would expose the demo endpoints
    # (including /shutdown) to the whole network.
    server.start_http_server(host="127.0.0.1", port=8000)

# Daemon thread: it does not keep the Python process (the notebook kernel)
# alive and is torn down when that process exits.
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()

# Fixed pause to give the server time to come up before the cells below call it
time.sleep(2)
print("Server started at http://localhost:8000")

## Test the Server API

Let's test our server by sending requests to it:

In [None]:
# Prepare example data: one test sequence as nested lists for JSON serialization
example_data = X_test[0:1].numpy().tolist()

# Send prediction request. The timeout keeps the cell from blocking forever
# if the server accepts the connection but never answers.
response = requests.post(
    "http://localhost:8000/predict",
    json={"data": example_data},
    timeout=10,
)

# Check response
if response.status_code == 200:
    result = response.json()
    print("Server response:")
    print(json.dumps(result, indent=2))

    # Verify prediction against the in-process model. eval() is set here so
    # the comparison does not depend on the mode an earlier cell left it in.
    model.eval()
    with torch.no_grad():
        direct_pred = model(torch.tensor(example_data, dtype=torch.float32))
        direct_class = torch.argmax(direct_pred["logits"], dim=1).item()

    # Argmax over the logits the server returned for the single sample
    server_logits = result["logits"][0]
    server_class = server_logits.index(max(server_logits))

    print(f"\nDirect model prediction: Class {direct_class}")
    print(f"Server prediction: Class {server_class}")
else:
    print(f"Error: {response.status_code}")
    print(response.text)

## Batch Prediction API

Let's also test the batch prediction API:

In [None]:
# Prepare batch data: five test sequences as nested lists for JSON serialization
batch_data = X_test[0:5].numpy().tolist()

# Send batch prediction request. The timeout keeps the cell from blocking
# forever if the server accepts the connection but never answers.
response = requests.post(
    "http://localhost:8000/predict_batch",
    json={"data": batch_data},
    timeout=10,
)

# Check response
if response.status_code == 200:
    result = response.json()
    print("Server batch response (truncated):")
    print(json.dumps({
        "logits": result["logits"][0:2],  # Show just first two predictions
        "embeddings": "..."
    }, indent=2))

    # Verify batch prediction against the in-process model. eval() is set here
    # so the comparison does not depend on the mode an earlier cell left it in.
    model.eval()
    with torch.no_grad():
        direct_pred = model(torch.tensor(batch_data, dtype=torch.float32))
        direct_classes = torch.argmax(direct_pred["logits"], dim=1).tolist()

    # Per-sample argmax over the logits returned by the server
    server_classes = [logits.index(max(logits)) for logits in result["logits"]]

    print(f"\nDirect model batch predictions: {direct_classes}")
    print(f"Server batch predictions: {server_classes}")
else:
    print(f"Error: {response.status_code}")
    print(response.text)

## Embedding API

Let's test the embedding extraction API as well:

In [None]:
# Send embedding extraction request for the single-sample payload prepared for
# the prediction test. The timeout keeps the cell from blocking forever if the
# server accepts the connection but never answers.
response = requests.post(
    "http://localhost:8000/embed",
    json={"data": example_data},
    timeout=10,
)

# Check response
if response.status_code == 200:
    result = response.json()
    print("Embedding response:")
    print(f"Embedding shape: {len(result['embeddings'][0])} dimensions")
    print(f"First few dimensions: {result['embeddings'][0][:5]}")
else:
    print(f"Error: {response.status_code}")
    print(response.text)

## Model Metadata API

Finally, let's check the model metadata API:

In [None]:
# Get model metadata. The timeout keeps the cell from blocking forever if the
# server accepts the connection but never answers.
response = requests.get("http://localhost:8000/metadata", timeout=10)

# Check response
if response.status_code == 200:
    metadata = response.json()
    print("Model metadata:")
    print(json.dumps(metadata, indent=2))
else:
    print(f"Error: {response.status_code}")
    print(response.text)

## Shutdown Server

Now let's shut down the server:

In [None]:
# Ask the server to shut down. The timeout keeps the cell from blocking
# forever if the server stops responding part-way through the request.
requests.post("http://localhost:8000/shutdown", timeout=10)
print("Server shutdown requested")

## Deployment Options

There are several ways to deploy FUSED models in production environments:

1. **HTTP Server**: As demonstrated above, using a Flask-based HTTP server for REST API access
2. **Docker Container**: Package the model and server in a Docker container for easy deployment
3. **Cloud Platforms**: Deploy to AWS SageMaker, Azure ML, or Google AI Platform
4. **TorchServe**: Use TorchServe for more advanced serving capabilities
5. **ONNX Runtime**: Deploy ONNX models with ONNX Runtime for cross-platform inference

Below is an example of how to create a Dockerfile for deploying our model server:

In [None]:
%%writefile Dockerfile
# NOTE(review): Python 3.8 is end-of-life; consider a newer slim base image
# once the Python versions supported by the fused package are confirmed.
FROM python:3.8-slim

WORKDIR /app

# Install dependencies. requirements.txt is copied on its own first so this
# layer is rebuilt only when the dependency list changes; --no-cache-dir
# keeps pip's download cache out of the image.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy model and server code
COPY exported_models /app/exported_models
COPY server.py .

# Expose port (server.py listens on 8000)
EXPOSE 8000

# Run server
CMD ["python", "server.py"]

In [None]:
%%writefile server.py
import torch
from fused.utils.serving import ModelServer, load_model
import json
import os

def preprocess(data):
    """Unwrap a ``{"data": ...}`` request payload for the model.

    A list payload is converted to a ``float32`` tensor; any other payload is
    returned as it is. Input that is not a dict with a ``"data"`` key passes
    through untouched.
    """
    if not isinstance(data, dict) or "data" not in data:
        return data

    payload = data["data"]
    if isinstance(payload, list):
        # Nested JSON arrays become a float tensor
        return torch.tensor(payload, dtype=torch.float32)
    return payload

def postprocess(output):
    """Convert a model output mapping into JSON-serializable values.

    Tensor values become (nested) Python lists, or a plain scalar for a 0-d
    tensor; every other value is passed through unchanged.

    Args:
        output: Mapping from output name to value, as returned by the model.

    Returns:
        A new dict with the same keys.
    """
    # detach() first: converting through .numpy() raises for tensors that
    # still require grad. Tensor.tolist() also avoids the NumPy round trip.
    return {
        key: value.detach().cpu().tolist() if isinstance(value, torch.Tensor) else value
        for key, value in output.items()
    }

def main():
    """Serve the exported classifier over HTTP on port 8000, all interfaces."""
    # Relative path: resolved against the working directory the script runs in
    model_path = os.path.join("exported_models", "time_series_classifier.pt")

    # CPU server with the request/response converters defined above
    server = ModelServer(model_path=model_path, device="cpu")
    server.set_preprocessing(preprocess)
    server.set_postprocessing(postprocess)

    server.start_http_server(host="0.0.0.0", port=8000)


if __name__ == "__main__":
    main()

In [None]:
%%writefile requirements.txt
torch>=1.10.0
fused
flask>=2.0.0
numpy>=1.20.0
requests>=2.25.0

## Conclusion

In this notebook, we've demonstrated how to use the model serving utilities in the FUSED framework to export and deploy models for production use. We've covered:

1. Training a simple time series classification model
2. Exporting the model in various formats (PyTorch, TorchScript, ONNX)
3. Creating a model server with HTTP API endpoints
4. Testing the server with prediction, batch prediction, embedding, and metadata API calls
5. Preparing for containerized deployment with Docker

The `ModelExporter` and `ModelServer` classes make it easy to productionize FUSED models, allowing you to move from research to deployment with minimal effort.