# Fluster Demo Notebook

Interactive exploration of the fluster cluster system.

This notebook connects to a running fluster cluster and demonstrates how to:
- Submit jobs to the cluster
- Wait for job completion
- Submit jobs with arguments
- Check a job's status while it is still running
- Run stateful actors and call methods on them
- Use WorkerPool for parallel task execution

## Setup and Connection

Connect to the demo cluster using environment variables set by `demo_cluster.py`.

In [None]:
import os
from pathlib import Path

from fluster.client.rpc_client import RpcClusterClient
from fluster.cluster.types import Entrypoint
from fluster.rpc import cluster_pb2

# Connect to the demo cluster.
# FLUSTER_CONTROLLER_ADDRESS and FLUSTER_WORKSPACE are set by demo_cluster.py;
# the controller address falls back to a local default when it is absent.
controller_address = os.environ.get("FLUSTER_CONTROLLER_ADDRESS", "http://127.0.0.1:8080")
workspace_str = os.environ.get("FLUSTER_WORKSPACE")
# Treat an empty value the same as a missing one: Path("") is the current
# directory, so an empty variable would silently select the wrong workspace.
if not workspace_str:
    raise RuntimeError(
        "FLUSTER_WORKSPACE not set. Run this notebook via: "
        "uv run python examples/demo_cluster.py"
    )
workspace = Path(workspace_str)

# Every later cell submits and tracks jobs through this client object.
client = RpcClusterClient(controller_address, workspace=workspace)
print(f"Connected to cluster at {controller_address}")

## Submit a Simple Job

Submit a basic job that prints a message and returns a value.

In [None]:
def hello_world() -> int:
    """Print a greeting and return the constant 42.

    Serves as the smallest possible job entrypoint in this notebook: it takes
    no arguments, writes one line to stdout, and returns a fixed value.
    """
    greeting = "Hello from the cluster!"
    print(greeting)
    return 42

# Wrap hello_world as an entrypoint and submit it; the value returned by
# submit() is kept in job_id so later cells can refer to this job.
job_id = client.submit(
    name="notebook-hello",
    resources=cluster_pb2.ResourceSpec(memory="512m", cpu=1),
    entrypoint=Entrypoint.from_callable(hello_world),
)
print(f"Submitted job: {job_id}")

## Wait and Check Status

Wait for the job to complete and check its status.

In [None]:
# Wait on the job submitted in the previous cell
# (timeout=30.0 is presumably in seconds — confirm against RpcClusterClient.wait).
status = client.wait(job_id, timeout=30.0)
# status.state is a JobState enum value; JobState.Name() turns it into its symbolic name.
print(f"Job {job_id}: {cluster_pb2.JobState.Name(status.state)}")

## Submit a Job with Arguments

Submit a job that takes arguments and performs a computation.

In [None]:
def compute(a: int, b: int) -> int:
    """Multiply ``a`` by ``b``, print the calculation, and return the product."""
    product = a * b
    print(f"Computing {a} * {b} = {product}")
    return product

# The positional arguments after the callable (7 and 6) are bound into the
# entrypoint, so the job runs compute(7, 6).
job_id = client.submit(
    name="multiply-job",
    resources=cluster_pb2.ResourceSpec(memory="512m", cpu=1),
    entrypoint=Entrypoint.from_callable(compute, 7, 6),
)

# Wait on the job, then report the state it ended in.
status = client.wait(job_id)
print(f"Result: {cluster_pb2.JobState.Name(status.state)}")

## Submit Multiple Jobs

Submit multiple jobs and wait for all of them to complete.

In [None]:
def square(n: int) -> int:
    """Print and return ``n`` multiplied by itself."""
    squared = n * n
    print(f"{n}^2 = {squared}")
    return squared

# Fan out one job per input value 1..5, collecting the ids in submission order.
job_ids = []
for i in range(1, 6):
    job_id = client.submit(
        name=f"square-{i}",
        resources=cluster_pb2.ResourceSpec(memory="512m", cpu=1),
        entrypoint=Entrypoint.from_callable(square, i),
    )
    print(f"Submitted: {job_id}")
    job_ids.append(job_id)

# Wait on each job in submission order and print the state it ended in.
print("\nWaiting for jobs...")
for job_id in job_ids:
    status = client.wait(job_id)
    print(f"{job_id}: {cluster_pb2.JobState.Name(status.state)}")

## View Job Status

Check the status of a job without waiting for it to complete.

In [None]:
import time

