forked from goatsofnaxos/VBAcmd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
VBAFSMthread3.py
158 lines (134 loc) · 7.28 KB
/
VBAFSMthread3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
'''
(c) 2017 The Trustees of Columbia University in the City of New York. All Rights Reserved.
Started September 24, 2015
@author: Carl Schoonover
Finite state machine for virtual burrow assay v.3+
'''
from __future__ import division
from threading import Thread
from numpy import random
from time import sleep
from transitions import Machine
class VBAFSMthread(Thread):
    '''
    Thread that runs the finite state machine (FSM) of the virtual burrow assay.

    The FSM has five states ('waiting4start', 'pullingBack', 'slackening',
    'ready', 'launching').  While the thread is running it repeatedly fires
    the trigger that belongs to the current state; the ``transitions``
    Machine only performs the transition once the associated condition
    method returns a truthy value.

    A controlling program (called CMD in the comments below) communicates
    with the thread through plain attributes: it writes the input flags and
    reads ``CMDtriggerHigh``.
    '''

    # Trigger to fire while the machine sits in a given state.  The lookup is
    # by string *equality*; identity comparison (``is``) on strings only works
    # by accident of interning.
    _STATE_TRIGGERS = {
        'waiting4start': 'beginPull',
        'pullingBack': 'slacken',
        'slackening': 'catchOrReady',
        'ready': 'catchOrLaunch',
        'launching': 'trialDone',
    }

    def __init__(self):
        # The base-class constructor has to run before anything else is done
        # to the thread object.
        Thread.__init__(self)
        # INPUT VALUES SET BY CMD
        self.doPrint = 1  # Option to print current state of FSM
        self.doAutoCycle = 0  # When truthy, conditions are simulated with random draws instead of CMD flags
        self.rt = 0.5  # Update rates (default is 0.5 sec; set by CMD)
        self.CMDhaltLoopSignal = 0  # Exit run loop
        self.CMDrestartSignal = 0  # Return to waiting4start state
        self.CMDolfactometerSaysPull = [True, True]  # Up when olfactometer is not in [odor delivery + behavior] epochs
        self.CMDdoneStruggling = 0  # Up when animal no longer struggling/escaping
        self.CMDanimalEscaped = 0  # Up when animal escaped during slackened state
        self.CMDanimalReady = 0  # Read by animal_ready(); previously never initialised here
        # OUTPUT VALUES READ BY CMD
        self.CMDtriggerHigh = 0
        # FSM parameters
        self.states = ['waiting4start', 'pullingBack', 'slackening', 'ready', 'launching']
        self.transitions = [
            {'trigger': 'startOver', 'source': '*', 'dest': 'waiting4start', 'after': 'do_waiting4start'},
            {'trigger': 'beginPull', 'source': 'waiting4start', 'dest': 'pullingBack', 'after': 'do_pullingBack'},
            {'trigger': 'slacken', 'source': 'pullingBack', 'dest': 'slackening', 'after': 'do_slackening', 'conditions': 'not_struggling_anymore'},
            {'trigger': 'catchOrReady', 'source': 'slackening', 'dest': 'pullingBack', 'after': 'do_pullingBack', 'conditions': 'animal_escaped'},
            {'trigger': 'catchOrReady', 'source': 'slackening', 'dest': 'ready', 'after': 'do_ready', 'conditions': 'animal_ready'},
            {'trigger': 'catchOrLaunch', 'source': 'ready', 'dest': 'launching', 'after': 'do_launching', 'conditions': 'olf_says_launch'},
            {'trigger': 'catchOrLaunch', 'source': 'ready', 'dest': 'pullingBack', 'after': 'do_pullingBack', 'conditions': 'animal_escaped'},
            {'trigger': 'trialDone', 'source': 'launching', 'dest': 'waiting4start', 'after': 'do_waiting4start', 'conditions': 'olf_says_trial_over'},
        ]
        # Initiate FSM (attaches the triggers and the ``state`` attribute to self)
        Machine(model=self, states=self.states, transitions=self.transitions, initial='waiting4start')

    def run(self):
        '''
        Thread body: drive the FSM until CMD raises ``CMDhaltLoopSignal``.

        Every pass first honours the halt and restart flags and otherwise
        fires the trigger of the current state.  Pacing comes from the
        ``sleep`` calls inside the condition methods and ``do_*`` callbacks.
        '''
        self.startOver()
        while True:
            if self.CMDhaltLoopSignal:
                self.halt_loop()
                return
            if self.CMDrestartSignal:
                self.reset_to_waiting4start()
                continue
            trigger_name = self._STATE_TRIGGERS.get(self.state)
            if trigger_name is None:
                # Unknown state: nothing to fire; wait instead of spinning.
                sleep(self.rt)
                continue
            getattr(self, trigger_name)()

    def reset_to_waiting4start(self):
        '''Acknowledge a restart request and return the FSM to 'waiting4start'.'''
        self.CMDrestartSignal = 0
        self.CMDtriggerHigh = 0
        self.startOver()

    def halt_loop(self):
        '''Lower the output trigger and park the FSM in 'waiting4start' before run() exits.'''
        self.CMDtriggerHigh = 0
        self.reset_to_waiting4start()

    # ---- 'after' callbacks, executed once a transition has completed ----

    def _enter_state(self, trigger_high):
        '''Set the output trigger, optionally print the new state, then wait 4 * rt.'''
        self.CMDtriggerHigh = trigger_high
        if self.doPrint:
            print(self.state)
        sleep(self.rt * 4)

    def do_waiting4start(self):
        '''Callback for entering 'waiting4start' (trigger low).'''
        self._enter_state(0)

    def do_pullingBack(self):
        '''Callback for entering 'pullingBack' (trigger low).'''
        self._enter_state(0)

    def do_slackening(self):
        '''Callback for entering 'slackening' (trigger low).'''
        self._enter_state(0)

    def do_ready(self):
        '''Callback for entering 'ready'; the only state that raises CMDtriggerHigh.'''
        self._enter_state(1)

    def do_launching(self):
        '''Callback for entering 'launching' (trigger low).'''
        self._enter_state(0)

    # ---- transition conditions ----

    def _poll_condition(self, delay, label, read_signal):
        '''
        Wait ``delay`` seconds, then evaluate one transition condition.

        In auto-cycle mode the outcome is a random draw (true for draws above
        0.8); otherwise ``read_signal`` is called, *after* the wait, to fetch
        the current CMD flag.  The result is printed behind ``label`` when
        ``doPrint`` is set, and returned.
        '''
        sleep(delay)
        if self.doAutoCycle:
            condition = random.random_sample() > 0.8
        else:
            condition = read_signal()
        if self.doPrint:
            print('%s %s' % (label, condition))
        return condition

    def not_struggling_anymore(self):
        '''Condition for 'slacken'.'''
        # CONDITIONS: (1) Servo done pulling; (2) Animal hasn't generated force for N seconds
        return self._poll_condition(
            self.rt, '## ANIMAL DONE STRUGGLING? ##',
            lambda: self.CMDdoneStruggling)

    def animal_escaped(self):
        '''Condition for falling back to 'pullingBack' from 'slackening' or 'ready'.'''
        # CONDITIONS: (1) Laser says animal moved
        return self._poll_condition(
            self.rt / 2, '## ANIMAL ESCAPED? ##',
            lambda: self.CMDanimalEscaped)

    def animal_ready(self):
        '''Condition for moving from 'slackening' to 'ready'.'''
        # CONDITIONS: (1) Servo hasn't moved for N seconds; (2) Animal hasn't crossed laser threshold for N seconds;
        # (3) Animal hasn't moved more than Laser SD theta for N seconds; (4) Animal hasn't crossed force threshold for N seconds
        return self._poll_condition(
            self.rt / 2, '## ANIMAL READY? ##',
            lambda: self.CMDanimalReady)

    def olf_says_launch(self):
        '''Condition for moving from 'ready' to 'launching'.'''
        # CONDITIONS: (1) Olfactometer not telling VBA to pull anymore (i.e. = 0) because odor presentation has begun
        # The explicit comparison is kept so the result is unchanged for non-bool flag values.
        return self._poll_condition(
            self.rt / 2, '## OLFACTOMETER TOLD VBA TO STOP PULLING? ##',
            lambda: self.CMDolfactometerSaysPull[1] == False)

    def olf_says_trial_over(self):
        '''Condition for 'trialDone'; in auto-cycle mode it always passes after 2 extra seconds.'''
        sleep(self.rt)
        if self.doAutoCycle:
            condition = True
            sleep(2)
        else:
            # CONDITIONS: (1) Olfactometer telling VBA to start pulling again (i.e. = 1) because trial is over
            condition = self.CMDolfactometerSaysPull[1] == True
        if self.doPrint:
            print('%s %s' % ('## OLFACTOMETER TOLD VBA THAT TRIAL IS DONE? ##', condition))
        return condition
if __name__ == '__main__':
    # Stand-alone demo: run the FSM thread with state printing enabled and
    # with the transition conditions simulated (auto-cycle) instead of
    # being read from CMD flags.
    vbafsm = VBAFSMthread()
    for option in ('doPrint', 'doAutoCycle'):
        setattr(vbafsm, option, 1)
    vbafsm.start()
    # Written after the thread has been started, as in the original script.
    vbafsm.CMDolfactometerSaysPull[1] = True