**Packages To Load At the Beginning**

In [2]:
# lines starting with '%' are notebook magic functions
# %matplotlib notebook
%pylab
%matplotlib inline
%load_ext autoreload
%autoreload 2
%config InlineBackend.figure_format='retina'

# standard python packages
import numpy as np
import modin.pandas as pd
import scipy.integrate as integrate
from scipy.stats import gamma as gammafunc
from scipy.special import gamma
from scipy.stats import norm
from time import time
import pickle

# main analysis software can be installed by: pip install Xana
from Xana import Xana
from Xana.Xplot.niceplot import niceplot

# reading AGIPD data provided by XFEL
from extra_data import RunDirectory, stack_detector_data, open_run
from extra_geom import AGIPD_1MGeometry

# for plotting
import matplotlib as mpl
from matplotlib import pyplot as plt
from matplotlib import patches
from matplotlib.colors import LogNorm
from matplotlib.collections import PatchCollection
from mpl_toolkits.axes_grid1 import make_axes_locatable
import seaborn as sns

# DASK
import dask.array as da
from dask.distributed import Client, progress
from dask_jobqueue import SLURMCluster

Using matplotlib backend: agg
Populating the interactive namespace from numpy and matplotlib
The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [3]:
def module2asics(data, reverse=False):
    nrows = ncols = 64
    if data.ndim == 3:
        if not reverse:
            return(data.reshape(-1, 8, nrows, 2, ncols).swapaxes(2,3).reshape(-1, 16, nrows, ncols))
        else: 
            return(data.reshape(-1, 8, 2, nrows, ncols).swapaxes(2,3).reshape(-1,512,128))
    elif data.ndim == 2:
        if not reverse:
            return(data.reshape(8, nrows, 2, ncols).swapaxes(1,2).reshape(16, nrows, ncols))
        else: 
            return(data.reshape(8, 2, nrows, ncols).swapaxes(1,2).reshape(512,128))

def commonmode_module(module):
    asics = module2asics(module)
    for asic in asics:
        asic -= np.nanmedian(asic)
    return module2asics(asics, reverse=True)

**some helper functions not implemented in Xana, yet**

In [4]:
tmp = pickle.load(open("./rois_01.pkl", 'rb'))
ROIS_2D = tmp['rois_2d']
ROIS_3D = tmp['rois_3d']
del tmp

def get_module_pixel(ROIS_3D):
    rois = []
    modules = []
    nrois = len(ROIS_3D)
    for i in range(16):
        rois.append([])
        for j in range(nrois):
            ind = np.where(ROIS_3D[j][0]==i)[0]
            if ind.size > 0:
                rois[i].append([ROIS_3D[j][k][ind] for k in range(1,3)])
                modules.append(i)
       
    modules = np.unique(modules)
    return modules, rois

modules, module_pixels = get_module_pixel(ROIS_3D)

## Select a Run

Load processed or raw data

In [5]:
mask = ~np.load("/gpfs/exfel/exp/MID/202001/p002458/scratch/masks/mask_cryo.npy")

In [6]:
run = RunDirectory('/gpfs/exfel/exp/MID/202001/p002458/scratch/example_data/r0522/')
run.info()

# of trains:    1435
Duration:       0:02:23.5
First train ID: 541834019
Last train ID:  541835453

16 detector modules (MID_DET_AGIPD1M-1)
  e.g. module MID_DET_AGIPD1M-1 0 : 512 x 128 pixels
  MID_DET_AGIPD1M-1/DET/0CH0:xtdf
  100 frames per train, up to 143500 frames total

1 instrument sources (excluding detectors):
  - SA2_XTD1_XGM/XGM/DOOCS:output

13 control sources:
  - MID_DET_AGIPD1M/CC/MON_0
  - MID_EXP_AGIPD1M/GAUGE/PG1
  - MID_EXP_AGIPD1M/PSC/HV
  - MID_EXP_AGIPD1M/TSENS/H1_T_EXTHOUS
  - MID_EXP_AGIPD1M/TSENS/H2_T_EXTHOUS
  - MID_EXP_AGIPD1M/TSENS/Q1_T_BLOCK
  - MID_EXP_AGIPD1M/TSENS/Q2_T_BLOCK
  - MID_EXP_AGIPD1M/TSENS/Q3_T_BLOCK
  - MID_EXP_AGIPD1M/TSENS/Q4_T_BLOCK
  - MID_EXP_AGIPD1M1/CTRL/MC1
  - MID_EXP_AGIPD1M1/CTRL/MC2
  - MID_EXP_SYS/TSYS/UTC-2-S4
  - SA2_XTD1_XGM/XGM/DOOCS



