Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rayonix fixes and some mrxv functions #71

Merged
merged 14 commits into from
May 1, 2022
7 changes: 4 additions & 3 deletions btx/diagnostics/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ def finalize_powders(self):
self.powders_final['max'] = np.max(powder_max, axis=0)
self.powders_final['avg'] = powder_sum / float(total_n_proc)
self.powders_final['std'] = np.sqrt(powder_sqr / float(total_n_proc) - np.square(self.powders_final['avg']))
for key in self.powders_final.keys():
self.powders_final[key] = assemble_image_stack_batch(self.powders_final[key], self.pixel_index_map)
if self.psi.det_type != 'Rayonix':
for key in self.powders_final.keys():
self.powders_final[key] = assemble_image_stack_batch(self.powders_final[key], self.pixel_index_map)

def save_powders(self, outdir):
"""
Expand All @@ -71,7 +72,7 @@ def save_powders(self, outdir):
path to directory in which to save powders, optional
"""
for key in self.powders_final.keys():
np.save(os.path.join(outdir, f"powder_{key}_r{self.psi.run:04}.npy"), self.powders_final[key])
np.save(os.path.join(outdir, f"r{self.psi.run:04}_{key}.npy"), self.powders_final[key])

def compute_stats(self, img):
"""
Expand Down
39 changes: 32 additions & 7 deletions btx/interfaces/mask_interface.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import numpy as np
import h5py, sys
import h5py
import os
import sys
import glob
from btx.interfaces.psana_interface import *

class MaskInterface:
Expand All @@ -16,19 +19,14 @@ def __init__(self, exp, run, det_type):
def generate_from_psana_run(self, thresholds, n_images=1):
"""
Generate a mask by extracting the first num_images from the indicated run,
thresholding each, and then merging them.
thresholding each, and then merging them. A value of 0 indicates a bad pixel.

Parameters
----------
thresholds : tuple, 2d
(lower, upper) thresholds for pixel value
n_images : int
number of images to threshold

Returns
-------
mask : numpy.ndarray, shape (n_panels, n_pixels_fs, n_pixels_ss)
binary mask, where 0 indicates a bad pixel
"""
# retrieve images from run
images = self.psi.get_images(n_images, assemble=False)
Expand All @@ -43,6 +41,33 @@ def generate_from_psana_run(self, thresholds, n_images=1):
self.mask = mask
else:
self.mask *= mask

def retrieve_from_mrxv(self, mrxv_path='/cds/sw/package/autosfx/mrxv/masks', dataset='/entry_1/data_1/mask'):
"""
Retrieve the latest mask from mrxv.

Parameters
----------
dataset : str
internal path to dataset, only relevant for mask_format='crystfel'
"""
try:
mask_file = glob.glob(os.path.join(mrxv_path, f"{self.det_type}_latest.*"))[0]
assert os.path.isfile(mask_file)
print(f"Retrieving mask file {mask_file}")
except:
sys.exit("Detector type not yet available in mrxv")

if h5py.is_hdf5(mask_file):
print("Crystfel mask detected")
mask_format = 'crystfel'
elif mask_file.split('.')[-1] == 'npy':
mask_format = 'psana'
else:
mask_format = 'cctbx'

self.load_mask(mask_file, mask_format=mask_format, dataset=dataset)
assert self.psi.det.shape() == self.mask.shape

def load_mask(self, input_file, mask_format='crystfel', dataset='/entry_1/data_1/mask'):
"""
Expand Down
27 changes: 27 additions & 0 deletions btx/misc/metrology.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,33 @@
from psgeom import camera, sensors
from btx.interfaces.psana_interface import PsanaInterface

def retrieve_from_mrxv(det_type, out_geom, mrxv_path='/cds/sw/package/autosfx/mrxv/geometries'):
"""
Fetch latest geom file for this detector from mrxv. Currently
assume that the file is in CrystFEL geom format. C

Parameters
----------
det_type : str
detector name
mrxv_path : str
path to mrxv geom folder
"""
try:
in_geom = os.path.join(mrxv_path, f"{det_type}_latest.geom")
print(in_geom)
assert os.path.isfile(in_geom)
print(f"Retrieving geom file {in_geom}")
except:
sys.exit("Detector type not yet available in mrxv")

if det_type == 'epix10k2M':
geom = camera.CompoundAreaCamera.from_crystfel_file(in_geom, element_type=sensors.Epix10kaSegment)
else:
geom = camera.CompoundAreaCamera.from_crystfel_file(in_geom)

geom.to_crystfel_file(out_geom)

