# imports

In [27]:
import numpy as np
import matplotlib.pyplot as plt
import xarray as xr
from dask.distributed import Client, progress
from dask_jobqueue import SLURMCluster
from datetime import datetime
from tqdm.notebook import tqdm
import matplotlib as mpl
plt.style.use('./zoey_style.mplstyle')
from os.path import exists

In [29]:
datetime.now()

datetime.datetime(2024, 8, 21, 0, 24, 49, 109264)

# function definitions

In [2]:
def process_train(truncated_train, small_mask, run_index, module_index, group_index, phi_index, train_index, field, pulse_tol=0.05):
    """Reduce a single train into an xarray dataset that contains photon statistics information.
    
    Arguments:
    truncated train -- numpy array of shape (N pulses, M rows, K columns). Ideally the truncated train is a bounding box around the region
                        of interest to minimize the amount of data transferred.
    small_mask -- numpy array of shape (M rows, K columns) of type bool. Must match shape of truncated train above. Reduces the number of pixels
                    in the calculation.
    run_index -- integer or float in {70,  71,  73,  74,  76,  77,  79,  80,  82}. The run number being processed. Used for locating/saving files.
    module_index -- integer or float in {15, 0, 8, 7}. The detector module being processed. Used for locating/saving files.
    group_index -- integer or float between 0-61. The group number being processed. Used for locating/saving files.
    phi_index -- integer or float between 1-3. The angular region of interest (ROI) index being processed. Used for locating/saving files.
    train_index -- integer. Artifial index used to merge multiple trains together before saving data. Metadata.
    field -- integer or float. Current of magnetic field used during experiment. Metadata.
    pulse_tol -- float. The intensity tolerance allowed between two different pulses to be considered valid.
    
    Returns:
    xarray dataset with the following
    keys:
        probability_k_photons - numpy array of shape (N pulse separations, K photon events, M valid pairs) = (200, 50, 200). Probability of detecting 
                                K photons at a separation of N pulses. Maximum number of possible pairs is no filtering (M=200) but likely much less.
        average_number_photons - numpy array of shape (N pulse separations, M valid pairs) = (200, 200). Average intensity of integrated region of 
                                interest (ROI) corresponding to probability_k_photons. Maximum number of possible pairs is no filtering (M=200) but 
                                likely much less.
        number_valid_pairs - numpy array of shape (N pulse separations) = (200). The number of pulse pairs that pass the filtering at a delay time 
                            of N pulses.
        integrated_intensity - numpy array of shape (N pulses) = (200). Integrated intensity of region of interest (ROI) for each pulse.
        train_index - integer. Artifial index used to merge multiple trains together before saving data. Metadata.
        group_index -- integer or float between 0-61. The group number being processed. Used for locating/saving files.
        phi_index -- integer or float between 1-3. The angular region of interest (ROI) index being processed. Used for locating/saving files.
    
    attrs:
        run_number -- integer or float in {70,  71,  73,  74,  76,  77,  79,  80,  82}. The run number being processed. Used for locating/saving files.
        magnet_current -- integer or float. Current of magnetic field used during experiment. Metadata.
        module_number -- integer or float in {15, 0, 8, 7}. The detector module being processed. Used for locating/saving files.
        intensity_pulse_tolerance -- float. The intensity tolerance allowed between two different pulses to be considered valid.
        date_processed -- datetime. Metadata of when this was processed to help with version control.
    """
    
    # turn train into masked array to remove unwanted pixels
    train = np.ma.masked_array(data=np.maximum(0, truncated_train), mask=np.broadcast_to(small_mask, truncated_train.shape))
    # ravel pixels and remove masked values. train shape is (200 pulses x N pixels)
    train = train.compressed().reshape(train.shape[0], -1)

    # integrated intensity of image within train intensities shape is (200 pulses)
    intensities = train.sum(axis=1)

    # iterate over all delay times between two pulses (dt = 0 ... dt = 199)
    # check how many pulses pass the intensity filter
    valid_pulses = np.zeros(len(train))
    # prob_k_all shape is (delay time [200 max] x K [50 max] photons arriving x N [200 max] valid pulses)
    prob_k_all = np.zeros((200, 50, 200)) * np.nan
    # avg_k_all shape is (delay time [200 max] x N [200 max] valid pulses)
    avg_k_all = np.zeros((200, 200)) * np.nan
    for i, dt in tqdm(enumerate(range(len(train))), total=len(train)):
        left_stack = train[:len(train)-dt]
        right_stack = train[dt:]
        left_sum = np.sum(left_stack, axis=-1)
        right_sum = np.sum(right_stack, axis=-1)
        filt = np.logical_and(left_sum<=(1+pulse_tol)*right_sum, 
                              right_sum<=(1+pulse_tol)*left_sum)

        valid_pulses[i] = len(np.where(filt)[0])
        # integrate two single pulses to calculate contrast. two_images shape is (N valid pulses x M pixels)
        two_images = left_stack[filt] + right_stack[filt]                      
        # if there are pixels above 49 photons then set it to 49. This value won't be used in calculating the
        # contrast and ensures a consistent size when using np.bincount below
        two_images[two_images>49] = 49

        # enumerate over all valid pulses to extract the probability of photon arrival events.
        prob_k = np.zeros((50, 200)) * np.nan
        avg_k = np.zeros((50, 200)) * np.nan
        for ii, p in enumerate(two_images.astype(int)):
            counts = np.bincount(p)
            prob_k[:len(counts), ii] = counts / (len(p))

        avg_k = np.mean(two_images, axis=(-1))

        prob_k_all[i] = prob_k
        avg_k_all[i, :len(avg_k)] = avg_k

    # store all relevant data for a single train in a consistent data structure
    ds = xr.Dataset(
        data_vars={'probability_k_photons': (('pulse_separation', 'k_photons', 'unique_pairs'), prob_k_all),
                  'average_number_photons': (('pulse_separation', 'unique_pairs'), avg_k_all),
                  'number_valid_pairs': (('index'), valid_pulses),
                   'integrated_intensity': (('pulse_index'), intensities),
                   'train_index': train_index,
                   'group_index': group_index,
                   'phi_index': phi_index
                  },
        attrs={'run_number': run_index,
               'magnet_current': field,
               'module_number': module_index,
               'intensity_pulse_tolerance': pulse_tol,
               'date_processed': datetime.now()
              }
    )    
    return ds

