-
Notifications
You must be signed in to change notification settings - Fork 24
/
pysh.py
85 lines (79 loc) · 2.1 KB
/
pysh.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#!/usr/bin/env python
import time
import sys
import threading
import subprocess
import shlex
from pcaspy import Driver, SimpleServer
# Prefix passed to SimpleServer.createPV() together with pvdb.
prefix = 'MTEST:'

# PV database: maps each PV name to its configuration dictionary.
pvdb = {
    # Command line to execute. Flagged 'asyn'; the driver signals the end of
    # a run with callbackPV('COMMAND').
    'COMMAND': {'type': 'char', 'count': 128, 'asyn': True},
    # Captured standard output of the last command.
    'OUTPUT': {'type': 'char', 'count': 500},
    # Execution state: the driver writes 1 (BUSY) while a command runs and
    # 0 (DONE) afterwards.
    'STATUS': {'type': 'enum', 'enums': ['DONE', 'BUSY']},
    # Captured standard error of the last command, or the exception text
    # when the command could not be launched.
    'ERROR': {'type': 'string'},
}
import math
class myDriver(Driver):
    """Driver that runs the command written to the COMMAND PV.

    Each accepted command is executed in a background thread. While it runs
    STATUS is 1 (BUSY); when it ends OUTPUT and ERROR hold the captured
    stdout/stderr (or the launch error), STATUS returns to 0 (DONE) and the
    COMMAND write is completed with callbackPV('COMMAND').
    """

    def __init__(self):
        Driver.__init__(self)
        # Worker thread of the command in progress, or None when idle.
        self.tid = None

    def write(self, reason, value):
        """Handle a client write to PV `reason`.

        Only COMMAND is writable, and only while no command is running.

        Returns True when the value was accepted and stored, False when the
        write was rejected.
        """
        status = True
        # take proper actions
        if reason == 'COMMAND':
            if not self.tid:
                command = value
                # NOTE(review): an empty command is accepted and stored but
                # starts no worker, so callbackPV('COMMAND') is not called
                # for it -- confirm that is intended for an 'asyn' PV.
                if command:
                    self.tid = threading.Thread(target=self.runShell,
                                                args=(command,))
                    self.tid.start()
            else:
                # A command is still running: refuse the new one.
                status = False
        else:
            # Every other PV is read-only for clients.
            status = False
        # store the values
        if status:
            self.setParam(reason, value)
        return status

    def runShell(self, command):
        """Run `command` (thread target) and publish its result.

        The command string is tokenised with shlex.split() and executed
        without a shell. Launch failures (OSError) and malformed command
        lines (ValueError from shlex.split, e.g. an unterminated quote) are
        reported through the ERROR PV with OUTPUT cleared.
        """
        print("DEBUG: Run ", command)
        # set status BUSY
        self.setParam('STATUS', 1)
        self.updatePVs()
        # Short pause kept from the original implementation.
        # NOTE(review): presumably lets write() return before the command
        # starts -- confirm.
        time.sleep(0.01)
        # run shell
        try:
            proc = subprocess.Popen(shlex.split(command),
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            # communicate() drains both pipes while waiting; wait() alone can
            # deadlock once the child fills an OS pipe buffer.
            out, err = proc.communicate()
        except (OSError, ValueError) as exc:
            self.setParam('ERROR', str(exc))
            self.setParam('OUTPUT', '')
        else:
            # NOTE(review): on Python 3 these are bytes objects -- confirm
            # the PV types accept them or decode here.
            self.setParam('ERROR', err.rstrip())
            self.setParam('OUTPUT', out.rstrip())
        finally:
            # Always finish the write and release the driver, even after an
            # unexpected error, so it never stays stuck in BUSY.
            self.callbackPV('COMMAND')
            # set status DONE
            self.setParam('STATUS', 0)
            self.updatePVs()
            self.tid = None
        print("DEBUG: Finish ", command)
if __name__ == '__main__':
    # Create the server and register every PV from pvdb under the prefix.
    server = SimpleServer()
    server.createPV(prefix, pvdb)

    # Instantiate the driver once the PVs have been created.
    driver = myDriver()

    # Serve forever, processing CA transactions; 0.1 is the argument given
    # to each process() call (presumably a time slice in seconds -- confirm
    # against the pcaspy documentation).
    while True:
        server.process(0.1)