# Example processing: Intra-train heating
This uses a simplified version of the process_dssc_module function:

* frames are not grouped (e.g., in 'pumped', 'unpumped' or 'dark' frames)
* no selection of trains and/or pulses is done

The purpose of this analysis is to check how robust the diffraction signal is with respect to the repeated heat and radiation load from high repetition-rate pump-probe runs. Thus, we average over all trains within the run, but keep all individual pulses.

In [1]:
import karabo_data as kd
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
from time import strftime
from matplotlib.colors import LogNorm, BoundaryNorm
import os
import dssc_process as dp

import dask.array as da
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

kd.__version__

'0.7.0'

In [2]:
%matplotlib widget

In [3]:
# make sure subfolders exist
for f in ['tmp', 'images', 'processed_runs']:
    if not os.path.isdir(f):
        os.mkdir(f)

## setup run information and index non-DSSC data

In [4]:
%%time

# basic run information
proposal = 2212
run_nr = 89
is_dark = False

#The files used for comparing GPFS and dCache:
#dirpath = '/gpfs/exfel/exp/XMPL/201750/p700000/raw/r0026' #GPFS
#dirpath = '/gpfs/exfel/exp/SCS/201901/p002212/raw/r0125' #dCache


# index non-DSSC data
run = kd.open_run(proposal, run_nr, include='*DA*')
run.info()

# of trains:    1691
Duration:       0:02:49
First train ID: 517552829
Last train ID:  517554519

0 detector modules ()

3 instrument sources (excluding detectors):
  - SA3_XTD10_XGM/XGM/DOOCS:output
  - SCS_BLU_XGM/XGM/DOOCS:output
  - SCS_UTC1_ADQ/ADC/1:network

20 control sources:
  - P_GATT
  - SA3_XTD10_MONO/ENC/GRATING_AX
  - SA3_XTD10_MONO/MDL/PHOTON_ENERGY
  - SA3_XTD10_MONO/MOTOR/GRATINGS_X
  - SA3_XTD10_MONO/MOTOR/GRATING_AX
  - SA3_XTD10_MONO/MOTOR/HE_PM_X
  - SA3_XTD10_MONO/MOTOR/LE_PM_X
  - SA3_XTD10_VAC/DCTRL/AR_MODE_OK
  - SA3_XTD10_VAC/DCTRL/D12_APERT_IN_OK
  - SA3_XTD10_VAC/DCTRL/D6_APERT_IN_OK
  - SA3_XTD10_VAC/DCTRL/N2_MODE_OK
  - SA3_XTD10_VAC/GAUGE/G30470D_IN
  - SA3_XTD10_VAC/GAUGE/G30480D_IN
  - SA3_XTD10_VAC/GAUGE/G30490D_IN
  - SA3_XTD10_VAC/GAUGE/G30510C
  - SA3_XTD10_XGM/XGM/DOOCS
  - SCS_BLU_XGM/XGM/DOOCS
  - SCS_RR_UTC/MDL/BUNCH_DECODER
  - SCS_RR_UTC/TSYS/TIMESERVER
  - SCS_UTC1_ADQ/ADC/1

CPU times: user 25.2 ms, sys: 11.7 ms, total: 36.9 ms
Wall time: 15

## load XGM (but no filtering/ thresholding)

In [5]:
if not is_dark:
    xgm = dp.load_xgm(run, print_info=True)

SASE3 bunches per train: 75


## plot XGM

In [6]:
if not is_dark:
    fig, ax1 = plt.subplots(nrows=1, sharex=True)

    # ax1.plot(scan.xgm.mean('dim_0'), label='pumped')
    ax1.plot(xgm.trainId, xgm, 'o', c='C0', ms=1)
    ax1.set_ylabel('xgm')
    ax1.set_xlabel('trainId')

    ax1.set_title(f'run: {run_nr}')

    tstamp = strftime('%y%m%d_%H%M')
    fig.savefig(f'images/run{run_nr}_xgm_{tstamp}.png', dpi=200)

FigureCanvasNbAgg()

## define a function that calculates the average for one module

In [7]:
def process_intra_train_dask(proposal, run_nr, module, fpt):
    
    sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf'
    collection = kd.open_run(proposal, run_nr, include=f'*DSSC{module:02d}*')

    arr = collection.get_dask_array(sourcename, 'image.data')
    arr_trains = arr.reshape(-1, fpt, 128, 512)
    
    return arr_trains.mean(axis=0, dtype=np.float32)

## start the cluster

In [8]:
cluster = SLURMCluster(
    queue='exfel',
    processes=8,
    cores=8, memory='256GB'
)

Click the link below to check out the cluster state.

In [9]:
cluster

