Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions docs/tutorials/runner-setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ class Counter:
return self._count

with Runner("counter-app") as runner:
counter = runner.service(Counter(10), stateful=True, warmup=1)
counter = runner.service(Counter(10), warmup=1)
counter.add(1).wait()
counter.add(3).wait()
print(counter.get().get())
```

Classes and instances default to fixed sessions. Passing `warmup=N` with `autoscale=False` creates `N` fixed instances. Use `stateful=True` only with instances, not classes.
Functions, builtins, and classes are stateless. Object instances are stateful fixed services and support only `warmup=0` or `warmup=1`.

### Passing ObjectFuture Values

Expand Down Expand Up @@ -144,7 +144,7 @@ Runner(name: str, fail_if_exists: bool = False)

Methods:

- `service(execution_object, stateful=None, autoscale=None, warmup=0, resreq=None)`: create a `RunnerService`.
- `service(execution_object, autoscale=None, warmup=0, resreq=None)`: create a `RunnerService`.
- `get(futures)`: resolve multiple `ObjectFuture` values to concrete objects.
- `ref(futures)`: resolve multiple `ObjectFuture` values to `ObjectRef` values.
- `wait(futures)`: wait for multiple futures without fetching objects.
Expand Down Expand Up @@ -173,13 +173,15 @@ with Runner("cpu-app") as runner:
- Every remote call returns `ObjectFuture`.
- `close()` closes the underlying Flame session.

Default scaling behavior:
Default service behavior with `warmup=0`:

| Execution object | Default `autoscale` | Default `stateful` |
|------------------|---------------------|--------------------|
| Function | `True` | `False` |
| Class | `False` | `False` |
| Instance | `False` | `False` |
| Execution object | Default `stateful` | Default `autoscale` | Effective `min_instances` | Effective `max_instances` |
|------------------|--------------------|---------------------|---------------------------|---------------------------|
| Function or builtin | `False` | `True` | `0` | unlimited |
| Class | `False` | `True` | `0` | unlimited |
| Instance | `True` | `False` | `1` | `1` |

For functions, builtins, and classes, `autoscale` is configurable. When `warmup=N` and `N > 0`, autoscaled services use `min_instances=N` and no max limit; fixed services use `min_instances=N` and `max_instances=N`. Object instances are always fixed, always stateful, and reject `warmup` values other than `0` or `1`.

### ObjectFuture

Expand Down
25 changes: 16 additions & 9 deletions e2e/src/e2e/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,17 +288,20 @@ class RecursiveService:
using the open_session API.
"""

def __init__(self, session_id: str, app_name: str):
_session_context: Optional[SessionContext] = None

def __init__(self, session_id: Optional[str] = None, app_name: Optional[str] = None):
"""Initialize with session ID and app name for recursive calls.

Args:
session_id: The shared session ID for recursive calls.
app_name: The shared application name for Runner.
"""
self._session_context = SessionContext(
session_id=session_id,
application_name=app_name,
)
if session_id is not None and app_name is not None:
self._session_context = SessionContext(
session_id=session_id,
application_name=app_name,
)

def compute_recursive(self, depth: int) -> int:
"""Compute recursively by creating new Runner and service instances.
Expand All @@ -312,6 +315,9 @@ def compute_recursive(self, depth: int) -> int:
logger = logging.getLogger(__name__)

logger.info(f"[RecursiveService] compute_recursive called with depth={depth}")
if self._session_context is None:
raise ValueError("RecursiveService requires _session_context")

logger.info(f"[RecursiveService] session_context: session_id={self._session_context.session_id}, app_name={self._session_context.application_name}")

if depth <= 0:
Expand All @@ -326,11 +332,12 @@ def compute_recursive(self, depth: int) -> int:
with Runner(self._session_context.application_name) as inner_runner:
logger.info(f"[RecursiveService] Inner Runner created, _app_registered={inner_runner._app_registered}")

# Create service using Runner.service() with self
# Create service using Runner.service() with the class
# This reuses the existing session via _session_context
# Use autoscale=True to allow multiple executors for recursive calls
logger.info("[RecursiveService] Creating inner service with self")
inner_service = inner_runner.service(self, autoscale=True)
# Use a class service with autoscale=True to allow multiple
# executors for recursive calls.
logger.info("[RecursiveService] Creating inner service with class")
inner_service = inner_runner.service(type(self), autoscale=True)
logger.info(f"[RecursiveService] Inner service created, session_id={inner_service._session.id}")

# Call the inner service recursively
Expand Down
81 changes: 41 additions & 40 deletions e2e/tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ def test_runner_with_instance(check_package_config, check_flmrun_app):
# Set initial count to 10 by adding 10
counter.add(10)

# Create a stateful service with the instance
cnt_os = rr.service(counter, stateful=True, autoscale=False)
# Instance services are stateful and fixed by default.
cnt_os = rr.service(counter)

# Apply state changes sequentially so the expected total is deterministic.
cnt_os.increment().wait()
Expand All @@ -124,8 +124,8 @@ def test_runner_with_objectfuture_args(check_package_config, check_flmrun_app):
counter = Counter()
counter.add(10)

# Create a stateful service with the instance
cnt_os = rr.service(counter, stateful=True, autoscale=False)
# Instance services are stateful and fixed by default.
cnt_os = rr.service(counter)

# Apply state changes sequentially so ObjectFuture chaining starts from
# a deterministic counter value.
Expand Down Expand Up @@ -284,13 +284,13 @@ def test_runner_error_no_storage_config():


def test_runner_stateful_instance(check_package_config, check_flmrun_app):
"""Test Case 14: Test Runner with stateful=True for instance."""
"""Test Case 14: Test Runner with default stateful instance."""
with runner.Runner("test-runner-stateful") as rr:
# Create a Counter instance
counter = Counter()

# Create a stateful service (state should persist across tasks)
cnt_service = rr.service(counter, stateful=True, autoscale=False)
# Instance services are stateful by default.
cnt_service = rr.service(counter)

# Call methods
cnt_service.add(5).wait()
Expand All @@ -306,7 +306,7 @@ def test_runner_stateless_function(check_package_config, check_flmrun_app):
"""Test Case 15: Test Runner with stateless function (default behavior)."""
with runner.Runner("test-runner-stateless-func") as rr:
# Create a service with a function (stateless by default)
sum_service = rr.service(sum_func, stateful=False, autoscale=True)
sum_service = rr.service(sum_func)

# Call the function multiple times
results = [sum_service(i, i + 1) for i in range(5)]
Expand All @@ -321,7 +321,7 @@ def test_runner_class_single_instance(check_package_config, check_flmrun_app):
"""Test Case 16: Test Runner with class and autoscale=False (single instance)."""
with runner.Runner("test-runner-class-single") as rr:
# Create a service with a class, single instance mode
calc_service = rr.service(Calculator, stateful=False, autoscale=False)
calc_service = rr.service(Calculator, autoscale=False)

# Call methods
result1 = calc_service.add(10, 5)
Expand All @@ -331,14 +331,14 @@ def test_runner_class_single_instance(check_package_config, check_flmrun_app):
assert values == [15, 12], f"Expected [15, 12], got {values}"


def test_runner_error_stateful_class(check_package_config, check_flmrun_app):
"""Test Case 17: Test that stateful=True raises error for class."""
with runner.Runner("test-runner-stateful-class-error") as rr:
# Trying to create a stateful service with a class should raise ValueError
def test_runner_error_object_autoscale(check_package_config, check_flmrun_app):
"""Test Case 17: Test that object instances cannot autoscale."""
with runner.Runner("test-runner-object-autoscale-error") as rr:
# Object instance services are always fixed.
with pytest.raises(ValueError) as exc_info:
rr.service(Counter, stateful=True)
rr.service(Counter(), autoscale=True)

assert "Cannot set stateful=True for a class" in str(exc_info.value)
assert "always fixed" in str(exc_info.value)


def test_runner_defaults_function(check_package_config, check_flmrun_app):
Expand All @@ -354,9 +354,9 @@ def test_runner_defaults_function(check_package_config, check_flmrun_app):


def test_runner_defaults_class(check_package_config, check_flmrun_app):
"""Test Case 19: Test default parameters for class (stateful=False, autoscale=False)."""
"""Test Case 19: Test default parameters for class (stateful=False, autoscale=True)."""
with runner.Runner("test-runner-defaults-class") as rr:
# Create service with class using defaults (should be stateful=False, autoscale=False)
# Create service with class using defaults (should be stateful=False, autoscale=True)
calc_service = rr.service(Calculator)

# Use a stateless method because class services cannot be stateful.
Expand All @@ -367,20 +367,19 @@ def test_runner_defaults_class(check_package_config, check_flmrun_app):


def test_runner_defaults_instance(check_package_config, check_flmrun_app):
"""Test Case 20: Test default parameters for instance (stateful=False, autoscale=False)."""
"""Test Case 20: Test default parameters for instance (stateful=True, autoscale=False)."""
with runner.Runner("test-runner-defaults-instance") as rr:
# Create an instance
calc = Calculator()
counter = Counter()

# Create service with instance using defaults (should be stateful=False, autoscale=False)
calc_service = rr.service(calc)
# Create service with instance using defaults (should be stateful=True, autoscale=False)
counter_service = rr.service(counter)

# Call methods
result1 = calc_service.add(5, 3)
result2 = calc_service.subtract(10, 4)
counter_service.add(5).wait()
counter_service.increment().wait()
result = counter_service.get_count()

values = rr.get([result1, result2])
assert values == [8, 6], f"Expected [8, 6], got {values}"
value = result.get()
assert value == 6, f"Expected 6, got {value}"


def test_runner_auto_start(check_package_config, check_flmrun_app):
Expand Down Expand Up @@ -715,26 +714,28 @@ def test_runner_recursive_same_session(check_package_config, check_flmrun_app):
"""
import logging
import time
import uuid

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Shared application name and session ID
shared_app_name = "test-runner-recursive"
shared_session_id = "recursive-session-001"
recursive_suffix = uuid.uuid4().hex[:8]
shared_app_name = f"test-runner-recursive-{recursive_suffix}"
shared_session_id = f"recursive-session-{recursive_suffix}"

logger.info(f"[TEST] Starting recursive test: app={shared_app_name}, session={shared_session_id}")

# Create an instance with the shared session ID and app name
recursive_instance = RecursiveService(
session_id=shared_session_id,
app_name=shared_app_name,
)
class RecursiveTestService(RecursiveService):
_session_context = SessionContext(
session_id=shared_session_id,
application_name=shared_app_name,
)

with runner.Runner(shared_app_name) as rr:
# Use autoscale=True to allow multiple executors for recursive calls
# Without autoscale, a single executor would deadlock waiting for its own recursive task
service = rr.service(recursive_instance, autoscale=True)
service = rr.service(RecursiveTestService, autoscale=True)
logger.info(f"[TEST] Service created, session_id={service._session.id}")

# Verify the session ID matches
Expand Down Expand Up @@ -1123,7 +1124,7 @@ def test_task_chaining_sequential(self, check_package_config, check_flmrun_app):
"""Test sequential task chaining where output of one task feeds into next."""
with runner.Runner("test-drf-chain-seq") as rr:
counter = Counter()
cnt_service = rr.service(counter, stateful=True, autoscale=False)
cnt_service = rr.service(counter)

cnt_service.add(10).wait()
cnt_service.add(5).wait()
Expand All @@ -1137,7 +1138,7 @@ def test_task_chaining_with_objectfuture(self, check_package_config, check_flmru
"""Test chaining using ObjectFuture as argument to next task."""
with runner.Runner("test-drf-chain-objfuture") as rr:
counter = Counter()
cnt_service = rr.service(counter, stateful=True, autoscale=False)
cnt_service = rr.service(counter)

cnt_service.add(10).wait()
intermediate = cnt_service.get_count()
Expand Down Expand Up @@ -1279,7 +1280,7 @@ def test_stateful_counter_operations(self, check_package_config, check_flmrun_ap
"""Test stateful counter with multiple operations."""
with runner.Runner("test-drf-stateful-counter") as rr:
counter = Counter()
cnt_service = rr.service(counter, stateful=True, autoscale=False)
cnt_service = rr.service(counter)

cnt_service.add(100).wait()
cnt_service.increment().wait()
Expand All @@ -1296,8 +1297,8 @@ def test_stateful_isolation_between_services(self, check_package_config, check_f
counter1 = Counter()
counter2 = Counter()

svc1 = rr.service(counter1, stateful=True, autoscale=False)
svc2 = rr.service(counter2, stateful=True, autoscale=False)
svc1 = rr.service(counter1)
svc2 = rr.service(counter2)

svc1.add(10).wait()
svc1.increment().wait()
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ with Runner("add-app") as runner:
Key classes and helpers:

- `Runner(name, fail_if_exists=False)`
- `Runner.service(execution_object, stateful=None, autoscale=None, warmup=0, resreq=None)`
- `Runner.service(execution_object, autoscale=None, warmup=0, resreq=None)`
- `Runner.get(futures)`, `Runner.ref(futures)`, `Runner.wait(futures)`, `Runner.select(futures)`
- `ObjectFuture.get()`, `ObjectFuture.ref()`, `ObjectFuture.wait()`
- `get_data(data)` for decoding Runner task input/output payloads
Expand Down
Loading
Loading