# File: ReduceSCD_Parallel.py
#
# Version 2.0, modified to work with Mantid's new python interface.
#
# This script will run multiple instances of the script ReduceSCD_OneRun.py
# in parallel, using either local processes or a slurm partition. After
# using the ReduceSCD_OneRun script to find, index and integrate peaks from
# multiple runs, this script merges the integrated peaks files and re-indexes
# them in a consistent way. If desired, the indexing can also be changed to a
# specified conventional cell.
# Many intermediate files are generated and saved, so all output is written
# to a specified output_directory. This output directory must be created
# before running this script, and must be specified in the configuration file.
# The user should first make sure that all parameters are set properly in
# the configuration file for the ReduceSCD_OneRun.py script, and that that
# script will properly reduce one SCD run. Once a single run can be properly
# reduced, set the additional parameters in the configuration file that specify
# how the list of runs will be processed in parallel.
#
#
# _v1: December 3rd 2013. Mads Joergensen
# This version now includes the possibility to use the 1D cylindrical
# integration method, and the possibility to load a UB matrix which will be
# used for integration of the individual runs and to index the combined file
# (code from Xiapoing).
#
#
# _v2: December 3rd 2013. Mads Joergensen
# Adds the possibility to optimize the loaded UB for each run for better peak
# prediction. It is also possible to find the common UB by using the lattice
# parameters of the first run, or the loaded matrix, instead of the default
# FFT method.
#
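# For reference, a minimal sketch of the parallel-processing entries such a
# configuration file might contain, in addition to the parameters already
# used by ReduceSCD_OneRun.py. The key names below are exactly the ones read
# from params_dictionary later in this script, but the values are
# hypothetical examples only, and the simple name/value layout is an
# assumption about the format parsed by ReduceDictionary.LoadDictionary:
#
#   exp_name                      my_experiment
#   output_directory              /some/path/output
#   reduce_one_run_script         ReduceSCD_OneRun.py
#   slurm_queue_name              None
#   max_processes                 8
#   min_d                         4
#   max_d                         8
#   tolerance                     0.12
#   cell_type                     Monoclinic
#   centering                     P
#   run_nums                      8522,8523,8524
#   use_cylindrical_integration   False
#   instrument_name               TOPAZ
#   read_UB                       False
#   UB_filename                   None
#   UseFirstLattice               False
#   num_peaks_to_find             100
#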
import os
import sys
import threading
import time
import ReduceDictionary
#sys.path.append("/opt/mantidnightly/bin")
sys.path.append("/opt/Mantid/bin")
from mantid.simpleapi import *
print "API Version"
print apiVersion()
start_time = time.time()
# -------------------------------------------------------------------------
# ProcessThread is a simple local class. Each instance of ProcessThread is
# a thread that starts a command line process to reduce one run.
#
class ProcessThread ( threading.Thread ):

  command = ""

  def setCommand( self, command="" ):
    self.command = command

  def run ( self ):
    print 'STARTING PROCESS: ' + self.command
    os.system( self.command )
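# A quick usage sketch (hypothetical command, for illustration only):
#   t = ProcessThread()
#   t.setCommand( 'echo hello' )
#   t.start()   # runs the command in a separate local process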
# -------------------------------------------------------------------------
#
# Get the config file name from the command line
#
if (len(sys.argv) < 2):
  print "You MUST give the config file name on the command line"
  exit(0)
config_file_name = sys.argv[1]
#
# Load the parameter names and values from the specified configuration file
# into a dictionary and set all the required parameters from the dictionary.
#
params_dictionary = ReduceDictionary.LoadDictionary( config_file_name )
exp_name = params_dictionary[ "exp_name" ]
output_directory = params_dictionary[ "output_directory" ]
reduce_one_run_script = params_dictionary[ "reduce_one_run_script" ]
slurm_queue_name = params_dictionary[ "slurm_queue_name" ]
max_processes = int(params_dictionary[ "max_processes" ])
min_d = params_dictionary[ "min_d" ]
max_d = params_dictionary[ "max_d" ]
tolerance = params_dictionary[ "tolerance" ]
cell_type = params_dictionary[ "cell_type" ]
centering = params_dictionary[ "centering" ]
run_nums = params_dictionary[ "run_nums" ]
use_cylindrical_integration = params_dictionary[ "use_cylindrical_integration" ]
instrument_name = params_dictionary[ "instrument_name" ]
read_UB = params_dictionary[ "read_UB" ]
UB_filename = params_dictionary[ "UB_filename" ]
UseFirstLattice = params_dictionary[ "UseFirstLattice" ]
num_peaks_to_find = params_dictionary[ "num_peaks_to_find" ]
# determine what python executable to launch new jobs with
python = sys.executable
if python is None: # not all platforms define this variable
  python = 'python'
#
# Make the list of separate process commands. If a slurm queue name
# was specified, run the processes using slurm, otherwise just use
# multiple processes on the local machine.
#
process_list = []
index = 0
for r_num in run_nums:
  process_list.append( ProcessThread() )
  cmd = '%s %s %s %s' % (python, reduce_one_run_script, config_file_name, str(r_num))
  if slurm_queue_name is not None:
    console_file = output_directory + "/" + str(r_num) + "_output.txt"
    cmd = 'srun -p ' + slurm_queue_name + \
          ' --cpus-per-task=3 -J ReduceSCD_Parallel.py -o ' + console_file + ' ' + cmd
  process_list[index].setCommand( cmd )
  index = index + 1
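# At this point each queued command looks something like the following
# (hypothetical run number and config file name, for illustration):
#   python ReduceSCD_OneRun.py ReduceSCD.config 8522
# or, when a slurm queue name was given:
#   srun -p <queue> --cpus-per-task=3 -J ReduceSCD_Parallel.py \
#       -o <output_directory>/8522_output.txt python ReduceSCD_OneRun.py ReduceSCD.config 8522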
#
# Now create and start a thread for each command to run the commands in parallel,
# starting up to max_processes simultaneously.
#
all_done = False
active_list = []
while not all_done:
  if ( len(process_list) > 0 and len(active_list) < max_processes ):
    thread = process_list[0]
    process_list.remove(thread)
    active_list.append( thread )
    thread.start()
  time.sleep(2)
  # rebuild the list of live threads, rather than removing items from
  # active_list while iterating over it (which can skip entries)
  active_list = [ thread for thread in active_list if thread.isAlive() ]
  if len(process_list) == 0 and len(active_list) == 0 :
    all_done = True
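# The loop above is a simple polling scheduler: every 2 seconds it reaps
# threads whose process has finished and, as long as fewer than max_processes
# threads are active, starts the next queued one, until both lists are empty.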
print "\n**************************************************************************************"
print "************** Completed Individual Runs, Starting to Combine Results ****************"
print "**************************************************************************************\n"
#
# First combine all of the integrated files, by reading the separate files and
# appending them to a combined output file.
#
niggli_name = output_directory + "/" + exp_name + "_Niggli"
niggli_integrate_file = niggli_name + ".integrate"
niggli_matrix_file = niggli_name + ".mat"
first_time = True
if not use_cylindrical_integration:
  for r_num in run_nums:
    one_run_file = output_directory + '/' + str(r_num) + '_Niggli.integrate'
    peaks_ws = LoadIsawPeaks( Filename=one_run_file )
    if first_time:
      if UseFirstLattice and not read_UB:
        # Find a UB (using FFT) for the first run to use in the FindUBUsingLatticeParameters
        FindUBUsingFFT( PeaksWorkspace=peaks_ws, MinD=min_d, MaxD=max_d, Tolerance=tolerance )
        uc_a = peaks_ws.sample().getOrientedLattice().a()
        uc_b = peaks_ws.sample().getOrientedLattice().b()
        uc_c = peaks_ws.sample().getOrientedLattice().c()
        uc_alpha = peaks_ws.sample().getOrientedLattice().alpha()
        uc_beta = peaks_ws.sample().getOrientedLattice().beta()
        uc_gamma = peaks_ws.sample().getOrientedLattice().gamma()
      SaveIsawPeaks( InputWorkspace=peaks_ws, AppendFile=False, Filename=niggli_integrate_file )
      first_time = False
    else:
      SaveIsawPeaks( InputWorkspace=peaks_ws, AppendFile=True, Filename=niggli_integrate_file )
#
# Load the combined file and re-index all of the peaks together.
# Save them back to the combined Niggli file (or the selected UB file, if in use).
#
peaks_ws = LoadIsawPeaks( Filename=niggli_integrate_file )
#
# Find a Niggli UB matrix that indexes the peaks in this run. If read_UB is
# set, load a UB matrix instead of using FFT: the peaks are then indexed using
# the UB from the initial orientation run (or from the combined runs of a
# first iteration of crystal orientation refinement).
#
if read_UB:
  LoadIsawUB( InputWorkspace=peaks_ws, Filename=UB_filename )
  if UseFirstLattice:
    # Find UB using lattice parameters from the specified file
    uc_a = peaks_ws.sample().getOrientedLattice().a()
    uc_b = peaks_ws.sample().getOrientedLattice().b()
    uc_c = peaks_ws.sample().getOrientedLattice().c()
    uc_alpha = peaks_ws.sample().getOrientedLattice().alpha()
    uc_beta = peaks_ws.sample().getOrientedLattice().beta()
    uc_gamma = peaks_ws.sample().getOrientedLattice().gamma()
    FindUBUsingLatticeParameters( PeaksWorkspace=peaks_ws, a=uc_a, b=uc_b, c=uc_c,
                                  alpha=uc_alpha, beta=uc_beta, gamma=uc_gamma,
                                  NumInitial=num_peaks_to_find, Tolerance=tolerance )
    #OptimizeCrystalPlacement(PeaksWorkspace=peaks_ws,ModifiedPeaksWorkspace=peaks_ws,FitInfoTable='CrystalPlacement_info',MaxIndexingError=tolerance)
elif UseFirstLattice and not read_UB:
  # Find UB using the lattice parameters from the FFT results of the first
  # run, since no UB file was specified
  FindUBUsingLatticeParameters( PeaksWorkspace=peaks_ws, a=uc_a, b=uc_b, c=uc_c,
                                alpha=uc_alpha, beta=uc_beta, gamma=uc_gamma,
                                NumInitial=num_peaks_to_find, Tolerance=tolerance )
else:
  FindUBUsingFFT( PeaksWorkspace=peaks_ws, MinD=min_d, MaxD=max_d, Tolerance=tolerance )
IndexPeaks( PeaksWorkspace=peaks_ws, Tolerance=tolerance )
SaveIsawPeaks( InputWorkspace=peaks_ws, AppendFile=False, Filename=niggli_integrate_file )
SaveIsawUB( InputWorkspace=peaks_ws, Filename=niggli_matrix_file )
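# At this stage two combined files exist in output_directory (names derived
# from exp_name as above): <exp_name>_Niggli.integrate with the consistently
# re-indexed peaks, and <exp_name>_Niggli.mat with the shared UB matrix.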
#
# If requested, also switch to the specified conventional cell and save the
# corresponding matrix and integrate file
#
if not use_cylindrical_integration:
  if (cell_type is not None) and (centering is not None):
    conv_name = output_directory + "/" + exp_name + "_" + cell_type + "_" + centering
    conventional_integrate_file = conv_name + ".integrate"
    conventional_matrix_file = conv_name + ".mat"
    SelectCellOfType( PeaksWorkspace=peaks_ws, CellType=cell_type, Centering=centering,
                      Apply=True, Tolerance=tolerance )
    SaveIsawPeaks( InputWorkspace=peaks_ws, AppendFile=False, Filename=conventional_integrate_file )
    SaveIsawUB( InputWorkspace=peaks_ws, Filename=conventional_matrix_file )
if use_cylindrical_integration:
  if (cell_type is not None) or (centering is not None):
    print "WARNING: Cylindrical profiles are NOT transformed!!!"
  # Combine *.profiles files
  filename = output_directory + '/' + exp_name + '.profiles'
  out_file = open( filename, 'w' )
  # Read and write the first run profile file with header.
  r_num = run_nums[0]
  filename = output_directory + '/' + instrument_name + '_' + str(r_num) + '.profiles'
  in_file = open( filename, 'r' )
  file_all_lines = in_file.read()
  out_file.write(file_all_lines)
  in_file.close()
  os.remove(filename)
  # Read and write the rest of the runs without the header.
  for r_num in run_nums[1:]:
    filename = output_directory + '/' + instrument_name + '_' + str(r_num) + '.profiles'
    in_file = open( filename, 'r' )
    # skip the header: everything up to the first line starting with '0'
    for line in in_file:
      if line[0] == '0': break
    out_file.write(line)
    for line in in_file:
      out_file.write(line)
    in_file.close()
    os.remove(filename)
  out_file.close()
  # Remove *.integrate file(s); these are ONLY USED FOR CYLINDRICAL INTEGRATION!
  for fname in os.listdir(output_directory):
    if fname.endswith('.integrate'):
      # os.listdir returns bare names, so prepend the directory before removing
      os.remove( output_directory + '/' + fname )
end_time = time.time()
print "\n**************************************************************************************"
print "****************************** DONE PROCESSING ALL RUNS ******************************"
print "**************************************************************************************\n"
print 'Total time: ' + str(end_time - start_time) + ' sec'
print 'Config file: ' + config_file_name
print 'Script file: ' + reduce_one_run_script + '\n'
print