def modify_crystfel_header(input_file, output_file):
"""
Modify the header of a psgeom-generated Crystfel (.geom) file,
Expand Down
26 changes: 26 additions & 0 deletions btx/misc/shortcuts.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import numpy as np
import os
import glob

class AttrDict(dict):
""" Nested Attribute Dictionary
Expand Down Expand Up @@ -29,3 +31,27 @@ def __getattr__(self, item):
return self.__getitem__(item)
except KeyError:
raise AttributeError(item)

def fetch_latest(fnames, run):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for later: this could potentially go with another function tasked with defining paths to mrxv, etc. based on the location (SLAC, NERSC, ...). Maybe in some path_interface tool, or something

"""
Fetch the most recently created (in terms of run numbers) file.
Here we assume that files are named /{base_path}/r{run:04}.* .

Parameters
----------
fnames : str
glob-expandable string pointing to geom or mask files
run : int
run number

Returns
-------
fname : str
filename of relevant geom or mask file
"""
fnames = glob.glob(fnames)
avail = [os.path.basename(f)[1:].split('.')[0] for f in fnames]
avail = np.array([int(a) for a in avail])
sort_idx = np.argsort(avail)
idx = np.searchsorted(avail[sort_idx], run, side='right') - 1
return fnames[sort_idx[idx]]
10 changes: 6 additions & 4 deletions btx/processing/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class Indexer:

""" Index cxi files using indexamajig: https://www.desy.de/~twhite/crystfel/manual-indexamajig.html """

