<a href="https://colab.research.google.com/github/renaud-florquin/R_training/blob/master/fsm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Table driven implementation of FSM

In [0]:
from enum import Enum

In [0]:
class PV_STATES(Enum):
    PV_OFF = 0
    PV_ON = 1
    PV_CLOSING = 2

In [0]:
class PV_EVENTS(Enum):
    PV_CLOSE = 0
    PV_OPEN = 1

In [4]:
PV_STATES.PV_ON

<PV_STATES.PV_ON: 1>

In [0]:
# transition action definitions
def on_transition(fsm, event):
    print('Open PV')
    return PV_STATES.PV_ON

def off_transition(fsm, event):
    print('Closing PV')
    return PV_STATES.PV_CLOSING

In [0]:
# transitions definitions
on_transition = {
    'event_id': PV_EVENTS.PV_OPEN,
    'action': on_transition
}
off_transition = {
    'event_id': PV_EVENTS.PV_CLOSE,
    'action': off_transition
}

In [0]:
# activity definitions
def do_closing(fsm):
    fsm['data']['closing_delay'] += 1 
    if fsm['data']['ovp']:
        print('OVP detected')
        return PV_STATES.PV_OFF
    else:
        if fsm['data']['closing_delay'] >= 3:
            return PV_STATES.PV_OFF
        else:
            print('wait ...')
            return PV_STATES.PV_CLOSING

def do_open(fsm):
    if fsm['data']['ovp']:
        print('OVP detected')
        return PV_STATES.PV_OFF
    else:
        print('Checks OK')
        return PV_STATES.PV_ON

# entry actions
def on_entry_close(fsm):
    print('PV Close')
    print('Stop PWM')

def on_entry_open(fsm):
    print('PV Open')
    print('Start PWM')
    print('Start Regulation')

def on_entry_closing(fsm):
    print('PV Closing')
    print('Stop regulation')
    print('Phase shift -90°')
    fsm['data']['closing_delay'] = 0    

In [0]:
off_state = {
    'state_id': PV_STATES.PV_OFF,
    'transitions': [on_transition], 
    'do': None, 
    'on_entry': on_entry_close, 
    'on_exit': None 
}

on_state = {
    'state_id': PV_STATES.PV_ON,
    'transitions': [off_transition], 
    'do': do_open, 
    'on_entry': on_entry_open, 
    'on_exit': None 
}

closing_state = {
    'state_id': PV_STATES.PV_CLOSING,
    'transitions': [], 
    'do': do_closing, 
    'on_entry': on_entry_closing, 
    'on_exit': None 
}

In [0]:
pv_fsm = {
    'init_state': off_state,
    'current_state': None,
    'states': {
    },
    'data': {
        'closing_delay': 0,
        'ovp': False
    }
}

In [0]:
def fsm_init(fsm, states):
    for state in states:
        fsm['states'][state['state_id']] = state 
    fsm['current_state'] = fsm['init_state']

In [0]:
def fsm_find_transition(fsm, event_id):
    for transition in fsm['current_state']['transitions']:
        if transition['event_id'] == event_id:
            return transition
    return None

In [0]:
def fsm_find_state(fsm, state_id):
    return fsm['states'].get(state_id, None)

In [0]:
def fsm_accept_event(fsm, event):
    transition = fsm_find_transition(fsm, event)
    if transition:
        print('Transition found for state: {} and event: {}'.format(fsm['current_state']['state_id'], event))
        if fsm['current_state']['on_exit']:
            fsm['current_state']['on_exit'](fsm)
        next_state_id = transition['action'](fsm, event)
        fsm['current_state'] = fsm_find_state(fsm, next_state_id)
        if fsm['current_state']['on_entry']:
            fsm['current_state']['on_entry'](fsm)
    else:
        print('! No transition found for state: {} and event: {}'.format(fsm['current_state']['state_id'], event))

def fsm_do(fsm):
    print('Do activity for state: {}'.format(fsm['current_state']['state_id']))
    if fsm['current_state']['do']:
        next_state_id = fsm['current_state']['do'](fsm)
        if next_state_id != fsm['current_state']['state_id']: 
            if fsm['current_state']['on_exit']:
                fsm['current_state']['on_exit'](fsm)
            fsm['current_state'] = fsm_find_state(fsm, next_state_id)
            if fsm['current_state']['on_entry']:
                fsm['current_state']['on_entry'](fsm)


