# Finite State Machines

## Introduction

A **Finite State Machine** (FSM) is a model of computation to represent systems which have multiple states of operation, where each state maps to specific behaviors.

A FSM begins at an **initial state** and, given an input value (or event, or trigger), it might change to another state. Changing states is called a **transition** which is thus defined by the triple (_source state_, _event_, _destiny state_). Sometimes a transition also includes an output value.

A FSM is defined by an initial state and the set of its transitions (which implicitly define the set of available states).

Due to its memory is limited by its set of states, FSMs are less powerful than Turing Machines. FSMs recognize [regular languages](https://en.wikipedia.org/wiki/Regular_language) a strict subset of all [recursively enumerable languages](https://en.wikipedia.org/wiki/Recursively_enumerable_language), which are recognized by Turing Machines.

But in practice, this model is useful to represent several types of useful problems. Some examples:

+ an entrance turnstile (with two states, _locked_ and _unlocked_)

+ traffic lights (which have three states, and three transitions)

+ safes with combination locks (which can be _open_ or _closed_)

+ vending machines

+ elevators

## FSMs in Python

Let's consider a safe with a combination lock.

Assume that each safe initially receives a list of codes that must be entered in the right order, so that the safe opens. 

In the next implementation, a safe keeps a message string that can only be changed when the safe is open. When the safe object is created, the safe is closed.

Here is a typical implementation of this class:

In [None]:
class Safe_v0:
  def __init__(self, codes):
    self._message = ''
    self._is_open = False
    self._codes   = codes[:]
    self._current = 0  # the index of the next expected code

  def is_open(self):
    return self._is_open

  def msg(self):
    return self._message if self.is_open() else '<safe is closed>'

  def set_msg(self, msg):
    if self.is_open(): 
      self._message = msg
      self._is_open = False
      self._current = 0

  def ins_code(self, code):
    if not self.is_open():
      if self._codes[self._current] == code:
        self._current += 1
        if self._current == len(self._codes):
          self._is_open = True
      else:
        self._current = 0

Let's try one example:

In [None]:
safe = Safe_v0([1,2,3])
assert not safe.is_open()

safe.ins_code(1)
safe.ins_code(2)
safe.ins_code(3)
assert safe.is_open()

safe.set_msg('hello')
assert not safe.is_open()
print('msg:', safe.msg())

safe.ins_code(1)
safe.ins_code(2)
safe.ins_code(3)
assert safe.is_open()
print('msg:', safe.msg())

msg: <safe is closed>
msg: hello


The potential problem of this approach is that is scales poorly when the problem has many states. This typically implies methods with lots of conditional commands, which makes it hard to maintain.

### A more principled approach

Another way to implement FSMs is to define, for each state, what we will call a _state class_. A state class is composed entirely of static methods, and the names of these methods will be equal to the methods of the main class. 

The main class will have an attribute `state` that keeps the respetive class, given its current state. Also, for each public method `m(self, ...)`, it calls `self.state.m(self, ...)`, delegating the work to the respetive state class

Let's reimplement the problem with this approach.

First the main class:

In [None]:
class Safe:
  def __init__(self, codes):
    self.codes   = codes[:]
    self.current = 0
    self.message = ''
    self.state   = CloseSafe # initial state

  def is_open(self):
    return self.state.is_open()

  def msg(self):
    return self.state.msg(self)

  def set_msg(self, msg):
    self.state.set_msg(self, msg)

  def ins_code(self, code):
    self.state.ins_code(self, code)

Since the safe has two states (_open_ and _closed_) we will need to create two state classes:

In [None]:
class OpenSafe:
  @staticmethod
  def is_open():
    return True

  @staticmethod
  def msg(safe):
    return safe.message

  @staticmethod
  def set_msg(safe, msg):
    safe.message = msg
    safe.state = CloseSafe

  @staticmethod
  def ins_code(safe, code):
    pass

Notice that methods that are supposed to not be used at a given state (like insert a new code when the code is already open) will not produce exceptions, and will do nothing instead. This is a design decision to prevent errors, but it's only one option to deal with this problem.

In [None]:
class CloseSafe:
  @staticmethod
  def is_open():
    return False

  @staticmethod
  def msg(safe):
    return '<safe is closed>'

  @staticmethod
  def set_msg(safe, msg):
    pass

  @staticmethod
  def ins_code(safe, code):
    if code == safe.codes[safe.current]:
      safe.current += 1
    if safe.current == len(safe.codes):
      safe.state = OpenSafe
      safe.current = 0

Let's try the same use case:

In [None]:
safe = Safe([1,2,3])
assert not safe.is_open()

safe.ins_code(1)
safe.ins_code(2)
safe.ins_code(3)
assert safe.is_open()

safe.set_msg('hello')
assert not safe.is_open()
print('msg:', safe.msg())

safe.ins_code(1)
safe.ins_code(2)
safe.ins_code(3)
assert safe.is_open()
print('msg:', safe.msg())

msg: <safe is closed>
msg: hello


### Library `pytransitions`

In [None]:
!pip install transitions -q

[pytransitions](https://github.com/pytransitions/transitions) includes tools to design FSMs into Python classes.

Here's an example to adapt the Safe class using this library:

In [None]:
from transitions import Machine

class Safer:
  states = ['open', 'closed']

  def __init__(self, codes):
    self.codes = codes[:]
    self.current = 0
    self.message = ''

    self.fsm = Machine(model=self, states=Safer.states, initial='closed')

    self.fsm.add_transition(trigger='close', source='open',   dest='closed')
    self.fsm.add_transition(trigger='code',  source='closed', dest='open',   conditions=['all_codes'], after='reset_current')
    self.fsm.add_transition(trigger='code',  source='closed', dest='closed', conditions=['not_all_codes'])
  
  def all_codes(self):
    return self.current == len(self.codes)

  def not_all_codes(self):
    return not self.all_codes()

  def reset_current(self):
    self.current = 0

  def safe_is_open(self):
    return self.state == 'open'

  def msg(self):
    return self.message if self.safe_is_open() else '<safe is closed>'

  def set_msg(self, msg):
    if self.safe_is_open():
      self.message = msg
      self.close()

  def ins_code(self, code):
    if code == safe.codes[safe.current]:
      safe.current += 1
    self.code()

In [None]:
safe = Safer([1,2,3])
assert not safe.is_open()

safe.ins_code(1)
safe.ins_code(2)
safe.ins_code(3)
assert safe.is_open()

safe.set_msg('hello')
assert not safe.safe_is_open()
print('msg:', safe.msg())

safe.ins_code(1)
safe.ins_code(2)
safe.ins_code(3)
assert safe.safe_is_open()
print('msg:', safe.msg())

msg: <safe is closed>
msg: hello


However, it seems that if the FSM needs to execute different procedures for the same action in different states, this approach will not scale well.

## Transducers

If the FSM also produce outputs, it is often called a **transducer**. 

Two types of [transducers](https://en.wikipedia.org/wiki/Finite-state_transducer) are [Moore machines](https://en.wikipedia.org/wiki/Moore_machine) and [Mealy machines](https://en.wikipedia.org/wiki/Mealy_machine). For the former, the output is determined by the current state, while for the latter the output is determined by current state and current input.

One way to implement transducers in Python is via [coroutines](https://eli.thegreenplace.net/2009/08/29/co-routines-as-an-alternative-to-state-machines).

Let's study another example:

## The Elevator Problem

Consider the problem of modelling an elevator (this problem is based of David Beazley's Elevated [course](https://www.dabeaz.com/elevated.html)). 

There are several sources of events:

+ push buttons in the building to request the elevator to that floor

+ push buttons inside the elevator, one button per floor

+ opening and closing elevator doors

+ moving to the next floor

The elevator has, at least, three states

+ idling, i.e., doing nothing

+ moving to the next destination

+ loading/unloading passagers

There is little information about the outside system. So we should simulate it in a way that makes it easy to refactor our code when we know more.

### Defining the types

To deal with this problem, let's first define some appropriate types:

In [None]:
# A time model, that states the amounts of time to deal with different actions
class Time:
  TO_OPEN  = 5   # to open doors
  TO_CLOSE = 10  # to close doors
  TO_MOVE  = 20  # to move to next floor

An event type which includes:

+ time - at what time the event occurs
+ type - the type of event (check EventType below)
+ data - dictionary with relevant event information

We will keep it simple, and wrap it with a named tuple,

In [None]:
from collections import namedtuple

Event = namedtuple('Event', 'time type data')

We also need to model the types of event. For that we will use an enumeration,

In [None]:
from enum import Enum

class EventType(Enum):
  OPEN, CLOSE, IN_BUTTON, OUT_BUTTON, NEXT_FLOOR = range(5)
  
  def __lt__(self, other):
    """ make enum elements comparable
        https://stackoverflow.com/a/71839532/3744730 """
    if self == other:
      return False
    for elem in EventType: # follows elements definition order
      if self == elem:
        return True
      if other == elem:
        return False

Since each event can have multiple kinds of information, we will wrap them into a dictionary. And, as in the event types, we need to make them comparables (that has to do with the event time simulation -- to be implemented),

In [None]:
from collections import UserDict

class Tags(UserDict):
  """ comparable dictionaries """
  def __lt__(self, other):
    if 'id' not in self.keys():
      return True
    elif 'id' not in other.keys():
      return False
    else:
      return self['id'] < other['id']

Before we advance, let's do some logging configuration,

In [None]:
import logging as log

handler = log.FileHandler('app.log', 'w', encoding='utf-8')
log.basicConfig(level=log.INFO, handlers=(handler,), format='%(message)s')

### The Elevator class

The `Elevator` class define elevator objects. 

An elevator is able to receive events, via the `dispatch` method, and process them by (eventually) changing its state, and sending events to the scheduler.

The scheduler is provided, by the system, via dependency injection. The only assumption we make is that of its type: a priority queue of events. The goal here is to limit the dependencies with the (yet unknown) system.

An elevator is modelled as a FSM. The way this class solves the problem of implementing a FSM is via the `_state` attribute, which represents the current state information as a function (either `IDLING`, `MOVING`, `LOADING`). 

The `dispatch` method just executes the appropriate method, passing the incoming event.

This way, each state is separated by a different method, reducing the amount of conditionals needed to implement the class.

note: if the number of states and events where larger, we could even split these methods by (state,event) pairs, like methods `IDLING_OPEN` or `LOADING_CLOSE`.

In [None]:
class Elevator:
  def __init__(self, elevator_id, scheduler, min_floor=1, max_floor=5):
    self._scheduler = scheduler # System's scheduler (to add events if needed)
    self._elv_id    = elevator_id
    self._min_floor = min_floor
    self._max_floor = max_floor
    self._floor     = min_floor                # current floor
    self._open_door = False                    # elevator doors are open?
    self._up        = True                     # is the elevator going up?
    self._goals     = []                       # which floors must visit
    self._tags      = Tags({'id':elevator_id}) # tags to append to events
    self.set_state(self.IDLING)


  def dispatch(self, event):
    """ send event to current state """
    self._state(event)


  def IDLING(self, event):
    time, event_type, data = event
    
    if event_type == EventType.OPEN:
      if not self._open_door:
        self._open_door = True
        close_event = Event(time+Time.TO_CLOSE, EventType.CLOSE, self._tags)
        self._scheduler.put(close_event)   
      else:
        log.info('     (dev/nulled OPEN event: doors already opened)')
  
    elif event_type == EventType.CLOSE:
      if self._open_door:
        self._open_door = False
        if self.goals:
          move_event = Event(time+Time.TO_MOVE, EventType.NEXT_FLOOR, self._tags)
          self._scheduler.put(move_event)
          self.set_state(self.MOVING)
      else:
        log.info('     (dev/nulled CLOSE event: doors already closed)')
          
    elif event_type == EventType.NEXT_FLOOR:
      if self.goals:
        if self._open_door: # if doors are open, close them first
          close_event = Event(time, EventType.CLOSE, self._tags)
          self._scheduler.put(close_event)
          # time += Time.TO_CLOSE
        else:
          self.set_state(self.MOVING)
          move_event = Event(time, EventType.NEXT_FLOOR, data)
          self.dispatch(move_event)


  def MOVING(self, event):
    assert not self._open_door, "ERROR: Elevator is moving with open doors!!"
    time, event_type, data = event
    
    if event_type == EventType.NEXT_FLOOR:
      self.next_floor()
      if self._floor in self._goals:
        # we need to stop on this floor
        self._goals.remove(self._floor)
        open_event = Event(time+Time.TO_OPEN, EventType.OPEN, self._tags)
        self._scheduler.put(open_event)
        self.set_state(self.LOADING)
      elif not self._goals:
        # there's no goals left => change elevator state to idle
        self.set_state(self.IDLING)
      else:
        # otherwise, keep walking
        move_event = Event(time+Time.TO_MOVE, EventType.NEXT_FLOOR, self._tags)
        self._scheduler.put(move_event)
        
    elif event_type == EventType.OPEN:
       log.info('     (WARNING: dev/nulled OPEN event occurred while moving)')

    elif event_type == EventType.CLOSE:
       log.info('     (WARNING: dev/nulled CLOSE event occurred while moving)')


  def LOADING(self, event):
    time, event_type, data = event
  
    if event_type == EventType.OPEN:
      self._open_door = True
      close_event = Event(time+Time.TO_CLOSE, EventType.CLOSE, self._tags)
      self._scheduler.put(close_event)
  
    elif event_type == EventType.CLOSE:
      self._open_door = False
      if self.goals:
        move_event = Event(time+Time.TO_MOVE, EventType.NEXT_FLOOR, self._tags)
        self._scheduler.put(move_event)
        self.set_state(self.MOVING)
      else: # there's no goals left => change elevator state to idle
        self.set_state(self.IDLING)
        
    elif event_type == EventType.NEXT_FLOOR:
      # LOADING dev/nulls NEXT_FLOOR events (½ persons are a no no)
      log.info('     (dev/nulled NEXT_FLOOR event: loading passagers)')
        
  ### auxiliary methods
  
  def set_state(self, new_state):
    self._state = new_state
    
  def add_goal(self, floor):
    self._goals.append(floor)              # another floor we need to go
    self._goals = sorted(set(self._goals)) # deduplicate & sort

  def next_floor(self):
    if self.up:
      if any(goal > self._floor for goal in self.goals):
        self._floor += 1
      elif self.goals:
        self._up = not self._up        
        self._floor -= 1
    else:
      if any(goal < self._floor for goal in self.goals):
        self._floor -= 1
      elif self.goals:
        self._up = not self._up        
        self._floor += 1
    
  @property
  def goals(self):
    return self._goals
    
  @property
  def floor(self):
    return self._floor
    
  @property
  def up(self):
    return self._up
    
  def __repr__(self):
    arrow = lambda up: '↑' if up else '↓'
    moveto = '->'*(len(self._goals)>0)
    msg = f"{self._floor}{arrow(self._up)} {self._state.__name__:7}{moveto}{str(self._goals)[1:-1]}"
    return f"{msg:18}"  

### The Simulator

Finally, let's simulate an event system. 

The next class receives a list of external events (which we denote as `world`) that generates events that are to be sent to an elevator (which, possibly, will generate more events).

All these events are kept in a priority queue, the scheduler, prioritizing by event time (btw, that's why we made event types and tags comparable, to be able to compare events produced at the same time).

In [None]:
from queue import PriorityQueue

class System:
  def __init__(self, n_elevators=1, min_floor=1, max_floor=5):
    self._scheduler = PriorityQueue()
    self._elevators = {i:Elevator(i, self._scheduler, min_floor, max_floor) 
                       for i in range(n_elevators)}

  
  def run(self, world, end_time=1_000):
    for event in world:          # the world is a list of external events
      self._scheduler.put(event)

    ok = True 
    log.info('time   event      tags        elevator(s) state ↑↓')
    for time in range(end_time):
      while not self._scheduler.empty() and self._scheduler.queue[0].time == time: 
        event = self._scheduler.get()   # get next event with this timestamp       
        log.info(f'{self.repr_event(event)} {self.repr_elevators()}')
        self.process(event)
        # check system's state, after each event processed
        if not self.valid_system_state():
          log.info(f'ERROR: Last system state (t={time}) found invalid! Run away!!')
          log.info(f'{self.repr_event(event)} {self.repr_elevators()}')
          # TODO: instead of return False we could try to recover the state.
          # eg: reset elevator's goals, move to bottom floor, open doors          
          ok = False # ends in error
          break
    return ok
 

  def process(self, event):
    """ either a) convert an external event into 1+ internal events 
               b) dispatch an internal event to its elevator """
    time, event_type, data = event
    
    if event_type == EventType.OUT_BUTTON: 
      # TODO: we need two buttons outside, right now it accepts all requests
      # (btw, this is how my building's elevators work ;-)
      elv_id = self.assign_elevator(data['where'])
      if self._elevators[elv_id].floor == data['where']:
        # the elevator is already there
        elevator_event = Event(time+Time.TO_OPEN, EventType.OPEN, Tags({'id':elv_id}))
      else:
        # otherwise, the elevator need to go there
        self._elevators[elv_id].add_goal(data['where'])
        elevator_event = Event(time+Time.TO_MOVE, EventType.NEXT_FLOOR, Tags({'id':elv_id}))
      self._scheduler.put(elevator_event)
      
    elif event_type == EventType.IN_BUTTON:
      elv_id, destination = data['id'], data['to']
      elevator = self._elevators[elv_id]
      # A button pressed inside the elevator car is ignored if it's
      # not a floor in the current direction of motion.
      if ((elevator.goals == [] and destination != elevator.floor) or
          (destination > elevator.floor and     elevator.up) or 
          (destination < elevator.floor and not elevator.up)):
        elevator.add_goal(destination)
        elevator_event = Event(time+Time.TO_MOVE, EventType.NEXT_FLOOR, Tags({'id':elv_id}))
        self._scheduler.put(elevator_event)

    else:  
      # an internal event: the respective elevator should deal with it
      try:
        self._elevators[data['id']].dispatch(event)
      except:
        print(event)


  def valid_system_state(self):
    """ checks if elevator(s) are in a valid state
        the elevator invariant is:
          1) min_floor <= current_floor <= max_floor
          2) doors_open -> elevator not moving
    """
    for elevator in self._elevators.values():
      if not (elevator._min_floor <= elevator.floor <= elevator._max_floor):
        return False
      if elevator._open_door and elevator._state == elevator.MOVING:
        return False
    return True

  def assign_elevator(self, to_floor):
    """ decide between available elevators (if more than one) """
    return 0 # TODO: only one elevator exists, so this decision is easy!

  def repr_elevators(self):
    return ' |'.join(f" {i}@{elv}" for i, elv in self._elevators.items())
  
  def repr_event(self, event):
    tags = ' '.join(k+':'+str(v) for k,v in event.data.items())
    return f'{event.time:4d} {event.type.name:10} [{tags:10}]'

### Running the program

In [None]:
# by some reason, logger is not working on colab, so let's do it manually
class Log:
  def __init__(self, name='app.log', verbose=True):
    self.f = open(name, 'w', encoding='utf-8')
    self.verbose = verbose

  def info(self, msg):
    self.f.write(msg + '\r\n')
    if self.verbose:
      print(msg)
  
  def close(self):
    self.f.close()

Let's run the system with some external events:

In [None]:
world = [
    Event( 18, EventType.OUT_BUTTON, Tags({'where': 1})),
    Event( 19, EventType.OUT_BUTTON, Tags({'where': 1})),
    Event( 32, EventType.IN_BUTTON,  Tags({'id': 0, 'to': 4})),
    Event( 78, EventType.IN_BUTTON,  Tags({'id': 0, 'to': 5})),
    Event(150, EventType.OUT_BUTTON, Tags({'where': 2})),
    Event(210, EventType.IN_BUTTON,  Tags({'id': 0, 'to': 4})),
    Event(247, EventType.IN_BUTTON,  Tags({'id': 0, 'to': 5})),
  ]    
log = Log()
System().run(world)
log.close()

time   event      tags        elevator(s) state ↑↓
  18 OUT_BUTTON [where:1   ]  0@1↑ IDLING         
  19 OUT_BUTTON [where:1   ]  0@1↑ IDLING         
  23 OPEN       [id:0      ]  0@1↑ IDLING         
  24 OPEN       [id:0      ]  0@1↑ IDLING         
     (dev/nulled OPEN event: doors already opened)
  32 IN_BUTTON  [id:0 to:4 ]  0@1↑ IDLING         
  33 CLOSE      [id:0      ]  0@1↑ IDLING ->4     
  52 NEXT_FLOOR [id:0      ]  0@1↑ MOVING ->4     
  53 NEXT_FLOOR [id:0      ]  0@2↑ MOVING ->4     
  72 NEXT_FLOOR [id:0      ]  0@3↑ MOVING ->4     
  73 NEXT_FLOOR [id:0      ]  0@4↑ LOADING        
     (dev/nulled NEXT_FLOOR event: loading passagers)
  77 OPEN       [id:0      ]  0@4↑ LOADING        
  78 IN_BUTTON  [id:0 to:5 ]  0@4↑ LOADING        
  87 CLOSE      [id:0      ]  0@4↑ LOADING->5     
  98 NEXT_FLOOR [id:0      ]  0@4↑ MOVING ->5     
 103 OPEN       [id:0      ]  0@5↑ LOADING        
 107 NEXT_FLOOR [id:0      ]  0@5↑ LOADING        
     (dev/nulled NEXT_FLOOR 

## Testing FSMs

A hard problem is to test FSMs. How can we properly test the previous elevator implementation?

First approach: try using standard unit tests

### Unit Tests

In [None]:
import unittest

Some auxiliary functions:

In [None]:
import re

def events(log):
  """ returns lists of (event, current floor) from log """
  res = []
  for line in log:
    # get event type and current floor
    res.extend(re.findall(r'\S+ (\S+).*?\]\s+.*?@(\d+).*', line))
  return res

def states(log):
  """ returns lists of (state, goals) from log """
  res = []
  for line in log:
    # get elevator state and current goals
    cmd, ns = re.findall(r'.*?\]\s+\S+ (\w+)\s*?(.*)', line)[0]
    ns = ns.strip().replace('->', '').split(',')  # convert goals to list
    ns = [] if ns == [''] else list(map(int, ns)) # 
    res.append((cmd, ns))
  return res

The next unit test creates a situation -- some button was pushed on a given floor -- runs it and checks the log file:

In [None]:
LOG_FILENAME = 'app2.log'
log = Log(LOG_FILENAME, verbose=False)

class Test_(unittest.TestCase):
  
  def test_floor_up(self):
    """ move the elevator some floors up """
    floor = 3
    world = [
      Event(1, EventType.OUT_BUTTON, Tags({'where': floor})),
    ]    
    System().run(world, end_time=500)
    log.close()
    
    with open(LOG_FILENAME, 'r', encoding="utf-8") as f:
      text = f.readlines()[1:]
      evts = events(text)
      assert [cmd for cmd,_ in evts].count('NEXT_FLOOR') == floor-1
      assert evts[-2][0] == 'OPEN'
      assert evts[-1][0] == 'CLOSE'

In [None]:
unittest.main(argv=[''], exit=False)

.
----------------------------------------------------------------------
Ran 1 test in 0.003s

OK


<unittest.main.TestProgram at 0x7fa155d11880>

However, this approach is not very productive. Possibly, we need to test _lots_ of different situations.

### Testing FSM state paths

One way to test FSMs is to generate tests that pass thru a certain path of states. A limited number of these tests can pass thru all states. 

Another related idea is to test narratives, ie, typical situations that perform typical paths.

In [None]:
LOG_FILENAME = 'app3.log'
log = Log(LOG_FILENAME, verbose=False)

class Test_(unittest.TestCase):
  # testing path: IDLING -> MOVING -> LOADING -> MOVING -> LOADING 
  def test_moving_loading_twice(self):
    first_floor, second_floor = 2, 5
    world = [
      Event( 1, EventType.OUT_BUTTON, Tags({'where': 1})),
      Event(20, EventType.IN_BUTTON, Tags({'id': 0, 'to':first_floor})),
      Event(40, EventType.IN_BUTTON, Tags({'id': 0, 'to':second_floor})),
    ]    
    System().run(world, end_time=500)
    log.close()

    with open(LOG_FILENAME, 'r', encoding="utf-8") as f:
      text = f.readlines()[1:]
      evts = events(text)
      assert sorted(int(floor) for cmd,floor in evts if cmd=='OPEN')  == [1, first_floor, second_floor]
      assert sorted(int(floor) for cmd,floor in evts if cmd=='CLOSE') == [1, first_floor, second_floor]
      sts = states(text)
      assert any(first_floor  in floors for _,floors in sts)
      assert any(second_floor in floors for _,floors in sts)

In [None]:
unittest.main(argv=[''], exit=False)

.
----------------------------------------------------------------------
Ran 1 test in 0.003s

OK


<unittest.main.TestProgram at 0x7fa155ca5100>

### Random walks

An idea, suggested by David Beazley, is to perform a random walk of events. This way the system might be forced to go thru some strange rare paths that could be useful to catch bugs.

In [None]:
import numpy as np
import random

LOG_FILENAME = 'app4.log'
log = Log(LOG_FILENAME, verbose=False)

class Test_(unittest.TestCase):
  def test_random_walk(self):
    n = 100
    seed = np.random.randint(1,1000)
    np.random.seed(seed)

    # create random list of external events
    times = np.random.exponential(scale=15, size=n)
    times = list(map(int, np.cumsum(np.round(times))))
    events = np.random.choice([EventType.OUT_BUTTON, 
                               EventType.IN_BUTTON], size=n)
    tags_list = []
    for event in events:
      if event == EventType.OUT_BUTTON:
        tags_list.append(Tags({'where':random.choice(range(1,6))}))
      if event == EventType.IN_BUTTON:
        tags_list.append(Tags({'id':0, 'to':random.choice(range(1,6))}))
    
    # generate random-walk world
    world = [Event(time, event_type, tags) 
             for time, event_type, tags in zip(times, events, tags_list)]

    # run() returns True if no invalid state was found
    status = System().run(world, end_time=max(times)*10)
    log.close()

    if not status:
      print('** Used seed:', seed)
      print('** List of events:')
      print('\n'.join(str(event) for event in world))
    assert status  

In [None]:
unittest.main(argv=[''], exit=False)

.
----------------------------------------------------------------------
Ran 1 test in 0.020s

OK


<unittest.main.TestProgram at 0x7fa155c75d60>

### Breadth-first search

Another option is to build all possible worlds and run them all, in a BFS style. This has the advantage of being exaustive but it is also quite slow, since the number of worlds grow exponentially with the number of events!

In [None]:
from queue import Queue

log = Log(LOG_FILENAME, verbose=False)
LOG_FILENAME = 'app5.log'

class Test_(unittest.TestCase):
  def test_BFS_walk(self):
    seed_event = Event(1, EventType.OUT_BUTTON, Tags({'where':1}))
    worlds = Queue()
    worlds.put([seed_event])
    
    N = 5000
    for _ in range(N): # check the first N possible worlds
      world = worlds.get()
      last_time = world[-1].time
      status = System().run(world, end_time=last_time*10)
      
      if not status:
        print('** List of events:')
        print('\n'.join(str(event) for event in world))
      assert status
      
      delta_time = 10
      for event_floor in range(1,6,2):
        event_out = Event(last_time + delta_time, 
                          EventType.OUT_BUTTON, 
                          Tags({'where':event_floor}))
        worlds.put(world+[event_out])

        event_in  = Event(last_time + delta_time, 
                          EventType.IN_BUTTON, 
                          Tags({'id':0, 'to':event_floor}))
        worlds.put(world+[event_in])

    log.close()

In [None]:
unittest.main(argv=[''], exit=False)

.
----------------------------------------------------------------------
Ran 1 test in 3.991s

OK


<unittest.main.TestProgram at 0x7fa154cafcd0>

### Permutation of external events

Another approach is to permutate sequences of button presses and check if the system runs well for every permutation:

In [None]:
from itertools import combinations

log = Log(LOG_FILENAME, verbose=False)
LOG_FILENAME = 'app6.log'

class Test_(unittest.TestCase):
  # test combinations of button presses (no event at same floor is repeated)        
  def test_combinations(self):
    for n_events in range(1,11):
      for world in get_worlds(n_events):
        last_time = world[-1].time
        status = System().run(world, end_time=last_time*10)
        
        if not status:
          print('** List of events:')
          print('\n'.join(str(event) for event in world))
        assert status

def get_worlds(n):
  worlds = []
  for path in combinations([y for x in range(1,6) for y in [-x,x]], r=n):
    world = []
    for i,floor in enumerate(path):
      if floor > 0:
        event = Event(10*i, EventType.OUT_BUTTON, Tags({'where':floor}))
      else:
        event = Event(10*i, EventType.IN_BUTTON,  Tags({'id':0, 'to':abs(floor)}))      
      world.append(event)
    worlds.append(world)
  return worlds        

In [None]:
unittest.main(argv=[''], exit=False)

.
----------------------------------------------------------------------
Ran 1 test in 0.675s

OK


<unittest.main.TestProgram at 0x7fa155ca51f0>