forked from goatsofnaxos/VBAcmd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
VBAFSMthread2.py
131 lines (108 loc) · 5.74 KB
/
VBAFSMthread2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
'''
(c) 2017 The Trustees of Columbia University in the City of New York. All Rights Reserved.
Started September 24, 2015
@author: Carl Schoonover
Finite state machine for virtual burrow assay v.3+
'''
from __future__ import division
from threading import Thread
from numpy import random
from time import sleep
from transitions import Machine
class VBAFSMthread(Thread):
def __init__(self):
# INPUT VALUES SET BY CMD
self.doPrint = 1 # Option to print current state of FSM
self.rt = 0.5 # Update rates (default is 0.5 sec; set by CMD)
self.CMDhaltLoopSignal = 0 # Exit run loop
self.CMDrestartSignal = 0 # Return to waiting4start state
self.CMDolfactometerStartSignal = 0 # Up when olfactometer initiates trial, down at end
self.CMDdoneStruggling = 0 # Up when animal no longer struggling/escaping
self.CMDanimalEscaped = 0 # Up when animal escaped during slackened state
self.CMDreadyToLaunch = 0 # Up when animal has been extruded without moving for long enough
# FSM parameters
self.states = ['waiting4start', 'pullingBack', 'slackening', 'launching']
self.transitions = [
{'trigger': 'startOver', 'source': '*', 'dest': 'waiting4start', 'after': 'do_waiting4start'},
{'trigger': 'beginPull', 'source': 'waiting4start', 'dest': 'pullingBack', 'after': 'do_pullingBack', 'conditions': 'olfactometer_says_go'},
{'trigger': 'slacken', 'source': 'pullingBack', 'dest': 'slackening', 'after': 'do_slackening', 'conditions': 'not_struggling_anymore'},
{'trigger': 'catchorlaunch', 'source': 'slackening', 'dest': 'pullingBack', 'after': 'do_pullingBack', 'conditions': 'animal_escaped'},
{'trigger': 'catchorlaunch', 'source': 'slackening', 'dest': 'launching', 'after': 'do_launching', 'conditions': 'ok_to_launch'},
{'trigger': 'doneWithTrial', 'source': 'launching', 'dest': 'waiting4start', 'conditions': 'ok_to_wrap_up'},
]
# Initiate FSM
Machine(model=self, states=self.states, transitions=self.transitions, initial='waiting4start')
# Constructor for threading
Thread.__init__(self)
def run(self):
self.startOver()
while True:
if self.CMDhaltLoopSignal: self.halt_loop(); return
elif self.CMDrestartSignal:
self.reset_to_waiting4start()
elif self.state is 'waiting4start':
while True and (self.state is 'waiting4start'):
if self.CMDrestartSignal: self.reset_to_waiting4start(); break
if self.CMDhaltLoopSignal: self.halt_loop(); return
self.beginPull()
elif self.state is 'pullingBack':
while True and (self.state is 'pullingBack'):
if self.CMDrestartSignal: self.reset_to_waiting4start(); break
if self.CMDhaltLoopSignal: self.halt_loop(); return
self.slacken()
elif self.state is 'slackening':
while True and (self.state is 'slackening'):
if self.CMDrestartSignal: self.reset_to_waiting4start(); break
if self.CMDhaltLoopSignal: self.halt_loop(); return
self.catchorlaunch()
elif self.state is 'launching':
while True and (self.state is 'launching'):
if self.CMDrestartSignal: self.reset_to_waiting4start(); break
if self.CMDhaltLoopSignal: self.halt_loop(); return
self.doneWithTrial()
else: pass
def reset_to_waiting4start(self):
self.CMDrestartSignal = 0
self.startOver()
def halt_loop(self):
self.reset_to_waiting4start()
def do_waiting4start(self):
if self.doPrint: print self.state
def do_pullingBack(self):
if self.doPrint: print self.state
def do_slackening(self):
if self.doPrint: print self.state
def do_launching(self):
if self.doPrint: print self.state
def olfactometer_says_go(self): # When self.CMDolfactometerStartSignal is high launch the trial
sleep(self.rt)
if self.doPrint: print '## OLFACTOMETER READY TO START TRIAL? ##', self.CMDolfactometerStartSignal
return self.CMDolfactometerStartSignal > 0
def not_struggling_anymore(self):
sleep(self.rt)
if self.doAutoCycle: condition = random.random_sample() > 0.5
else: condition = self.CMDdoneStruggling > 0
if self.doPrint: print '## ANIMAL DONE STRUGGLING? ##', condition
return condition
def animal_escaped(self):
sleep(self.rt/2)
if self.doAutoCycle: condition = random.random_sample() > 0.8
else: condition = self.CMDanimalEscaped > 0
if self.doPrint: print '## ANIMAL ESCAPED? ##', condition
return condition
def ok_to_launch(self):
sleep(self.rt/2)
if self.doAutoCycle: condition = random.random_sample() > 0.95
else: condition = self.CMDreadyToLaunch > 0
if self.doPrint: '## READY TO LAUNCH? ##', condition
return condition
def ok_to_wrap_up(self): # When self.CMDolfactometerStartSignal is low terminate the trial
if self.doPrint: print '## IT AINT OVER TILL THE OLFACTOMETER SAYS ITS OVER ##', not self.CMDolfactometerStartSignal
sleep(self.rt)
return not self.CMDolfactometerStartSignal
if __name__ == '__main__':
vbafsm = VBAFSMthread()
vbafsm.doPrint = 1
vbafsm.doAutoCycle = 1
vbafsm.start()
vbafsm.CMDolfactometerStartSignal = 1