forked from AmbaPant/mantid
-
Notifications
You must be signed in to change notification settings - Fork 1
/
LoadEXED.py
232 lines (206 loc) · 10.6 KB
/
LoadEXED.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# Mantid Repository : https://github.com/mantidproject/mantid
#
# Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
# NScD Oak Ridge National Laboratory, European Spallation Source,
# Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
# SPDX - License - Identifier: GPL - 3.0 +
from mantid.kernel import Direction
from mantid.api import (PythonAlgorithm, AlgorithmFactory, FileAction,
FileProperty, WorkspaceProperty, MatrixWorkspaceProperty,
PropertyMode, WorkspaceFactory, AnalysisDataService)
from mantid.simpleapi import LoadInstrument, AddSampleLogMultiple, SetGoniometer
from mantid.simpleapi import ExtractSpectra, MaskDetectors, RemoveMaskedSpectra
from mantid.simpleapi import RotateInstrumentComponent
import struct
import numpy as np
import copy
import types
# Text type usable on both Python 2 and Python 3: ``types.UnicodeType`` when
# the ``types`` module provides it, otherwise the built-in ``str``.
UnicodeType = getattr(types, 'UnicodeType', str)
class LoadEXED(PythonAlgorithm):
    __doc__ = ("This algorithm reads Exed raw files and creates two workspaces.\n"
               "One for the detectors and another for the monitor.\n")

    # The last _MONITOR_COUNT spectra in the raw file are split off into the
    # monitor workspace.
    _MONITOR_COUNT = 2
    # AngleOverride values below this threshold mean "no override given"
    # (the property default is -255.0); the angle is then taken from the file.
    _OVERRIDE_UNSET_BELOW = -254.0

    def category(self):
        """Return the algorithm categories."""
        return "Inelastic;Diffraction"

    def name(self):
        """Return the algorithm name."""
        return "LoadEXED"

    def version(self):
        """Return the algorithm version."""
        return 1

    def summary(self):
        """Return a short description of the algorithm."""
        return ("This algorithm reads Exed raw files and creates two workspaces.\n"
                "One for the detectors and another for the monitor.")

    def PyInit(self):
        """Declare the input and output properties of the algorithm."""
        self.declareProperty(FileProperty(name="Filename", defaultValue="",
                                          action=FileAction.Load,
                                          extensions=["raw"]),
                             doc="Data file produced by egraph.")
        self.declareProperty(FileProperty(name="InstrumentXML", defaultValue="",
                                          action=FileAction.OptionalLoad,
                                          extensions=["xml"]),
                             doc="Instrument definition file. If no file is specified, the default idf is used.")
        self.declareProperty('AngleOverride', -255.0,
                             doc="Rotation angle (degrees) of the EXED magnet."
                                 "\nThis should be read from the data file!\n"
                                 "Only change the value if the file has an incomplete header!")
        self.declareProperty(MatrixWorkspaceProperty(name="OutputWorkspace",
                                                     defaultValue="",
                                                     direction=Direction.Output),
                             doc="Mantid workspace containing the measured data.")
        self.declareProperty(WorkspaceProperty(name="OutputMonitorWorkspace",
                                               defaultValue="",
                                               direction=Direction.Output,
                                               optional=PropertyMode.Optional),
                             doc="Workspace containing the measured monitor spectra.")

    def PyExec(self):
        """Load the raw file and publish the detector and monitor workspaces.

        Steps: read the file, build a Workspace2D with one spectrum per
        detector row, attach detector IDs and the header values (as sample
        logs), load the instrument, rotate the 'Tank' component by the magnet
        angle, and finally move the last ``_MONITOR_COUNT`` spectra into the
        monitor workspace.
        """
        fn = self.getPropertyValue("Filename")
        wsn = self.getPropertyValue("OutputWorkspace")
        if self.getPropertyValue("OutputMonitorWorkspace") == "":
            self.setPropertyValue("OutputMonitorWorkspace", wsn + '_Monitors')
        # read_file() reads self.override_angle, so set it before loading.
        self.override_angle = self.getPropertyValue("AngleOverride")
        self.fxml = self.getPropertyValue("InstrumentXML")

        # load data
        parms_dict, det_udet, det_count, det_tbc, data = self.read_file(fn)
        nrows = int(parms_dict['NDET'])
        monitor_indices = range(nrows - self._MONITOR_COUNT, nrows)
        monitor_index_list = ','.join(str(s) for s in monitor_indices)

        xdata = np.array(det_tbc)
        # Monitors get evenly spaced bin boundaries spanning the same range.
        xdata_mon = np.linspace(xdata[0], xdata[-1], len(xdata))
        # np.float64 replaces the removed ``np.float`` alias (same dtype).
        ydata = data.astype(np.float64).reshape(nrows, -1)
        edata = np.sqrt(ydata)

        nr, nc = ydata.shape
        ws = WorkspaceFactory.create("Workspace2D", NVectors=nr,
                                     XLength=nc + 1, YLength=nc)
        for i in range(nrows):
            ws.setX(i, xdata)
            ws.setY(i, ydata[i])
            ws.setE(i, edata[i])
        ws.getAxis(0).setUnit('tof')
        AnalysisDataService.addOrReplace(wsn, ws)

        # fix the x values for the monitor
        for i in monitor_indices:
            ws.setX(i, xdata_mon)

        self.log().information("set detector IDs")
        for i in range(nrows):
            ws.getSpectrum(i).setDetectorID(det_udet[i])

        # The header values are written into the sample logs as ASCII strings.
        log_names = [str(key.encode('ascii', 'ignore').decode()) for key in parms_dict]
        log_values = []
        for value in parms_dict.values():
            if isinstance(value, UnicodeType):
                text = str(value.encode('ascii', 'ignore').decode())
            else:
                text = str(value)
            # Any value containing 'nan'/'NaN' is replaced by '-1.0'.
            if ('nan' in text) or ('NaN' in text):
                text = '-1.0'
            log_values.append(text)
        AddSampleLogMultiple(Workspace=wsn, LogNames=log_names, LogValues=log_values)
        SetGoniometer(Workspace=wsn, Goniometers='Universal')

        if self.fxml == "":
            LoadInstrument(Workspace=wsn, InstrumentName="Exed", RewriteSpectraMap=True)
        else:
            LoadInstrument(Workspace=wsn, Filename=self.fxml, RewriteSpectraMap=True)

        phi = parms_dict.get('phi')
        if phi is None:
            # read_file() has already warned that the angle was not found;
            # do not misreport this as a missing 'Tank' component.
            self.log().warning("No magnet angle is available, so the 'Tank' component was not rotated.")
        else:
            try:
                RotateInstrumentComponent(Workspace=wsn,
                                          ComponentName='Tank',
                                          Y=1,
                                          Angle=-float(phi),
                                          RelativeRotation=False)
            except Exception as exc:
                # Best effort: a custom instrument definition may lack 'Tank'.
                self.log().warning("The instrument does not contain a 'Tank' component. "
                                   "This means that you are using a custom XML instrument definition. "
                                   "OMEGA_MAG will be ignored.")
                self.log().warning("Please make sure that the detector positions in the instrument definition are correct.")
                self.log().debug("RotateInstrumentComponent failed: %s" % exc)

        # Separate monitors into a separate workspace
        monitor_ws = ExtractSpectra(InputWorkspace=wsn,
                                    WorkspaceIndexList=monitor_index_list,
                                    OutputWorkspace=self.getPropertyValue("OutputMonitorWorkspace"))
        MaskDetectors(Workspace=wsn, WorkspaceIndexList=monitor_index_list)
        RemoveMaskedSpectra(InputWorkspace=wsn, OutputWorkspace=wsn)

        self.setProperty("OutputWorkspace", wsn)
        self.setProperty("OutputMonitorWorkspace", monitor_ws)

    def read_file(self, fn):
        """Read the header and the binary payload of an EXED raw file.

        Requires ``self.override_angle`` to be set (PyExec does this).

        Args:
            fn: path of the raw file.

        Returns:
            Tuple ``(parms_dict, det_udet, det_count, det_tbc, data)``:
            the header parameters (plus 'omega', 'chi' and, when available,
            'phi'), ``NDET`` integers, another ``NDET`` integers,
            ``NTC + 1`` floats (time-bin boundaries) and a flat ``uint32``
            array of ``NDET * NTC`` counts.

        Raises:
            RuntimeError: if the file ends before the header markers are
                found or holds fewer counts than the header announces.
            KeyError: if the header lacks 'NDET' or 'NTC'.
            struct.error: if one of the binary tables is truncated.
        """
        # ``with`` guarantees the handle is closed even when parsing fails.
        with open(fn, 'rb') as fin:
            parms_dict = self._read_header(fin)
            nrows = int(parms_dict['NDET'])
            nbins = int(parms_dict['NTC'])
            self.log().information("read UDET")
            det_udet = self.struct_data_read(fin, nrows)
            self.log().information("read Counter")
            det_count = self.struct_data_read(fin, nrows)
            self.log().information("read TimeBinBoundaries")
            det_tbc = self.struct_data_read(fin, nbins + 1, 'f')
            self.log().information("read Data")
            data = np.fromfile(fin, np.uint32, nrows * nbins, '')

        # np.fromfile silently returns fewer items when the file is short.
        if data.size != nrows * nbins:
            raise RuntimeError("Expected %d data values (NDET * NTC) but the file contains only %d."
                               % (nrows * nbins, data.size))

        self._set_magnet_angle(parms_dict)
        return parms_dict, det_udet, det_count, det_tbc, data

    def _read_header(self, fin):
        """Parse ``key = value`` header lines up to and including the 'DATA=1' line."""
        parms_dict = {}
        parms_dict['omega'] = '0.0'
        parms_dict['chi'] = '0.0'
        while True:
            line = self._read_ascii_line(fin, "the end-of-header marker")
            key, separator, _ = line.partition('=')
            if separator:
                if "omment" in key:
                    # Keep everything after the first '='. str.strip('Comment =')
                    # strips a character *set* and would eat leading letters.
                    parms_dict['Comment'] = line.split('=', 1)[1].strip()
                else:
                    parm_val = line.split('=')[1].strip()
                    # Purely numeric values become ints, except the run number.
                    # int() (not eval) also accepts leading zeros.
                    if parm_val.isdigit() and 'Run_Number' not in key:
                        parm_val = int(parm_val)
                    parms_dict[key.strip()] = parm_val
            if "Following are binary data" in line:
                break
        # Skip ahead to the line announcing the binary section.
        while "DATA=1" not in line:
            line = self._read_ascii_line(fin, "the 'DATA=1' marker")
        return parms_dict

    @staticmethod
    def _read_ascii_line(fin, looking_for):
        """Return the next line decoded as ASCII; raise RuntimeError at end of file."""
        raw_line = fin.readline()
        if not raw_line:
            # readline() returns b'' forever at EOF; fail instead of looping.
            raise RuntimeError("Unexpected end of file while looking for %s." % looking_for)
        return raw_line.decode('ascii')

    def _set_magnet_angle(self, parms_dict):
        """Store the magnet angle in ``parms_dict['phi']`` from the header or the override."""
        if float(self.override_angle) < self._OVERRIDE_UNSET_BELOW:
            if 'CAR_OMEGA_MAG' in parms_dict:
                parms_dict['phi'] = str(parms_dict['CAR_OMEGA_MAG'])
            elif 'OMEGA_MAG' in parms_dict:
                parms_dict['phi'] = str(parms_dict['OMEGA_MAG'])
                self.log().information("OMEGA_MAG read as " + str(parms_dict['phi']))
            else:
                self.log().warning("OMEGA_MAG value not found! The detector position will be incorrect!")
        else:
            parms_dict['phi'] = str(self.override_angle)
            self.log().warning(
                "OMEGA_MAG taken from user input, not from the file. Current value: " + str(parms_dict['phi']))

    def struct_data_read(self, fin, nrows, data_type='i', byte_size=4):
        """Read ``nrows`` binary values from the current file position.

        Args:
            fin: binary file handle.
            nrows: number of values to read.
            data_type: ``struct`` format of a single value.
            byte_size: number of bytes occupied by a single value.

        Returns:
            List with the ``nrows`` unpacked values.

        Raises:
            struct.error: if fewer bytes than requested are available, or if
                ``byte_size`` does not match ``data_type``.
        """
        self.log().debug("nrows %d" % nrows)
        count = max(nrows, 0)
        # One read for the whole table instead of one read per value.
        raw = fin.read(count * byte_size)
        if len(raw) != count * byte_size:
            raise struct.error("expected %d bytes (%d values of %d bytes) but only %d were available"
                               % (count * byte_size, count, byte_size, len(raw)))
        record = struct.Struct(data_type)
        return [record.unpack(raw[i * byte_size:(i + 1) * byte_size])[0] for i in range(count)]
# Register the LoadEXED algorithm with Mantid's AlgorithmFactory.
AlgorithmFactory.subscribe(LoadEXED)