def slow_job(steps: int = 5, delay: float = 1.0) -> str:
    """Simulate a long-running job by sleeping between progress messages.

    With the defaults this prints "Working... 1/5" through "Working... 5/5",
    sleeping one second after each message.

    Args:
        steps: Number of progress messages to print; one sleep follows each.
        delay: Value passed to time.sleep() after every message, in seconds.

    Returns:
        The string "done" once every step has finished.
    """
    for step in range(1, steps + 1):
        print(f"Working... {step}/{steps}")
        time.sleep(delay)
    return "done"

job_id = client.submit(
    name="slow-job",
    resources=cluster_pb2.ResourceSpec(memory="512m", cpu=1),
    entrypoint=Entrypoint.from_callable(slow_job),
)
print(f"Submitted: {job_id}")

# Take three status snapshots, one second apart, without blocking on the job.
for _ in range(3):
    time.sleep(1)
    status = client.status(job_id)
    print(f"Status: {cluster_pb2.JobState.Name(status.state)}")

# Now block on the job and report the state it ended in.
status = client.wait(job_id)
print(f"Final: {cluster_pb2.JobState.Name(status.state)}")

## Remote Actor Demo

Demonstrate running a stateful actor as a remote job. The actor maintains state
across method calls, enabling persistent services within the cluster.

This example:
1. Submits a job that starts an ActorServer with a Counter actor
2. Discovers the actor endpoint via the controller
3. Calls methods and verifies state is maintained across calls

In [None]:
# Define a stateful Counter actor
class Counter:
    """Stateful actor holding a single integer tally.

    The tally starts at zero and is stored on the instance, so successive
    method calls against the same instance observe each other's updates.
    """

    def __init__(self):
        self._count = 0

    def increment(self, amount: int = 1) -> int:
        """Add ``amount`` (default 1) to the tally and return the updated value."""
        self._count = self._count + amount
        return self._count

    def get_count(self) -> int:
        """Return the tally without modifying it."""
        return self._count

    def reset(self) -> int:
        """Zero the tally and return the value it held beforehand."""
        previous, self._count = self._count, 0
        return previous


print("Counter actor class defined")

In [None]:
import time

from fluster.actor import ActorServer
from fluster.client import fluster_ctx


def run_counter_actor():
    """Job entrypoint: host a Counter actor and advertise it to the controller.

    Steps performed:
    1. Start an ActorServer on 127.0.0.1 with a fresh Counter registered under
       the name "counter"; serve_background() returns the port being served.
    2. If FLUSTER_CONTROLLER_ADDRESS is non-empty, register the endpoint
       "<ctx.namespace>/counter" with the controller; otherwise print a
       warning and skip registration.
    3. Sleep in an endless loop. The function never returns; the job has to be
       terminated from outside.

    NOTE(review): how ActorServer picks its port is not visible here —
    presumably the port allocated for ports=["actor"]; confirm.
    """
    import os

    ctx = fluster_ctx()
    print(f"Starting counter actor for job {ctx.job_id}")

    # Bring the actor server up before anything is advertised.
    server = ActorServer(host="127.0.0.1")
    server.register("counter", Counter())
    port = server.serve_background()
    print(f"Actor server started on port {port}")

    address = f"127.0.0.1:{port}"

    # Take the controller address from the environment, swapping
    # host.docker.internal for 127.0.0.1 (needed for in-process mode).
    # str.replace leaves the value untouched when the substring is absent.
    controller_addr = os.environ.get("FLUSTER_CONTROLLER_ADDRESS", "").replace(
        "host.docker.internal", "127.0.0.1"
    )

    if not controller_addr:
        print("WARNING: No controller address, endpoint not registered")
    else:
        from fluster.rpc.cluster_connect import ControllerServiceClientSync
        from fluster.rpc import cluster_pb2 as pb2

        # The endpoint name is namespaced so clients can look it up by prefix.
        prefixed_name = f"{ctx.namespace}/counter"
        ctrl_client = ControllerServiceClientSync(address=controller_addr, timeout_ms=5000)
        response = ctrl_client.register_endpoint(
            pb2.Controller.RegisterEndpointRequest(
                name=prefixed_name,
                address=address,
                job_id=ctx.job_id,
                metadata={},
            )
        )
        print(f"Registered endpoint: {prefixed_name} -> {address} (id={response.endpoint_id})")

    # Keep the process alive so the background server can keep answering.
    print("Actor ready, serving requests...")
    while True:
        time.sleep(1)


print("Actor job function defined")

In [None]:
# Launch the actor as a cluster job.
# ports=["actor"] requests a named port so the worker allocates one for the
# actor server.
actor_job_id = client.submit(
    name="counter-actor",
    ports=["actor"],
    resources=cluster_pb2.ResourceSpec(memory="512m", cpu=1),
    entrypoint=Entrypoint.from_callable(run_counter_actor),
)
print(f"Submitted actor job: {actor_job_id}")