# Setup Dask Cluster

In [23]:
partition = 'exfel'  # For EuXFEL staff

cluster = SLURMCluster(
    queue=partition,
    # Resources per SLURM job (per node, the way SLURM is configured on Maxwell)
    # processes=16 runs 16 Dask workers in a job, so each worker has 1 core & 32 GB RAM.
    processes=16, cores=16, memory='512GB',
    log_directory='./dask_tmp/',
    local_directory='./dask_tmp/',
)

# Get a notbook widget showing the cluster state
cluster

VBox(children=(HTML(value='<h2>SLURMCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    …

In [25]:
SLURMCluster?

In [225]:
# Submit 2 SLURM jobs, for 32 Dask workers
cluster.scale(32)

In [9]:
client = Client(cluster)
print("Created dask client:", client)

Created dask client: <Client: 'tcp://131.169.182.129:33501' processes=32 threads=32, memory=1.02 TB>


In [None]:
speckle_contrast = var(pix)/mean(pix)**2
average_intensity
variance
speckle size 

In [36]:
module

Unnamed: 0,Array,Chunk
Bytes,37.57 GB,2.04 GB
Shape,"(1433, 100, 512, 128)","(78, 100, 512, 128)"
Count,108 Tasks,22 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 37.57 GB 2.04 GB Shape (1433, 100, 512, 128) (78, 100, 512, 128) Count 108 Tasks 22 Chunks Type float32 numpy.ndarray",1433  1  128  512  100,

Unnamed: 0,Array,Chunk
Bytes,37.57 GB,2.04 GB
Shape,"(1433, 100, 512, 128)","(78, 100, 512, 128)"
Count,108 Tasks,22 Chunks
Type,float32,numpy.ndarray


In [38]:
source = f'MID_DET_AGIPD1M-1/DET/{0}CH0:xtdf'
pulses_per_train = run.get_data_counts(source, 'image.data').iloc[0]

module = run.get_dask_array(source, 'image.data')#[:, :1]
asics = module2asics(module)

# # Make a new dimension for trains
module = module.reshape(-1, pulses_per_train, 512, 128)
asics = asics.reshape(-1, pulses_per_train, 16, 64, 64)
asics.mean(axis=(-2,-1))

Unnamed: 0,Array,Chunk
Bytes,9.17 MB,499.20 kB
Shape,"(1433, 100, 16)","(78, 100, 16)"
Count,218 Tasks,22 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 9.17 MB 499.20 kB Shape (1433, 100, 16) (78, 100, 16) Count 218 Tasks 22 Chunks Type float32 numpy.ndarray",16  100  1433,

Unnamed: 0,Array,Chunk
Bytes,9.17 MB,499.20 kB
Shape,"(1433, 100, 16)","(78, 100, 16)"
Count,218 Tasks,22 Chunks
Type,float32,numpy.ndarray


In [20]:
run.trains?

In [8]:
from extra_data import by_index, by_id

In [14]:
ids = np.random.randint(0,len(run.train_ids), 10)

In [22]:
for train_id, data in run.trains(devices="*/DET/*", train_range=by_index[ids], require_all=True):
    print(train_id, data.keys())

541834040 dict_keys(['MID_DET_AGIPD1M-1/DET/10CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/6CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/4CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/1CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/9CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/0CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/13CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/5CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/15CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/3CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/8CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/11CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/12CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/2CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/7CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/14CH0:xtdf'])
541834042 dict_keys(['MID_DET_AGIPD1M-1/DET/10CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/6CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/4CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/1CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/9CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/0CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/13CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/5CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/15CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/3CH0:xtdf', 'MID_DET_AGIPD1M-1/DET/8CH0:xtdf', 'MI

KeyboardInterrupt: 

In [116]:
corrupted_trains = []
for i in range(16):
    source = f'MID_DET_AGIPD1M-1/DET/{i}CH0:xtdf'
    datcounts = run.get_data_counts(source, 'image.data')
    corrupted_trains.extend(datcounts[datcounts==0].index.values)
    
corrupted_trains = np.unique(corrupted_trains)
print(f"found {len(corrupted_trains)} corrupted trains")
print(corrupted_trains)

found 15 corrupted trains
[541834032 541834092 541834212 541834392 541834512 541834532 541834632
 541834732 541834852 541834872 541834992 541835112 541835212 541835252
 541835352]


In [178]:
mod_train_ids = run.get_dataframe(fields=[('*/DET/*', 'trailer.trainId')])

corrupted_trains = mod_train_ids[mod_train_ids.isna().sum(1)>0].index.values
print(f"found {len(corrupted_trains)} corrupted trains")
print(corrupted_trains)

mod_train_ids.reset_index(level=0, inplace=True)
mod_train_ids.rename(columns={"index": "train_id"}, inplace=True)
# mod_train_ids.dropna(axis=0, inplace=True)

found 15 corrupted trains
[541834032 541834092 541834212 541834392 541834512 541834532 541834632
 541834732 541834852 541834872 541834992 541835112 541835212 541835252
 541835352]


In [204]:
sum(df.isin(corrupted_trains))

13

In [207]:
full_trains = []
for i in range(16):
    col = f'MID_DET_AGIPD1M-1/DET/{i}CH0:xtdf/trailer.trainId'
    df = mod_train_ids[col].dropna(axis=0).reset_index(drop=True)
    full_trains.append(np.where(~df.isin(corrupted_trains))[0])
#     full_trains.append(np.where())

In [227]:
import dask.dataframe as dd

In [228]:
np.repeat(np.arange(4), 4)

array([0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3])

In [226]:
def average_module(modno, run, pulses_per_train=None, full_trains=None):
    source = f'MID_DET_AGIPD1M-1/DET/{modno}CH0:xtdf'
    if pulses_per_train is None:
        counts = run.get_data_counts(source, 'image.data')
        pulses_per_train = counts.max()
        bad_trains = len(counts[counts==0])
        print(f"Found {pulses_per_train} pulses per train, bad trains: {bad_trains}")
            
    module = run.get_dask_array(source, 'image.data')
    if full_trains is not None:
        train_indices = full_trains[modno]
        module = module.reshape(-1, pulses_per_train, 512, 128)
        module = module[train_indices]
        module = module.reshape(-1, 512, 128)

    asics = module2asics(module)
    
    # # Make a new dimension for trains
    module = module.reshape(-1, pulses_per_train, 512, 128)
    asics = asics.reshape(-1, pulses_per_train, 16, 64 * 64)
    
    average = asics.mean(axis=-1)
    variance = asics.var(axis=-1)
    
    modn = np
    
    return 

# with Client(cluster):
#     all_average = da.stack([
#         average_module(i, run, full_trains=full_trains)
#         for i in range(16)
#     ])
#     all_average.compute()
    
with Client(cluster):
    all_average = da.stack([
        average_module(i, run, full_trains=full_trains)
        for i in range(16)
    ])
    all_average.compute()

Found 100 pulses per train, bad trains: 2
Found 100 pulses per train, bad trains: 0
Found 100 pulses per train, bad trains: 2
Found 100 pulses per train, bad trains: 2
Found 100 pulses per train, bad trains: 0
Found 100 pulses per train, bad trains: 0
Found 100 pulses per train, bad trains: 0
Found 100 pulses per train, bad trains: 0
Found 100 pulses per train, bad trains: 3
Found 100 pulses per train, bad trains: 1
Found 100 pulses per train, bad trains: 2
Found 100 pulses per train, bad trains: 1
Found 100 pulses per train, bad trains: 0
Found 100 pulses per train, bad trains: 2
Found 100 pulses per train, bad trains: 0
Found 100 pulses per train, bad trains: 4


KeyboardInterrupt: 

In [45]:
len(run.train_ids)

1435

In [116]:
corrupted_trains = []
for i in range(16):
    source = f'MID_DET_AGIPD1M-1/DET/{i}CH0:xtdf'
    datcounts = run.get_data_counts(source, 'image.data')
    corrupted_trains.extend(datcounts[datcounts==0].index.values)
    
corrupted_trains = np.unique(corrupted_trains)
print(f"found {len(corrupted_trains)} corrupted trains")
print(corrupted_trains)

found 15 corrupted trains
[541834032 541834092 541834212 541834392 541834512 541834532 541834632
 541834732 541834852 541834872 541834992 541835112 541835212 541835252
 541835352]


In [75]:
client.close()