forked from AmbaPant/mantid
-
Notifications
You must be signed in to change notification settings - Fork 1
/
GetLiveInstrumentValue.py
91 lines (75 loc) · 3.63 KB
/
GetLiveInstrumentValue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# Mantid Repository : https://github.com/mantidproject/mantid
#
# Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
# NScD Oak Ridge National Laboratory, European Spallation Source,
# Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
# SPDX - License - Identifier: GPL - 3.0 +
from mantid.simpleapi import *
from mantid.kernel import *
from mantid.api import *
class GetLiveInstrumentValue(DataProcessorAlgorithm):
def category(self):
return 'Utility'
def summary(self):
return 'Get a live data value from and instrument via EPICS'
def seeAlso(self):
return [ "ReflectometryReductionOneLiveData" ]
def PyInit(self):
instruments = sorted([item.name()
for item in config.getFacility().instruments()])
instrument = config.getInstrument() if config.getInstrument() in instruments else ''
self.declareProperty(name='Instrument', defaultValue=instrument, direction=Direction.Input,
validator=StringListValidator(instruments),
doc='Instrument to find live value for.')
self.declareProperty(name='PropertyType', defaultValue='Run', direction=Direction.Input,
validator=StringListValidator(['Run', 'Block']),
doc='The type of property to find')
self.declareProperty(name='PropertyName', defaultValue='TITLE', direction=Direction.Input,
validator=StringMandatoryValidator(),
doc='Name of value to find.')
self.declareProperty(name='Value', defaultValue='', direction=Direction.Output,
doc='The live value from the instrument, or an empty string if not found')
def PyExec(self):
self._instrument = self.getProperty('Instrument').value
self._propertyType = self.getProperty('PropertyType').value
self._propertyName = self.getProperty('PropertyName').value
value = self._get_live_value()
self._set_output_value(value)
def _prefix(self):
"""Prefix to use at the start of the EPICS string"""
return 'IN:'
def _name_prefix(self):
"""Prefix to use in the EPICS string before the property name"""
if self._propertyType == 'Run':
return ':DAE:'
else:
return ':CS:SB:'
@staticmethod
def _caget(pvname, as_string=False):
"""Retrieve an EPICS PV value"""
try:
from CaChannel import CaChannel, CaChannelException, ca
except ImportError:
raise RuntimeError("CaChannel must be installed to use this algorithm. "
"For details, see https://www.mantidproject.org/CaChannel_In_Mantid")
if as_string:
dbr_type = ca.DBR_STRING
else:
dbr_type = None
try:
chan = CaChannel(pvname)
chan.setTimeout(5.0)
chan.searchw()
return chan.getw(dbr_type)
except CaChannelException as e:
raise RuntimeError("Error reading EPICS PV \"{}\": {}".format(pvname, str(e)))
def _get_live_value(self):
epicsName = self._prefix() + self._instrument + self._name_prefix() + self._propertyName
return self._caget(epicsName, as_string=True)
def _set_output_value(self, value):
if value is not None:
self.log().notice(self._propertyName + ' = ' + value)
self.setProperty('Value', str(value))
else:
self.log().notice(self._propertyName + ' not found')
AlgorithmFactory.subscribe(GetLiveInstrumentValue)