# Python State Machine
## Eine selbstgeschriebene SM

In [13]:
class StateMachine:
    def __init__(self):
        self.state = None  # Der aktuelle Zustand des Automaten
        self.transitions = {}  # Eine Tabelle der Übergänge zwischen Zuständen

    def add_state(self, state_name, is_initial=False):
        """Fügt einen neuen Zustand hinzu. Falls is_initial True ist, wird dieser Zustand als Anfangszustand festgelegt."""
        self.transitions[state_name] = {}
        if is_initial:
            self.state = state_name

    def add_transition(self, from_state, to_state, event):
        """Definiert einen Übergang von einem Zustand zu einem anderen bei einem bestimmten Ereignis."""
        if from_state in self.transitions:
            self.transitions[from_state][event] = to_state
        else:
            raise ValueError(f"Zustand '{from_state}' nicht definiert")

    def on_event(self, event):
        """Reagiert auf ein Ereignis und verändert den Zustand des Automaten entsprechend."""
        if self.state in self.transitions and event in self.transitions[self.state]:
            self.state = self.transitions[self.state][event]
        else:
            raise ValueError(f"Kein Übergang definiert für Zustand '{self.state}' bei Ereignis '{event}'")

    def get_state(self):
        """Gibt den aktuellen Zustand des Automaten zurück."""
        return self.state

# Beispiel für die Nutzung des Zustandsautomaten
fsm = StateMachine()
fsm.add_state("wait", is_initial=True)
fsm.add_state("wait_for_value")
fsm.add_state("wait_for_triggerId")
fsm.add_state("write_to_db")

fsm.add_transition("wait", "wait_for_value", "new_triggerId_event")
fsm.add_transition("wait", "wait_for_triggerId", "new_value_event")
fsm.add_transition("wait_for_triggerId", "write_to_db" , "new_triggerId_event")
fsm.add_transition("wait_for_value", "write_to_db" , "new_value_event")
fsm.add_transition("write_to_db" ,"wait", "db_ok")



print(fsm.get_state())  # Ausgabe: initial
fsm.on_event("new_triggerId_event")
print(fsm.get_state())  # Ausgabe: state1
fsm.on_event("new_value_event")
print(fsm.get_state())  # Ausgabe: state2
fsm.on_event("db_ok")
print(fsm.get_state())  # Ausgabe: state1

wait
wait_for_value
write_to_db
wait


# Eine StateMachine (SM) mit Transitions Library
https://github.com/pytransitions/transitions?tab=readme-ov-file#table-of-contents

In [1]:
from transitions import Machine

class Watch_PV_SM:
    '''
    Watch_PV_SM Klasse
    ==================

    Das ist der Observer für Laser ProzessVariablen und TriggerID.
    Die TriggerID wird von Raspberry PI erzeugt, nachdem es Trigger Signale am GPIO registriert. Dadurch entsteht 
    dem Triggersignal und Publizieren der TriggerId ein Delay.
    Weil es möglich ist, dass schnelle Geräte ihre Werte kurz vor der TriggerID publizieren und langsame Geräte 
    dagegen danach, wird Watch_PV_SM als StateMachine realisiert:

    States
    -------
    - Grundzustand: watch_PV
    - waiting_for_PV_update
    - waiting_for_trigger
    - write_to_db
    - timeout
    - error

    Szenarien
    ----------

    1. Szenario 1
        - Wenn neue TriggerID registriert wird, erfolgt ein Wechsel in Zustand: waiting_for_PV_update
        - waiting_for_PV_update wartet auf Änderung der PV. Passiert diese Änderung werden die Daten in DB geschrieben
        - write_to_db und Übergang in Grundzustand

    2. Szenario 2    
        - Wenn zuerst ein PV Update erfolgt, wechselt der Automat in Zustand: waiting_for_trigger
        - u.s.w  
    '''


    states=['watch_PV', 'waiting_for_trigger','waiting_for_PV_update', 'write_to_db', 'timeout', 'error']

    def __init__(self) -> None:
        self.SM = Machine(model=Watch_PV_SM, states=Watch_PV_SM.states)

SyntaxError: invalid syntax (739638691.py, line 1)

In [3]:
from transitions.extensions.asyncio import AsyncMachine
import asyncio
import nest_asyncio
nest_asyncio.apply()

In [49]:
import logging


class SimpleMachine:

    states = ['dummy', 'start', 'waiting']

    def __init__(self, stop_event):
        self.machine = AsyncMachine(model=self, states=SimpleMachine.states, initial='dummy')
        self.machine.add_transition('run', 'dummy', 'start')
        self.machine.add_transition('timeoutTransition', '*', 'waiting')
        self.machine.add_transition('stop_waiting', 'waiting', 'start')
        self.machine.add_transition('chill', 'start', 'dummy')
        self.stop_event = stop_event
        
        # print("work work")
        # await asyncio.sleep(10)

    async def on_enter_start(self):
        try:
            print("Ich tu nun was!")
            await asyncio.wait_for(self.stop_event.wait(), 10)
            print("Chill Kommando empfangen")
            await self.chill()
        except asyncio.TimeoutError:
            print("Timeout!")
            await self.timeoutTransition()

    async def on_enter_waiting(self):
        print("Enter wait state")

    async def on_enter_dummy():
        print('Dummy chill state')
        
test_machine = None
test_SM = None
stopEvent = asyncio.Event()

async def main():
    global test_machine
    test_machine = SimpleMachine(stopEvent)
    print("State Machine started")
    asyncio.create_task(test_machine.run())

asyncio.run(main())

State Machine started


Ich tu nun was!
Chill Kommando empfangen


In [50]:
stopEvent.set()
test_machine.state

'start'

In [29]:
test_machine.state

'start'