In [None]:
from fluster.rpc.cluster_connect import ControllerServiceClientSync
from fluster.time_utils import wait_until_with_exception

# Wait for the actor job to start running and register its endpoint.
# Unlike regular jobs, we don't wait for completion - the actor loops forever,
# so we wait for the RUNNING state instead.
print("Waiting for actor to start...")

# Direct RPC client for the controller; the endpoint-discovery cell reuses it
# to call list_endpoints.
controller_client = ControllerServiceClientSync(
    address=controller_address,
    timeout_ms=30000,
)

# Most recent status fetched by job_is_running_or_failed(); None until the
# first poll has run.
_job_status = None

def job_is_running_or_failed() -> bool:
    """Refresh the cached actor-job status and say whether polling can stop.

    The fetched status is stored in the module-level ``_job_status`` so the
    caller can inspect it afterwards.

    Returns:
        True when the job state is RUNNING, FAILED, or KILLED; False for any
        other state.
    """
    global _job_status
    _job_status = client.status(actor_job_id)
    stop_states = (
        cluster_pb2.JOB_STATE_RUNNING,
        cluster_pb2.JOB_STATE_FAILED,
        cluster_pb2.JOB_STATE_KILLED,
    )
    return _job_status.state in stop_states

# Poll with the predicate above, using the interval and timeout settings below.
wait_until_with_exception(
    job_is_running_or_failed,
    error_message="Actor job did not start in time",
    timeout=15.0,
    initial_interval=0.1,
    max_interval=1.0,
)

# The predicate also stops on FAILED/KILLED, so anything other than RUNNING
# at this point means the actor never came up.
if _job_status.state == cluster_pb2.JOB_STATE_RUNNING:
    print("Actor job is running")
else:
    raise RuntimeError(f"Actor job failed: {cluster_pb2.JobState.Name(_job_status.state)}")

In [None]:
from fluster.actor import ActorClient, FixedResolver

# Discover the actor endpoint via the controller.
# run_counter_actor registers the endpoint as "<ctx.namespace>/counter"; the
# namespace is taken here to be the actor job's own id (root job ID) —
# confirm this holds for jobs that are not top-level.
endpoint_prefix = f"{actor_job_id}/counter"
print(f"Discovering endpoint with prefix: {endpoint_prefix}")

# First endpoint returned by the controller for the prefix; filled in by
# endpoint_discovered() and None until a lookup succeeds.
_endpoint = None

def endpoint_discovered() -> bool:
    """Ask the controller for endpoints matching ``endpoint_prefix``.

    On a hit, the first endpoint in the response is saved to the module-level
    ``_endpoint``.

    Returns:
        True if at least one endpoint was returned, otherwise False.
    """
    global _endpoint
    response = controller_client.list_endpoints(
        cluster_pb2.Controller.ListEndpointsRequest(prefix=endpoint_prefix)
    )
    if not response.endpoints:
        return False
    _endpoint = response.endpoints[0]
    return True

# Poll the controller with the predicate above until an endpoint shows up.
wait_until_with_exception(
    endpoint_discovered,
    error_message=f"No endpoints found for prefix: {endpoint_prefix}",
    timeout=15.0,
    initial_interval=0.1,
    max_interval=1.0,
)

# The registered address is host:port; prepend the scheme to form a URL.
actor_url = f"http://{_endpoint.address}"
print(f"Discovered actor endpoint: {_endpoint.name} -> {actor_url}")

# The exact address is already known, so map the actor name straight to it
# with a FixedResolver.
resolver = FixedResolver({"counter": actor_url})
counter = ActorClient(resolver, "counter", timeout=10.0)
print("Actor client created")

In [None]:
# Exercise the remote Counter and confirm each call sees the previous one's effect.
print("Testing actor state persistence...")

# A freshly started actor must report zero.
initial = counter.get_count()
print(f"Initial count: {initial}")
assert initial == 0, f"Expected 0, got {initial}"

# Three default increments should yield 1, 2, 3 in turn.
for i in (1, 2, 3):
    result = counter.increment()
    print(f"After increment {i}: {result}")
    assert result == i, f"Expected {i}, got {result}"

# Reading the count afterwards must agree with the last increment.
final = counter.get_count()
print(f"Final count: {final}")
assert final == 3, f"Expected 3, got {final}"

# An explicit amount is added on top of the existing tally (3 + 10).
result = counter.increment(10)
print(f"After increment(10): {result}")
assert result == 13, f"Expected 13, got {result}"

