diff --git a/stateforward/elements/__init__.py b/stateforward/elements/__init__.py index a4b8af2..4336fe4 100644 --- a/stateforward/elements/__init__.py +++ b/stateforward/elements/__init__.py @@ -53,7 +53,7 @@ A base class for complex states that may contain other states (regions). ### State -A simple or composite state within the state machine that can perform `entry`, `exit`, and `do_activity` behaviors. It may also contain `completion`, `deferred` events, and a reference to a `submachine`. +A simple or composite state within the state machine that can perform `entry`, `exit`, and `activity` behaviors. It may also contain `completion`, `deferred` events, and a reference to a `submachine`. ### Region Represents a 'container' for states inside composite states or state machines. Maintains a `subvertex` collection for the vertices it encloses and references its `initial` state. diff --git a/stateforward/elements/elements.py b/stateforward/elements/elements.py index 3b41a84..c3ca1ee 100644 --- a/stateforward/elements/elements.py +++ b/stateforward/elements/elements.py @@ -600,14 +600,14 @@ class State(Vertex, CompositeState): """ State is a Vertex that is also a CompositeState. - It can define behaviors for entry and exit, as well as an ongoing do_activity behavior. The State can also + It can define behaviors for entry and exit, as well as an ongoing activity behavior. The State can also have a completion event, which indicates that the internal behavior and activities are complete, and potentially deferred events. States can also act as submachine states containing an entire StateMachine. Attributes: entry (Behavior): The behavior that is executed when entering the state. exit (Behavior): The behavior that is executed when exiting the state. - do_activity (Behavior): The behavior that is executed while the state machine is in this state. + activity (Behavior): The behavior that is executed while the state machine is in this state. completion (CompletionEvent): An event that is triggered when the state has completed its activity. deferred (Collection[Event]): Events that are not handled in the state but are deferred to the state machine for handling at a later time. @@ -617,7 +617,7 @@ class State(Vertex, CompositeState): entry: "Behavior" = None exit: "Behavior" = None - do_activity: "Behavior" = None + activity: "Behavior" = None completion: CompletionEvent = None deferred: model.Collection[Event] = None submachine: "StateMachine" = None diff --git a/stateforward/elements/functional.py b/stateforward/elements/functional.py index f8ca8cc..176fa88 100644 --- a/stateforward/elements/functional.py +++ b/stateforward/elements/functional.py @@ -210,7 +210,7 @@ def simple_state( exit: Union[ type[elements.Behavior], Callable[[elements.Behavior, elements.Event], None] ] = None, - do_activity: Union[ + activity: Union[ type[elements.Behavior], Callable[[elements.Behavior, elements.Event], None] ] = None, ): @@ -218,14 +218,14 @@ def simple_state( entry = behavior(entry) if not model.element.is_subtype(exit, elements.Behavior): exit = behavior(exit) - if not model.element.is_subtype(do_activity, elements.Behavior): - do_activity = behavior(do_activity) + if not model.element.is_subtype(activity, elements.Behavior): + activity = behavior(activity) return model.element.new( name, (elements.State,), entry=entry, exit=exit, - do_activity=do_activity, + activity=activity, ) diff --git a/stateforward/state_machine/compiler/state_machine_compiler.py b/stateforward/state_machine/compiler/state_machine_compiler.py index 841c9ba..5a8692f 100644 --- a/stateforward/state_machine/compiler/state_machine_compiler.py +++ b/stateforward/state_machine/compiler/state_machine_compiler.py @@ -1,6 +1,7 @@ from stateforward.elements import StateMachine +# TODO implement a compiler for the state machine class StateMachineCompiler: @classmethod def compile(cls, state_machine: StateMachine): diff --git a/stateforward/state_machine/interpreters/asynchronous/async_interpreter.py b/stateforward/state_machine/interpreters/asynchronous/async_interpreter.py index 91a2838..626dac0 100644 --- a/stateforward/state_machine/interpreters/asynchronous/async_interpreter.py +++ b/stateforward/state_machine/interpreters/asynchronous/async_interpreter.py @@ -94,7 +94,9 @@ async def run(self) -> None: await self.terminate() async def step(self) -> None: - pass + for future in self.stack.values(): + if exception := future.exception() is not None: + raise exception def is_active(self, *elements: model.Element) -> bool: return all(element in self.stack for element in elements) @@ -110,9 +112,12 @@ def push( return typing.cast(Future, future) def pop(self, element: model.Element, *, result: typing.Any = NULL): - future = self.stack.pop(element, Null()) - if result is not NULL and not future.done(): - future.set_result(result) + future = self.stack.pop(element, NULL) + if future.done(): + if future.exception() is not None: + raise future.result() + elif result is not NULL: + future.set_result(result) return typing.cast(Future, future) def terminate(self): diff --git a/stateforward/state_machine/interpreters/asynchronous/async_state_machine_interpreter.py b/stateforward/state_machine/interpreters/asynchronous/async_state_machine_interpreter.py index d2f8dad..890eecb 100644 --- a/stateforward/state_machine/interpreters/asynchronous/async_state_machine_interpreter.py +++ b/stateforward/state_machine/interpreters/asynchronous/async_state_machine_interpreter.py @@ -154,7 +154,6 @@ async def exec_transition_exit(self, transition: elements.Transition): async def exec_vertex_entry( self, vertex: elements.Vertex, event: elements.Event, kind: elements.EntryKind ): - # self.log.debug(f"entering vertex {model.qualified_name_of(vertex)}") self.push(vertex) if isinstance(vertex, elements.State): await self.exec_state_entry(vertex, event, kind) @@ -176,7 +175,7 @@ async def exec_vertex_entry( async def exec_final_state_entry( self, final_state: elements.FinalState, event: elements.Event ): - await self.terminate() + raise NotImplementedError() async def exec_event_exit(self, event: elements.Event): self.pop(event) @@ -204,10 +203,10 @@ def exec_time_event_entry(self, event: elements.TimeEvent): async def exec_completion_event_wait(self, event: elements.CompletionEvent): source: elements.State = model.owner_of(event) - future = self.stack.get(source.do_activity) + future = self.stack.get(source.activity) event.value = await future activities = tuple( - self.stack.get(typing.cast(elements.State, state).do_activity) + self.stack.get(typing.cast(elements.State, state).activity) for state in self.stack if model.element.is_subtype(state, elements.State) and model.element.is_descendant_of(source, state) @@ -224,16 +223,6 @@ def exec_completion_event_entry(self, event: elements.CompletionEvent): ) return task - async def exec_call_event_wait(self, event: elements.CallEvent): - await event.results - self.push(event) - - def exec_call_event_entry(self, event: elements.CallEvent): - qualified_name = model.qualified_name_of(event) - return asyncio.create_task( - self.exec_call_event_wait(event), name=qualified_name - ) - def exec_event_entry(self, event: elements.Event): qualified_name = model.qualified_name_of(event) self.log.debug(f"entering event {qualified_name}") @@ -243,9 +232,6 @@ def exec_event_entry(self, event: elements.Event): elif isinstance(event, elements.CompletionEvent): return self.exec_completion_event_entry(event) - # elif isinstance(event, elements.CallEvent): - # return self.exec_call_event_entry(event) - elif isinstance(event, elements.ChangeEvent): return self.exec_change_event_entry(event) @@ -271,15 +257,13 @@ async def exec_state_entry( self.log.debug(f"entering state {qualified_name}") if state.entry is not None: await self.exec_behavior(state.entry, event) - if state.do_activity is not None: + if state.activity is not None: self.push( - state.do_activity, - self.loop.create_task(self.exec_behavior(state.do_activity, event)), + state.activity, + self.loop.create_task(self.exec_behavior(state.activity, event)), ) if state.submachine is not None: return - # await self.enter_state_machine(state.submachine, event, kind) - # else: await asyncio.gather( *( self.exec_region_entry(region, event, kind) @@ -344,10 +328,10 @@ async def exec_state_exit(self, state: elements.State, event: elements.Event): for region in state.regions or [] ) ) - if state.do_activity is not None: - do_activity = self.pop(state.do_activity) - if not do_activity.done(): - do_activity.cancel() + if state.activity is not None: + activity = self.pop(state.activity) + if not activity.done(): + activity.cancel() if state.exit is not None: await self.exec_behavior(state.exit, event) self.log.debug(f'leaving state "{qualified_name}"') diff --git a/stateforward/state_machine/interpreters/sequential/__init__.py b/stateforward/state_machine/interpreters/sequential/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/stateforward/state_machine/interpreters/sequential/sequential_behavior_interpreter.py b/stateforward/state_machine/interpreters/sequential/sequential_behavior_interpreter.py deleted file mode 100644 index e69de29..0000000 diff --git a/stateforward/state_machine/interpreters/sequential/sequential_interpreter.py b/stateforward/state_machine/interpreters/sequential/sequential_interpreter.py deleted file mode 100644 index 91a2838..0000000 --- a/stateforward/state_machine/interpreters/sequential/sequential_interpreter.py +++ /dev/null @@ -1,124 +0,0 @@ -import typing -from stateforward import model -from enum import Enum -import asyncio -from stateforward.protocols.future import Future -from stateforward.protocols.clock import Clock -from stateforward.protocols.queue import Queue -import logging - - -T = typing.TypeVar("T", bound=model.Model) - - -class Null(asyncio.Future): - def __init__(self): - super().__init__() - self.set_result(None) - - -NULL = Null() - - -class InterpreterStep(Enum): - complete = "complete" - incomplete = "incomplete" - deferred = "deferred" - - -class AsyncInterpreter(model.Element, typing.Generic[T]): - queue: Queue = None - clock: Clock - stack: dict[model.Element, asyncio.Future] = None - loop: asyncio.AbstractEventLoop = None - log: logging.Logger = logging.getLogger(__name__) - running: asyncio.Event = None - - def __init__(self, queue: Queue, log: logging.Logger = None): - self.stack = {} - self.queue = queue - self.running = asyncio.Event() - self.log = log or self.log - - def send(self, event: model.Element): - self.log.debug(f"Received {model.qualified_name_of(event)}") - # push the event onto the stack - future = self.push(event, asyncio.Future()) - # add the event to the queue - self.queue.put_nowait(event) - return self.wait( - future, - self.stack.get(self), - name=f"{model.qualified_name_of(event)}.sent", - ) - - def start( - self, - loop: asyncio.AbstractEventLoop = None, - ): - qualified_name = model.qualified_name_of(self) - self.log.debug(f"Starting {qualified_name}") - loop = self.loop = loop or asyncio.get_event_loop() - task = loop.create_task(self.run(), name=qualified_name) - started_task = self.loop.create_task(self.running.wait()) - self.push(self, task) - return self.wait(task, started_task) - - def wait( - self, - *tasks: typing.Union[asyncio.Task, asyncio.Future], - name: str = None, - return_when: str = asyncio.FIRST_COMPLETED, - ) -> asyncio.Task: - async def wait_for_tasks(_tasks=tasks, _return_when=return_when): - done, pending = await asyncio.wait(_tasks, return_when=_return_when) - return await done.pop() - - return self.loop.create_task( - wait_for_tasks(), - name=name or "_and_".join(task.get_name() for task in tasks), - ) - - async def run(self) -> None: - self.log.debug( - f"Running {model.qualified_name_of(self)} clock multiplier {self.clock.multiplier}" - ) - self.running.set() - try: - while self.running.is_set(): - await self.step() - await asyncio.sleep(self.clock.multiplier) - except asyncio.CancelledError: - self.log.debug(f"Cancelled {model.qualified_name_of(self)}") - if self.running.is_set(): - await self.terminate() - - async def step(self) -> None: - pass - - def is_active(self, *elements: model.Element) -> bool: - return all(element in self.stack for element in elements) - - def push( - self, element: model.Element, future: typing.Union[Future, asyncio.Task] = NULL - ): - if element in self.stack: - raise ValueError( - f"element {model.qualified_name_of(element)} already exists" - ) - future = self.stack.setdefault(element, future) - return typing.cast(Future, future) - - def pop(self, element: model.Element, *, result: typing.Any = NULL): - future = self.stack.pop(element, Null()) - if result is not NULL and not future.done(): - future.set_result(result) - return typing.cast(Future, future) - - def terminate(self): - self.running.clear() - task = self.pop(self) - task.cancel() - return task - - model: T = None diff --git a/stateforward/state_machine/interpreters/sequential/sequential_state_machine_interpreter.py b/stateforward/state_machine/interpreters/sequential/sequential_state_machine_interpreter.py deleted file mode 100644 index c573a2f..0000000 --- a/stateforward/state_machine/interpreters/sequential/sequential_state_machine_interpreter.py +++ /dev/null @@ -1,117 +0,0 @@ -import typing -from stateforward import model -import asyncio -from stateforward.protocols.future import Future -from stateforward.protocols.clock import Clock -from stateforward.protocols.queue import Queue -import logging - - -T = typing.TypeVar("T", bound=model.Model) - - -class Null(asyncio.Future): - def __init__(self): - super().__init__() - self.set_result(None) - - -NULL = Null() - - -class Interpreter(model.Element, typing.Generic[T]): - queue: Queue = None - clock: Clock - stack: dict[model.Element, asyncio.Future] = None - loop: asyncio.AbstractEventLoop = None - log: logging.Logger = logging.getLogger(__name__) - running: asyncio.Event = None - - def __init__(self, queue: Queue, log: logging.Logger = None): - self.stack = {} - self.queue = queue - self.running = asyncio.Event() - self.log = log or self.log - - def send(self, event: model.Element): - self.log.debug(f"Received {model.qualified_name_of(event)}") - # push the event onto the stack - future = self.push(event, asyncio.Future()) - # add the event to the queue - self.queue.put_nowait(event) - return self.wait( - future, - self.stack.get(self), - name=f"{model.qualified_name_of(event)}.sent", - ) - - def start( - self, - loop: asyncio.AbstractEventLoop = None, - ): - qualified_name = model.qualified_name_of(self) - self.log.debug(f"Starting {qualified_name}") - loop = self.loop = loop or asyncio.get_event_loop() - task = loop.create_task(self.run(), name=qualified_name) - started_task = self.loop.create_task(self.running.wait()) - self.push(self, task) - return self.wait(task, started_task) - - def wait( - self, - *tasks: typing.Union[asyncio.Task, asyncio.Future], - name: str = None, - return_when: str = asyncio.FIRST_COMPLETED, - ) -> asyncio.Task: - async def wait_for_tasks(_tasks=tasks, _return_when=return_when): - done, pending = await asyncio.wait(_tasks, return_when=_return_when) - return await done.pop() - - return self.loop.create_task( - wait_for_tasks(), - name=name or "_and_".join(task.get_name() for task in tasks), - ) - - async def run(self) -> None: - self.log.debug( - f"Running {model.qualified_name_of(self)} clock multiplier {self.clock.multiplier}" - ) - self.running.set() - try: - while self.running.is_set(): - await self.step() - await asyncio.sleep(self.clock.multiplier) - except asyncio.CancelledError: - self.log.debug(f"Cancelled {model.qualified_name_of(self)}") - if self.running.is_set(): - await self.terminate() - - async def step(self) -> None: - pass - - def is_active(self, *elements: model.Element) -> bool: - return all(element in self.stack for element in elements) - - def push( - self, element: model.Element, future: typing.Union[Future, asyncio.Task] = NULL - ): - if element in self.stack: - raise ValueError( - f"element {model.qualified_name_of(element)} already exists" - ) - future = self.stack.setdefault(element, future) - return typing.cast(Future, future) - - def pop(self, element: model.Element, *, result: typing.Any = NULL): - future = self.stack.pop(element, Null()) - if result is not NULL and not future.done(): - future.set_result(result) - return typing.cast(Future, future) - - def terminate(self): - self.running.clear() - task = self.pop(self) - task.cancel() - return task - - model: T = None diff --git a/stateforward/state_machine/interpreters/threaded/__init__.py b/stateforward/state_machine/interpreters/threaded/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/stateforward/state_machine/interpreters/threaded/threaded_behavior_interpreter.py b/stateforward/state_machine/interpreters/threaded/threaded_behavior_interpreter.py deleted file mode 100644 index e69de29..0000000 diff --git a/stateforward/state_machine/interpreters/threaded/threaded_interpreter.py b/stateforward/state_machine/interpreters/threaded/threaded_interpreter.py deleted file mode 100644 index 9d5d36f..0000000 --- a/stateforward/state_machine/interpreters/threaded/threaded_interpreter.py +++ /dev/null @@ -1,124 +0,0 @@ -import typing -from stateforward import model -from enum import Enum -import asyncio -from stateforward.protocols.future import Future -from stateforward.protocols.clock import Clock -from stateforward.protocols.queue import Queue -import logging - - -T = typing.TypeVar("T", bound=model.Model) - - -class Null(asyncio.Future): - def __init__(self): - super().__init__() - self.set_result(None) - - -NULL = Null() - - -class InterpreterStep(Enum): - complete = "complete" - incomplete = "incomplete" - deferred = "deferred" - - -class Interpreter(model.Element, typing.Generic[T]): - queue: Queue = None - clock: Clock - stack: dict[model.Element, asyncio.Future] = None - loop: asyncio.AbstractEventLoop = None - log: logging.Logger = logging.getLogger(__name__) - running: asyncio.Event = None - - def __init__(self, queue: Queue, log: logging.Logger = None): - self.stack = {} - self.queue = queue - self.running = asyncio.Event() - self.log = log or self.log - - def send(self, event: model.Element): - self.log.debug(f"Received {model.qualified_name_of(event)}") - # push the event onto the stack - future = self.push(event, asyncio.Future()) - # add the event to the queue - self.queue.put_nowait(event) - return self.wait( - future, - self.stack.get(self), - name=f"{model.qualified_name_of(event)}.sent", - ) - - def start( - self, - loop: asyncio.AbstractEventLoop = None, - ): - qualified_name = model.qualified_name_of(self) - self.log.debug(f"Starting {qualified_name}") - loop = self.loop = loop or asyncio.get_event_loop() - task = loop.create_task(self.run(), name=qualified_name) - started_task = self.loop.create_task(self.running.wait()) - self.push(self, task) - return self.wait(task, started_task) - - def wait( - self, - *tasks: typing.Union[asyncio.Task, asyncio.Future], - name: str = None, - return_when: str = asyncio.FIRST_COMPLETED, - ) -> asyncio.Task: - async def wait_for_tasks(_tasks=tasks, _return_when=return_when): - done, pending = await asyncio.wait(_tasks, return_when=_return_when) - return await done.pop() - - return self.loop.create_task( - wait_for_tasks(), - name=name or "_and_".join(task.get_name() for task in tasks), - ) - - async def run(self) -> None: - self.log.debug( - f"Running {model.qualified_name_of(self)} clock multiplier {self.clock.multiplier}" - ) - self.running.set() - try: - while self.running.is_set(): - await self.step() - await asyncio.sleep(self.clock.multiplier) - except asyncio.CancelledError: - self.log.debug(f"Cancelled {model.qualified_name_of(self)}") - if self.running.is_set(): - await self.terminate() - - async def step(self) -> None: - pass - - def is_active(self, *elements: model.Element) -> bool: - return all(element in self.stack for element in elements) - - def push( - self, element: model.Element, future: typing.Union[Future, asyncio.Task] = NULL - ): - if element in self.stack: - raise ValueError( - f"element {model.qualified_name_of(element)} already exists" - ) - future = self.stack.setdefault(element, future) - return typing.cast(Future, future) - - def pop(self, element: model.Element, *, result: typing.Any = NULL): - future = self.stack.pop(element, Null()) - if result is not NULL and not future.done(): - future.set_result(result) - return typing.cast(Future, future) - - def terminate(self): - self.running.clear() - task = self.pop(self) - task.cancel() - return task - - model: T = None diff --git a/stateforward/state_machine/interpreters/threaded/threaded_state_machine_interpreter.py b/stateforward/state_machine/interpreters/threaded/threaded_state_machine_interpreter.py deleted file mode 100644 index e69de29..0000000 diff --git a/stateforward/state_machine/preprocessor.py b/stateforward/state_machine/preprocessor.py index c6a6a1b..ee62644 100644 --- a/stateforward/state_machine/preprocessor.py +++ b/stateforward/state_machine/preprocessor.py @@ -85,7 +85,7 @@ def preprocess_state(self, element: type["elements.State"]): self.preprocess_vertex(element) - for behavior in ("entry", "exit", "do_activity"): + for behavior in ("entry", "exit", "activity"): if getattr(element, behavior) is None: model.set_attribute( element, diff --git a/tests/async_statemachine_test.py b/tests/async_statemachine_test.py index 06155b9..34b9d7b 100644 --- a/tests/async_statemachine_test.py +++ b/tests/async_statemachine_test.py @@ -6,7 +6,7 @@ class TestSM(sf.AsyncStateMachine): class s1(sf.State): entry = sf.behavior(AsyncMock()) - do_activity = sf.behavior(AsyncMock()) + activity = sf.behavior(AsyncMock()) exit = sf.behavior(AsyncMock()) initial = sf.initial(s1) @@ -33,10 +33,10 @@ class s1(sf.State): @pytest.mark.asyncio -async def test_state_do_activity_with_exception(): +async def test_state_activity_with_exception(): class SM(sf.AsyncStateMachine): class s1(sf.State): - do_activity = sf.behavior(AsyncMock(side_effect=Exception)) + activity = sf.behavior(AsyncMock(side_effect=Exception)) initial = sf.initial(s1) diff --git a/tests/completion_event_test.py b/tests/completion_event_test.py index 7ae73a2..037cc4a 100644 --- a/tests/completion_event_test.py +++ b/tests/completion_event_test.py @@ -7,19 +7,19 @@ class SM(sf.AsyncStateMachine): class s1(sf.State): @sf.decorators.behavior - async def do_activity(self, event=None): + async def activity(self, event=None): await asyncio.sleep(1) return "s1" class s2(sf.State): @sf.decorators.behavior - async def do_activity(self, event=None): + async def activity(self, event=None): await asyncio.sleep(1) return "s2" class s3(sf.State): @sf.decorators.behavior - async def do_activity(self, event=None): + async def activity(self, event=None): await asyncio.sleep(1) initial = sf.initial(s1) diff --git a/tests/mock.py b/tests/mock.py index f2c84a6..1648a4f 100644 --- a/tests/mock.py +++ b/tests/mock.py @@ -125,11 +125,11 @@ def was_terminated(self) -> bool: class MockedState(Mocked, element=sf.State): def was_entered(self) -> bool: - return self.entry.is_done() and self.do_activity.is_started() + return self.entry.is_done() and self.activity.is_started() def is_entered(self) -> bool: return ( - self.is_active() and self.entry.is_done() and self.do_activity.is_started() + self.is_active() and self.entry.is_done() and self.activity.is_started() ) def is_entered(self) -> bool: @@ -138,13 +138,13 @@ def is_entered(self) -> bool: def was_not_entered(self) -> bool: if not self.entry.is_inactive(): return False - elif not self.do_activity.is_inactive(): + elif not self.activity.is_inactive(): return False return self.is_inactive() def was_not_exited(self) -> bool: return ( - self.do_activity.is_started() + self.activity.is_started() and self.exit.is_inactive() and self.is_active() ) and self diff --git a/tests/transition_test.py b/tests/transition_test.py index ff85467..8ae1d1c 100644 --- a/tests/transition_test.py +++ b/tests/transition_test.py @@ -14,10 +14,6 @@ class e2(sf.Event): class SM(sf.AsyncStateMachine): class s1(sf.State): - # @sf.decorators.behavior - # async def do_activity(self, event=None): - # return "s1" - class s1_1(sf.State): pass @@ -29,13 +25,13 @@ class s1_3(sf.State): class s2(sf.State): @sf.decorators.behavior - async def do_activity(self, event=None): + async def activity(self, event=None): await asyncio.sleep(1) return "s2" class s3(sf.State): @sf.decorators.behavior - async def do_activity(self, event=None): + async def activity(self, event=None): await asyncio.sleep(1) initial = sf.initial(s1.s1_1) @@ -50,7 +46,7 @@ async def do_activity(self, event=None): async def test_local_transition(): sm = mock(SM()) await sm.interpreter.start() - expect.only(sm.s1, sm.s1.s1_1).was_entered() + expect.only(sm.s1, sm.s1.s1_1, sm.s1.s1_2).was_entered() # sm.reset_mocked() # await sm.dispatch(e1()) # expect.only(sm.s1.s1_1).was_entered()