In [0]:
def init():
    fsm_init(pv_fsm, [off_state, on_state, closing_state])  
    pv_fsm['data'] = {
        'closing_delay': 0,
        'ovp': False
    }

In [0]:
program1 = """
init()
fsm_do(pv_fsm)
fsm_do(pv_fsm)
fsm_do(pv_fsm)
fsm_accept_event(pv_fsm, PV_EVENTS.PV_OPEN)
fsm_do(pv_fsm)
fsm_do(pv_fsm)
fsm_do(pv_fsm)
fsm_accept_event(pv_fsm, PV_EVENTS.PV_CLOSE)
fsm_do(pv_fsm)
fsm_do(pv_fsm)
fsm_do(pv_fsm)
fsm_do(pv_fsm)
fsm_do(pv_fsm)
"""

In [16]:
exec(program1)

Do activity for state: PV_STATES.PV_OFF
Do activity for state: PV_STATES.PV_OFF
Do activity for state: PV_STATES.PV_OFF
Transition found for state: PV_STATES.PV_OFF and event: PV_EVENTS.PV_OPEN
Open PV
PV Open
Start PWM
Start Regulation
Do activity for state: PV_STATES.PV_ON
Checks OK
Do activity for state: PV_STATES.PV_ON
Checks OK
Do activity for state: PV_STATES.PV_ON
Checks OK
Transition found for state: PV_STATES.PV_ON and event: PV_EVENTS.PV_CLOSE
Closing PV
PV Closing
Stop regulation
Phase shift -90°
Do activity for state: PV_STATES.PV_CLOSING
wait ...
Do activity for state: PV_STATES.PV_CLOSING
wait ...
Do activity for state: PV_STATES.PV_CLOSING
PV Close
Stop PWM
Do activity for state: PV_STATES.PV_OFF
Do activity for state: PV_STATES.PV_OFF


In [0]:
program2 = """
init()
fsm_accept_event(pv_fsm, PV_EVENTS.PV_CLOSE)
fsm_accept_event(pv_fsm, PV_EVENTS.PV_OPEN)
fsm_do(pv_fsm)
fsm_accept_event(pv_fsm, PV_EVENTS.PV_OPEN)
fsm_do(pv_fsm)
pv_fsm['data']['ovp'] = True
fsm_do(pv_fsm)
fsm_do(pv_fsm)
"""

In [18]:
exec(program2)

! No transition found for state: PV_STATES.PV_OFF and event: PV_EVENTS.PV_CLOSE
Transition found for state: PV_STATES.PV_OFF and event: PV_EVENTS.PV_OPEN
Open PV
PV Open
Start PWM
Start Regulation
Do activity for state: PV_STATES.PV_ON
Checks OK
! No transition found for state: PV_STATES.PV_ON and event: PV_EVENTS.PV_OPEN
Do activity for state: PV_STATES.PV_ON
Checks OK
Do activity for state: PV_STATES.PV_ON
OVP detected
PV Close
Stop PWM
Do activity for state: PV_STATES.PV_OFF


In [0]:
program3 = """
init()
fsm_accept_event(pv_fsm, PV_EVENTS.PV_OPEN)
fsm_do(pv_fsm)
fsm_accept_event(pv_fsm, PV_EVENTS.PV_CLOSE)
fsm_do(pv_fsm)
pv_fsm['data']['ovp'] = True
fsm_do(pv_fsm)
fsm_do(pv_fsm)
"""

In [20]:
exec(program3)

Transition found for state: PV_STATES.PV_OFF and event: PV_EVENTS.PV_OPEN
Open PV
PV Open
Start PWM
Start Regulation
Do activity for state: PV_STATES.PV_ON
Checks OK
Transition found for state: PV_STATES.PV_ON and event: PV_EVENTS.PV_CLOSE
Closing PV
PV Closing
Stop regulation
Phase shift -90°
Do activity for state: PV_STATES.PV_CLOSING
wait ...
Do activity for state: PV_STATES.PV_CLOSING
OVP detected
PV Close
Stop PWM
Do activity for state: PV_STATES.PV_OFF