VBox(children=(HTML(value='<h2>SLURMCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    …

In [10]:
cluster.scale(128)

## set up dask to use the cluster

In [11]:
client = Client(cluster)
print("Created dask client:", client)

Created dask client: <Client: 'tcp://131.169.182.82:41407' processes=128 threads=128, memory=4.10 TB>


## set up the computation

In [12]:
%%time

fpt = dp.load_dssc_info(proposal, run_nr)['frames_per_train']
mod_data = [
    process_intra_train_dask(proposal, run_nr, m, fpt)
    for m in range(16)
]

mod_data

CPU times: user 3.18 s, sys: 1.27 s, total: 4.45 s
Wall time: 42.4 s


[dask.array<mean_agg-aggregate, shape=(75, 128, 512), dtype=float32, chunksize=(75, 128, 512), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(75, 128, 512), dtype=float32, chunksize=(75, 128, 512), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(75, 128, 512), dtype=float32, chunksize=(75, 128, 512), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(75, 128, 512), dtype=float32, chunksize=(75, 128, 512), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(75, 128, 512), dtype=float32, chunksize=(75, 128, 512), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(75, 128, 512), dtype=float32, chunksize=(75, 128, 512), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(75, 128, 512), dtype=float32, chunksize=(75, 128, 512), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(75, 128, 512), dtype=float32, chunksize=(75, 128, 512), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, 

## stack the results

In [13]:
all_average = da.stack(mod_data)
all_average

Unnamed: 0,Array,Chunk
Bytes,314.57 MB,19.66 MB
Shape,"(16, 75, 128, 512)","(1, 75, 128, 512)"
Count,1024 Tasks,16 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 314.57 MB 19.66 MB Shape (16, 75, 128, 512) (1, 75, 128, 512) Count 1024 Tasks 16 Chunks Type float32 numpy.ndarray",16  1  512  128  75,

Unnamed: 0,Array,Chunk
Bytes,314.57 MB,19.66 MB
Shape,"(16, 75, 128, 512)","(1, 75, 128, 512)"
Count,1024 Tasks,16 Chunks
Type,float32,numpy.ndarray


## start the computation

In [14]:
%%time
module_data = all_average.compute()

CPU times: user 17.2 s, sys: 2.83 s, total: 20 s
Wall time: 2min 16s


## profiling


At first I tried to use the performance_report function, but it seems to not exist.
So instead I just gathered the data via the profile function.

In [15]:
#apparently the performance_report attribute does not exist

from dask.distributed import performance_report

with performance_report(filename="dask-report.html"):
    module_data = all_average.compute()

ImportError: cannot import name 'performance_report'

In [16]:
client.profile(filename="profile.html")

({'description': {'filename': '', 'name': '', 'line_number': 0, 'line': ''},
  'children': {'__call__;/software/anaconda3/5.2/lib/python3.6/site-packages/dask/optimization.py;1056': {'description': {'filename': '/software/anaconda3/5.2/lib/python3.6/site-packages/dask/optimization.py',
     'name': '__call__',
     'line_number': 1059,
     'line': 'return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))\n'},
    'children': {'get;/software/anaconda3/5.2/lib/python3.6/site-packages/dask/core.py;128': {'description': {'filename': '/software/anaconda3/5.2/lib/python3.6/site-packages/dask/core.py',
       'name': 'get',
       'line_number': 149,
       'line': 'result = _execute_task(task, cache)\n'},
      'children': {'_execute_task;/software/anaconda3/5.2/lib/python3.6/site-packages/dask/core.py;86': {'description': {'filename': '/software/anaconda3/5.2/lib/python3.6/site-packages/dask/core.py',
         'name': '_execute_task',
         'line_number': 119,
         'line

## further processing of the xgm array
(previously: merge processed data with scan variable and normalization data)

In [17]:
if not is_dark:
    xgm['pulse'] = np.arange(fpt, dtype=int)
    xgm = xgm.mean('trainId')
    xgm.name = 'xgm'

## turning the resulting np array into an xarray

For consistency's sake

In [18]:
module_data = xr.DataArray(
    module_data,
    dims=('module', 'pulse', 'x', 'y'),
    coords={
        'module': np.arange(16),
        'pulse': np.arange(fpt),
            },
    name='module_data'

)

## save to hdf5

In [19]:
import h5py

overwrite = True

save_folder = './processed_runs/'
prefix = ''

if is_dark:
    fname = f'{prefix}run{run_nr}.h5'  # no scan
else:
    fname = f'{prefix}run{run_nr}_by-pulse.h5'  # run with delay scan (change for other scan types!)


save_path = os.path.join(save_folder, fname)
file_exists = os.path.isfile(save_path)

if (not file_exists) or (file_exists and overwrite):
    if file_exists:
        os.remove(save_path)
    h5f = h5py.File(save_path, 'w')
    h5f.create_dataset('module_data', data=module_data)
    h5f.create_dataset('xgm', data=xgm)
    h5f.close()
    print('saving: ', save_path)
else:
    print('file', save_path, 'exists and overwrite is False')

saving:  ./processed_runs/run89_by-pulse.h5


## shutting down the cluster

In [20]:
client.close()
cluster.close()