Skip to content
Permalink
Branch: master
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
155 lines (130 sloc) 4.86 KB
"""
Finite state machine class.
The fsm class stores dictionary of state/input keys, values are
next state and action
when searching for matching state/input key, exact match
is checked first, then the input is matched against any regular
expressions associated with the state. As a last resort, state/None
can be used as a default for that state.
action is a function to execute which takes the current state
and input as arguments
Exported symbols:
class FSM
exception FSMError - raised when an execution error occurs
exception RestartError - raised by an action routine to bail back to
a restart state
int FSMEOF - used as a special input by users to signal
termination of the machine
"""
class FSMError(Exception):
pass
class RestartError(Exception):
pass
FSMEOF = -1
import re
_RGXT = type(re.compile('foo'))
del re
class FSM:
"""
Finite State Machine
simple example:
>>> def do_faq(state, input):
... print 'send faqfile'
>>> def do_help(state, input):
... print 'send helpfile'
>>> def cleanup(state, input):
... print 'clean up'
>>> import re
>>> fsm = FSM()
>>> fsm.add('start', re.compile('help', re.I), 'start', do_help)
>>> fsm.add('start', 'faq', 'start', do_faq)
>>> # matches anything, does nothing
>>> fsm.add('start', None, 'start')
>>> fsm.add('start', FSMEOF, 'done', cleanup)
>>> fsm.start('start')
>>> for line in ['faq', 'help', 'FRED', FSMEOF]:
... try:
... fsm.execute(line)
... except FSMError:
... sys.stderr.write('Invalid input: %r' % line)
...
send faqfile
send helpfile
clean up
"""
def __init__(self):
self.states = {}
self.state = None
self.dbg = None
# add another state to the fsm
def add(self, state, input, newstate, action=None):
"""add a new state to the state machine"""
try:
self.states[state][input] = (newstate, action)
except KeyError:
self.states[state] = {}
self.states[state][input] = (newstate, action)
# perform a state transition and action execution
def execute(self, input):
"""execute the action for the current (state,input) pair"""
if self.state not in self.states:
raise FSMError('Invalid state: %s' % self.state)
state = self.states[self.state]
# exact state match?
if input in state:
newstate, action = state[input]
if action is not None:
try:
action(self.state, input)
except RestartError, restartto:
# action routine can raise RestartError to force
# jump to a different state - usually back to start
# if input didn't look like it was supposed to
self.state = restartto
return
self.state = newstate
return
# no, how about a regex match? (first match wins)
else:
for s in state:
if isinstance(s, _RGXT) and s.match(input) is not None:
newstate, action = state[s]
if action is not None:
try:
action(self.state, input)
except RestartError, restartto:
# action routine can raise RestartError to force
# jump to a different state - usually back to start
# if input didn't look like it was supposed to
self.state = restartto
return
self.state = newstate
return
if None in state:
newstate, action = state[None]
if action is not None:
try:
action(self.state, input)
except RestartError, restartto:
# action routine can raise RestartError to force
# jump to a different state - usually back to start
# if input didn't look like it was supposed to
self.state = restartto
return
self.state = newstate
return
# oh well, bombed out...
else:
raise FSMError('Invalid input to finite state machine: %s' %
input)
# define the start state
def start(self, state):
"""set the start state"""
self.state = state
# assign a writable file to catch debugging transitions
def debug(self, out=None):
"""assign a debugging file handle"""
self.dbg = out
if __name__ == "__main__":
import doctest
doctest.testmod()
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.