def __init__(self, exp, run, det_type, taskdir, geom, cell, int_rad='4,5,6', methods='mosflm',
def __init__(self, exp, run, det_type, tag, taskdir, geom, cell, int_rad='4,5,6', methods='mosflm',
tolerance='5,5,5,1.5', no_revalidate=True, multi=True, profile=True):

# general paramters
Expand All @@ -23,7 +23,7 @@ def __init__(self, exp, run, det_type, taskdir, geom, cell, int_rad='4,5,6', met
self.no_revalidate = no_revalidate # bool, skip validation step to omit iffy peaks
self.multi = multi # bool, enable multi-lattice indexing
self.profile = profile # bool, display timing data
self._retrieve_paths(taskdir)
self._retrieve_paths(taskdir, tag)
self._parallel_logic()

def _parallel_logic(self):
Expand All @@ -39,7 +39,7 @@ def _parallel_logic(self):
else:
self.rank = 0

def _retrieve_paths(self, taskdir):
def _retrieve_paths(self, taskdir, tag):
"""
Retrieve the paths for the input .lst and output .stream file
consistent with the btx analysis directory structure.
Expand All @@ -48,9 +48,11 @@ def _retrieve_paths(self, taskdir):
----------
taskdir : str
path to output folder for indexing results
tag : str
filename extension suffix
"""
self.lst = os.path.join(taskdir ,f'r{self.run:04}/r{self.run:04}.lst')
self.stream = os.path.join(taskdir, f'r{self.run:04}.stream')
self.stream = os.path.join(taskdir, f'r{self.run:04}_{tag}.stream')
if "TMP_EXE" in os.environ:
self.tmp_exe = os.environ['TMP_EXE']
else:
Expand Down
10 changes: 9 additions & 1 deletion btx/processing/peak_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ def set_up_psana_interface(self, exp, run, det_type):
# additional self variables for tracking peak stats
self.iX = self.psi.det.indexes_x(self.psi.run).astype(np.int64)
self.iY = self.psi.det.indexes_y(self.psi.run).astype(np.int64)
if det_type == 'Rayonix':
self.iX = np.expand_dims(self.iX, axis=0)
self.iY = np.expand_dims(self.iY, axis=0)
print(f"self.iX.shape = {self.iX.shape}")

self.ipx, self.ipy = self.psi.det.point_indexes(self.psi.run, pxy_um=(0, 0))

# retrieve clen from psana if None or a PV code is supplied
Expand Down Expand Up @@ -136,7 +141,10 @@ def set_up_cxi(self, tag=''):

# for storing images in crystFEL format
det_shape = self.psi.det.shape()
dim0, dim1 = det_shape[0] * det_shape[1], det_shape[2]
if self.psi.det_type == 'Rayonix':
dim0, dim1 = det_shape[0], det_shape[1]
else:
dim0, dim1 = det_shape[0] * det_shape[1], det_shape[2]
data_1 = entry_1.create_dataset('/entry_1/data_1/data', (self.n_events, dim0, dim1), chunks=(1, dim0, dim1),
maxshape=(None, dim0, dim1),dtype=np.float32)
data_1.attrs["axes"] = "experiment_identifier"
Expand Down
72 changes: 55 additions & 17 deletions scripts/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,58 +14,93 @@ def test(config):
requests.post(update_url, json=[ { "key": "root_dir", "value": f"{config.setup.root_dir}" },
{ "key": "experiment_name", "value": "mfxp19619"} ])

def fetch_mask(config):
from btx.interfaces.mask_interface import MaskInterface
setup = config.setup
task = config.fetch_mask
""" Fetch most recent mask for this detector from mrxv. """
taskdir = os.path.join(setup.root_dir, 'mask')
os.makedirs(taskdir, exist_ok=True)
mi = MaskInterface(exp=setup.exp,
run=setup.run,
det_type=setup.det_type)
mi.retrieve_from_mrxv(dataset=task.dataset)
logger.info(f'Saving mrxv mask to {taskdir}')
mi.save_mask(os.path.join(taskdir, f'r0000.npy'))
logger.debug('Done!')

def fetch_geom(config):
from btx.misc.metrology import retrieve_from_mrxv
setup = config.setup
task = config.fetch_geom
""" Fetch latest geometry for this detector from mrxv. """
taskdir = os.path.join(setup.root_dir, 'geom')
os.makedirs(taskdir, exist_ok=True)
logger.info(f'Saving mrxv geom to {taskdir}')
retrieve_from_mrxv(det_type=setup.det_type, out_geom=os.path.join(taskdir, f'r0000.geom'))
logger.debug('Done!')

def run_analysis(config):
from btx.diagnostics.run import RunDiagnostics
from btx.misc.shortcuts import fetch_latest
setup = config.setup
task = config.run_analysis
""" Generate the max, avg, and std powders for a given run. """
taskdir = os.path.join(setup.root_dir, 'powder')
os.makedirs(taskdir, exist_ok=True)
os.makedirs(os.path.join(taskdir, 'figs'), exist_ok=True)
mask_file = fetch_latest(fnames=os.path.join(setup.root_dir, 'mask', 'r*.npy'), run=setup.run)
logger.debug(f'Applying mask: {mask_file}...')
rd = RunDiagnostics(exp=setup.exp,
run=setup.run,
det_type=setup.det_type)
logger.debug(f'Computing Powder for run {setup.run} of {setup.exp}...')
rd.compute_run_stats(max_events=task.max_events, mask=task.mask)
rd.compute_run_stats(max_events=task.max_events, mask=mask_file)
logger.info(f'Saving Powders and plots to {taskdir}')
rd.save_powders(taskdir)
rd.visualize_powder(output=os.path.join(taskdir, f"powder_r{rd.psi.run:04}.png"))
rd.visualize_stats(output=os.path.join(taskdir, f"stats_r{rd.psi.run:04}.png"))
rd.visualize_powder(output=os.path.join(taskdir, f"figs/powder_r{rd.psi.run:04}.png"))
rd.visualize_stats(output=os.path.join(taskdir, f"figs/stats_r{rd.psi.run:04}.png"))
logger.debug('Done!')

def opt_distance(config):
from btx.diagnostics.geom_opt import GeomOpt
from btx.misc.metrology import modify_crystfel_header, generate_geom_file
from btx.misc.shortcuts import fetch_latest
setup = config.setup
task = config.opt_distance
""" Optimize the detector distance from an AgBehenate run. """
taskdir = os.path.join(setup.root_dir, 'geom')
os.makedirs(taskdir, exist_ok=True)
os.makedirs(os.path.join(taskdir, 'figs'), exist_ok=True)
geom_opt = GeomOpt(exp=setup.exp,
run=setup.run,
det_type=setup.det_type)
task.center = tuple([float(elem) for elem in task.center.split()])
logger.debug(f'Optimizing detector distance for run {setup.run} of {setup.exp}...')
dist = geom_opt.opt_distance(powder=os.path.join(setup.root_dir, f"powder/powder_max_r{setup.run:04}.npy"),
dist = geom_opt.opt_distance(powder=os.path.join(setup.root_dir, f"powder/r{setup.run:04}_max.npy"),
center=task.center,
plot=os.path.join(taskdir, f'r{setup.run:04}.png'))
plot=os.path.join(taskdir, f'figs/r{setup.run:04}.png'))
logger.info(f'Detector distance inferred from powder rings: {dist} mm')
temp_file = os.path.join(taskdir, 'temp.geom')
geom_file = os.path.join(taskdir, f'r{setup.run:04}.geom')
generate_geom_file(setup.exp, setup.run, setup.det_type, task.input_geom, temp_file, det_dist=dist)
modify_crystfel_header(temp_file, geom_file)
os.remove(temp_file)
logger.info(f'CrystFEL geom file saved with updated coffset value to: {geom_file}')
geom_in = fetch_latest(fnames=os.path.join(setup.root_dir, 'geom', 'r*.geom'), run=setup.run)
geom_temp = os.path.join(taskdir, 'temp.geom')
geom_out = os.path.join(taskdir, f'r{setup.run:04}.geom')
generate_geom_file(setup.exp, setup.run, setup.det_type, geom_in, geom_temp, det_dist=dist)
modify_crystfel_header(geom_temp, geom_out)
os.remove(geom_temp)
logger.info(f'CrystFEL geom file saved with updated coffset value to: {geom_out}')
logger.debug('Done!')

def find_peaks(config):
from btx.processing.peak_finder import PeakFinder
from btx.misc.shortcuts import fetch_latest
setup = config.setup
task = config.find_peaks
""" Perform adaptive peak finding on run. """
taskdir = os.path.join(setup.root_dir, 'index')
os.makedirs(taskdir, exist_ok=True)
mask_file = fetch_latest(fnames=os.path.join(setup.root_dir, 'mask', 'r*.npy'), run=setup.run)
pf = PeakFinder(exp=setup.exp, run=setup.run, det_type=setup.det_type, outdir=os.path.join(taskdir ,f"r{setup.run:04}"),
tag=task.tag, mask=task.mask, psana_mask=task.psana_mask, min_peaks=task.min_peaks, max_peaks=task.max_peaks,
tag=task.tag, mask=mask_file, psana_mask=task.psana_mask, min_peaks=task.min_peaks, max_peaks=task.max_peaks,
npix_min=task.npix_min, npix_max=task.npix_max, amax_thr=task.amax_thr, atot_thr=task.atot_thr,
son_min=task.son_min, peak_rank=task.peak_rank, r0=task.r0, dr=task.dr, nsigm=task.nsigm)
logger.debug(f'Performing peak finding for run {setup.run} of {setup.exp}...')
Expand All @@ -81,13 +116,15 @@ def find_peaks(config):

def index(config):
from btx.processing.indexer import Indexer
from btx.misc.shortcuts import fetch_latest
setup = config.setup
task = config.index
""" Index run using indexamajig. """
taskdir = os.path.join(setup.root_dir, 'index')
indexer_obj = Indexer(exp=config.setup.exp, run=config.setup.run, det_type=config.setup.det_type, taskdir=taskdir, geom=task.geom,
cell=task.cell, int_rad=task.int_radius, methods=task.methods, tolerance=task.tolerance,
no_revalidate=task.no_revalidate, multi=task.multi, profile=task.profile)
geom_file = fetch_latest(fnames=os.path.join(setup.root_dir, 'geom', 'r*.geom'), run=setup.run)
indexer_obj = Indexer(exp=config.setup.exp, run=config.setup.run, det_type=config.setup.det_type, tag=task.tag,
taskdir=taskdir, geom=geom_file, cell=task.cell, int_rad=task.int_radius, methods=task.methods,
tolerance=task.tolerance, no_revalidate=task.no_revalidate, multi=task.multi, profile=task.profile)
logger.debug(f'Generating indexing executable for run {setup.run} of {setup.exp}...')
indexer_obj.write_exe()
logger.info(f'Executable written to {indexer_obj.tmp_exe}')
Expand All @@ -98,11 +135,12 @@ def stream_analysis(config):
task = config.stream_analysis
""" Diagnostics including cell distribution and peakogram. """
taskdir = os.path.join(setup.root_dir, 'index')
os.makedirs(os.path.join(taskdir, 'figs'), exist_ok=True)
stream_files = glob.glob(os.path.join(taskdir, f"r*{task.tag}.stream"))
st = StreamInterface(input_files=stream_files, cell_only=False)
if st.rank == 0:
logger.debug(f'Read stream files: {stream_files}')
st.plot_cell_parameters(output=os.path.join(taskdir, f"{task.tag}_cell_distribution.png"))
st.plot_peakogram(output=os.path.join(taskdir, f"{task.tag}_peakogram.png"))
st.plot_cell_parameters(output=os.path.join(taskdir, f"figs/cell_{task.tag}.png"))
st.plot_peakogram(output=os.path.join(taskdir, f"figs/peakogram_{task.tag}.png"))
logger.info(f'Peakogram and cell distribution generated for sample {task.tag}')
logger.debug('Done!')
Loading