# reset() hands back the value it wiped out...
old_value = counter.reset()
print(f"Reset returned: {old_value}")
assert old_value == 13, f"Expected 13, got {old_value}"

# ...and leaves the tally at zero.
after_reset = counter.get_count()
print(f"After reset: {after_reset}")
assert after_reset == 0, f"Expected 0, got {after_reset}"

print("\nAll actor tests passed! State is correctly maintained across calls.")

In [None]:
# Cleanup: terminate the actor job.
# run_counter_actor loops forever, so the job only stops when terminated explicitly.
print(f"Terminating actor job: {actor_job_id}")
client.terminate(actor_job_id)

# Wait for termination to complete, then report the state the job ended in
# (timeout=10.0 is presumably in seconds — confirm against RpcClusterClient.wait).
status = client.wait(actor_job_id, timeout=10.0)
print(f"Actor job terminated: {cluster_pb2.JobState.Name(status.state)}")

## WorkerPool Demo

WorkerPool provides a high-level interface for parallel task execution. Unlike
submitting individual jobs (which have scheduling overhead), WorkerPool maintains
a persistent pool of workers that can execute arbitrary callables with minimal latency.

Key features:
- **Persistent workers**: Workers stay running and accept tasks via RPC
- **Task queuing**: Submit many tasks; they queue and dispatch to idle workers
- **map() interface**: Familiar parallel map semantics for batch processing

WorkerPool must run from within a job context (it needs FlusterContext for
endpoint discovery). This demo submits a "coordinator" job that creates and
uses a WorkerPool internally.

In [None]:
def workerpool_coordinator():
    """Coordinator job: build a WorkerPool and square 1..10 on it.

    Runs inside a job so that fluster_ctx() can supply the context whose
    controller is handed to WorkerPool. The function creates a three-worker
    pool, maps a squaring function over the integers 1 through 10, asserts
    the results match a locally computed list, and prints the pool status
    before and after the computation.

    Raises:
        AssertionError: if the pool's results differ from the expected squares.
    """
    from fluster.client import fluster_ctx
    from fluster.worker_pool import WorkerPool, WorkerPoolConfig
    from fluster.rpc import cluster_pb2

    # Task executed on the pool workers.
    def square(n: int) -> int:
        return n * n

    ctx = fluster_ctx()
    print(f"Coordinator starting (job_id={ctx.job_id})")

    # Three workers, each requesting cpu=1 and 512m of memory.
    config = WorkerPoolConfig(
        name_prefix="pool-worker",
        num_workers=3,
        resources=cluster_pb2.ResourceSpec(memory="512m", cpu=1),
    )
    print(f"Creating WorkerPool with {config.num_workers} workers...")

    # The pool is used as a context manager so it is cleaned up on exit.
    with WorkerPool(ctx.controller, config, timeout=30.0) as pool:
        print(f"Pool ready: {pool.size} workers available")
        pool.print_status()

        items = list(range(1, 11))
        print(f"\nComputing squares of {items}...")

        # map() returns one future per item; resolve them in input order.
        results = [future.result(timeout=30.0) for future in pool.map(square, items)]
        print(f"Results: {results}")

        # Cross-check against the same computation done locally.
        expected = [value * value for value in items]
        assert results == expected, f"Expected {expected}, got {results}"

        print("\nFinal pool status:")
        pool.print_status()

    print("\nWorkerPool demo completed successfully!")


print("Coordinator function defined")

In [None]:
# Submit the coordinator as an ordinary job; it creates the WorkerPool itself
# once it is running.
coordinator_job_id = client.submit(
    name="workerpool-demo",
    resources=cluster_pb2.ResourceSpec(memory="512m", cpu=1),
    entrypoint=Entrypoint.from_callable(workerpool_coordinator),
)
print(f"Submitted coordinator job: {coordinator_job_id}")

In [None]:
# Block on the coordinator job. This can take a while because the coordinator
# launches worker sub-jobs and waits for them before computing anything.
print("Waiting for WorkerPool demo to complete...")
print("(The coordinator will launch 3 worker sub-jobs internally)")
print()

status = client.wait(coordinator_job_id, timeout=120.0)
state_name = cluster_pb2.JobState.Name(status.state)
print(f"Coordinator job finished: {state_name}")

# Only SUCCEEDED counts as a pass; any other state is reported as a warning.
if status.state == cluster_pb2.JOB_STATE_SUCCEEDED:
    print("\nWorkerPool demo completed successfully!")
else:
    print(f"WARNING: Job did not succeed (state={state_name})")