## metadata

In [3]:
current =    [0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0]
runNB =      [70,  71,  73,  74,  76,  77,  79,  80,  82]
modules = [15, 0, 8, 7]
proposalNB = 2884

field = current[0]
run = runNB[0]
module = modules[1]
phi = 2

## initialize cluster

In [4]:
partition = 'upex'   # For users

cluster = SLURMCluster(
    queue=partition,
    local_directory='/scratch',  # Local disk space for workers to use
    log_directory='./logs',
    walltime=600,

    # Resources per SLURM job (per node, the way SLURM is configured on Maxwell)
    # processes=16 runs 16 Dask workers in a job, so each worker has 1 core & 32 GB RAM.
    processes=16, cores=16, memory='512GB',
)

# Get a notbook widget showing the cluster state
cluster

VBox(children=(HTML(value='<h2>SLURMCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    …

In [5]:
cluster.scale(64)

In [6]:
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://131.169.183.131:46839,Cluster  Workers: 64  Cores: 64  Memory: 2.05 TB


In [7]:
def load_photon_maps(run, module, group, phi, roi):
    # load photon maps around roi.
    photon_maps_filename = f'/gpfs/exfel/u/scratch/SCS/202201/p002884/LS/Reduced_Data/reduced_3-17-23/r{run:0d}m{module:0d}_{group:1d}_photon.npy'
    photon_maps = np.load(photon_maps_filename, 'r')[(..., *roi)]
    return photon_maps
    
def load_mask(run, module, phi):
    mask = np.load(f'./masks/run{run:0d}_module{module:1d}_mask.npy')
    mask[mask!=phi] = 0
    mask[mask==phi] = 1
    mask = mask.astype(bool)
    return mask
    
def get_roi(mask):
    locs = np.argwhere(mask==1).T
    roi = np.s_[min(locs[0]):max(locs[0]), min(locs[1]):max(locs[1])]
    return roi

In [8]:
mask = load_mask(run, module, phi)
roi = get_roi(mask)
mask = mask[roi]

for group in tqdm(range(61), total=61, desc='processing group'):
    output_file_stats = rf'./calculated_statistics/run{run:02d}_module{module:02d}_group{group:02d}_phi{phi:01d}.h5'
    if exists(output_file_stats):
        continue
    photon_maps = load_photon_maps(run, module, group, phi, roi)
    
    futures = []
    for train, train_id in tqdm(zip(photon_maps, np.arange(len(photon_maps))), total=len(photon_maps), desc='distributing trains'):
        future = client.submit(process_train, truncated_train=train, small_mask=mask, run_index=run, module_index=module,
                               group_index=group, phi_index=phi, train_index=train_id, field=field, pulse_tol=0.05)
        futures.append(future)
    out = client.gather(futures)
    ds = xr.concat(out, dim='train_index')
    ds.to_netcdf(output_file_stats, engine='h5netcdf')
    

processing group:   0%|          | 0/61 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/300 [00:00<?, ?it/s]

distributing trains:   0%|          | 0/89 [00:00<?, ?it/s]

In [9]:
client.shutdown()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
