In [3]:
!pip install python-statemachine

Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [None]:
import signal
import time
from statemachine import State, StateMachine
from datetime import datetime  # For logging timestamps


class TrafficLightMachine(StateMachine):
    """Traffic light with three states cycling green -> yellow -> red -> green."""

    # The three lamp states; the machine starts out green.
    green = State(initial=True)
    yellow = State()
    red = State()

    # One `cycle` event covers every hop of the loop.
    cycle = (
        green.to(yellow)
        | yellow.to(red)
        | red.to(green)
    )

    def before_cycle(self, event: str, source: State, target: State):
        """Print which transition the `cycle` event is taking.

        NOTE(review): parameter names are left untouched; presumably the
        library supplies them by name - confirm before renaming.
        """
        message = f"Running {event} from {source.id} to {target.id}"
        print(message)


class Supervisor:
    """Send a fixed event to a state machine on every SIGALRM timer tick.

    The supervisor installs its own SIGALRM handler while active and puts the
    previously installed handler back when stopped, so it can be started and
    stopped repeatedly. Relies on ``signal.SIGALRM`` / ``signal.setitimer``.
    """

    # The annotation is quoted: the class only calls ``sm.send(...)``, so any
    # object exposing ``send`` can be supervised.
    def __init__(self, sm: "StateMachine", sm_event: str):
        """Store the machine and the event name to send on each tick.

        Args:
            sm: Object whose ``send(event)`` method is called on each tick.
            sm_event: Name of the event passed to ``sm.send``.
        """
        self.sm = sm
        self.sm_event = sm_event
        self.running = True
        # Bookkeeping so stop() can restore whatever handler start() replaced.
        self._armed = False
        self._previous_handler = None

    def signal_handler(self, signum, frame):
        """Handle the periodic signal to trigger state transitions."""
        if self.running:
            # Log the event with a timestamp
            print(f"{datetime.now().isoformat()} - Signal received: {signum}. Triggering event: {self.sm_event}")
            self.sm.send(self.sm_event)

    def start(self, interval: float):
        """Start (or restart) the periodic signal.

        Args:
            interval: Seconds until the first tick and between later ticks;
                passed unchanged to ``signal.setitimer``.
        """
        if not self._armed:
            # signal.signal returns the handler it replaces; keep it for stop().
            self._previous_handler = signal.signal(signal.SIGALRM, self.signal_handler)
            self._armed = True
        # Re-enable the handler so a supervisor that was stopped can run again.
        self.running = True
        signal.setitimer(signal.ITIMER_REAL, interval, interval)

    def stop(self):
        """Stop the periodic signal and restore the previous SIGALRM handler."""
        self.running = False
        signal.setitimer(signal.ITIMER_REAL, 0)  # Disable the timer
        if self._armed:
            # A previous handler of None cannot be passed back to
            # signal.signal, so in that case our (now inert) handler stays.
            if self._previous_handler is not None:
                signal.signal(signal.SIGALRM, self._previous_handler)
            self._armed = False
            self._previous_handler = None


# Initialize the state machine and supervisor
traffic_light = TrafficLightMachine()
supervisor = Supervisor(traffic_light, "cycle")

# Seconds between consecutive `cycle` events.
timeout_interval = 0.1

try:
    # Arm the periodic timer that drives the state machine.
    supervisor.start(interval=timeout_interval)

    # Run indefinitely, until interrupted
    print("Running indefinitely. Press Ctrl+C to stop.")
    while True:
        # Block until a signal is delivered instead of spinning on `pass`,
        # which would keep a CPU core fully busy between timer ticks.
        signal.pause()

except KeyboardInterrupt:
    print("\nStopping supervisor...")

finally:
    # Always disarm the timer, whether interrupted or failed.
    supervisor.stop()
    print("Supervisor stopped.")


Running indefinitely. Press Ctrl+C to stop.
2024-12-11T13:53:22.880306 - Signal received: 14. Triggering event: cycle
Running cycle from green to yellow
2024-12-11T13:53:22.980428 - Signal received: 14. Triggering event: cycle
Running cycle from yellow to red
2024-12-11T13:53:23.080355 - Signal received: 14. Triggering event: cycle
Running cycle from red to green
2024-12-11T13:53:23.180483 - Signal received: 14. Triggering event: cycle
Running cycle from green to yellow
2024-12-11T13:53:23.280268 - Signal received: 14. Triggering event: cycle
Running cycle from yellow to red
2024-12-11T13:53:23.380489 - Signal received: 14. Triggering event: cycle
Running cycle from red to green
2024-12-11T13:53:23.480459 - Signal received: 14. Triggering event: cycle
Running cycle from green to yellow
2024-12-11T13:53:23.580429 - Signal received: 14. Triggering event: cycle
Running cycle from yellow to red
2024-12-11T13:53:23.680708 - Signal received: 14. Triggering event: cycle
Running cycle from red