## [Demo] Working with PSANA inside a JupyterLab notebook

This notebook starts a Dask scheduler that serves as a bridge for sharing data between PSANA and JupyterLab. Using the `slurm_magic` extension, users can submit a PSANA job from the notebook and pull its results back for manipulation within the notebook.

This notebook runs the example shown in https://confluence.slac.stanford.edu/display/PSDMInternal/Small+H5

Note: this notebook uses the `slurm_magic` version from https://github.com/muammar/slurm-magic.

First, we write the Python script that the job will run.

In [1]:
%%writefile example.py
from psana import DataSource
import numpy as np
import sys
from dask.distributed import Client, Queue

# Setting PS_SRV_NODES from inside the script (below) fails with MPI rank errors;
# it is exported in example.sub instead.
# import os
# os.environ['PS_SRV_NODES'] = "2"

# Connect to the Dask scheduler
client = Client(scheduler_file="scheduler.json")

# Create a Queue object
queue = Queue("psana")

# called back on each SRV node, for every smd.event() call below
def test_callback(data_dict):
    print(data_dict)

ds = DataSource(exp='xpptut13', run=1, dir='.tmp')

# batch_size here specifies how often the dictionary of information
# is sent to the SRV nodes
smd = ds.smalldata(filename='my.h5', batch_size=5, callbacks=[test_callback])
run = next(ds.runs())

# necessary (instead of "None") since some ranks may not receive events
# and the smd.sum() below could fail

arrsum = np.zeros(2, dtype=np.int64)

for evt in run.events():
    myones = np.ones_like(arrsum)
    smd.event(evt, myfloat=2.0, arrint=myones)
    arrsum += myones
    # Push this rank's running sum to the notebook through the shared Dask queue
    queue.put(arrsum.tolist())


# This fails as reported in https://github.com/slac-lcls/lcls2/issues/11

# if smd.summary:
#     smd.sum(arrsum)
#     smd.save_summary({'summary_array' : arrsum}, summary_int=1)
# smd.done()

sys.exit()

Writing example.py
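The comment about initializing `arrsum` with zeros rather than `None` comes down to the reduction across ranks: every rank must contribute an array of the same shape, including a rank that received no events. The sketch below mimics that reduction with plain NumPy. It is a stand-in for `smd.sum()`, not psana's implementation, and the per-rank event counts and the helper `rank_partial_sum` are hypothetical.

```python
import numpy as np

def rank_partial_sum(n_events, shape=(2,)):
    """Partial sum one rank would hold after processing n_events events."""
    arrsum = np.zeros(shape, dtype=np.int64)  # valid even when n_events == 0
    for _ in range(n_events):
        arrsum += np.ones_like(arrsum)
    return arrsum

# Hypothetical split of events across three ranks; the last rank gets none.
partials = [rank_partial_sum(n) for n in (4, 3, 0)]

# Element-wise sum across ranks, analogous to an MPI reduction.
total = np.sum(partials, axis=0)
print(total)  # [7 7]
```

Because the idle rank still holds a zero array of the right shape, the reduction needs no special case for it.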


In [2]:
%%writefile example.sub
#!/bin/bash
#SBATCH -N 1
#SBATCH -n 6
#SBATCH -c 1
#SBATCH -C haswell
#SBATCH -q debug
#SBATCH -J smallD
#SBATCH --account=m3384
#SBATCH --mail-user=melkhatibr@lbl.gov
#SBATCH --mail-type=ALL
#SBATCH -t 00:30:00

export PS_SRV_NODES=2

srun -n 6 -o example.log python example.py

Writing example.sub
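The task count given to `srun` has to cover every psana2 role, not only the `PS_SRV_NODES` server ranks. The hypothetical helper below computes a lower bound under an assumed layout: one smd0 rank, `PS_EB_NODES` event-builder ranks (assumed default 1), at least one big-data rank, and `PS_SRV_NODES` server ranks (assumed default 0). Check this layout against the psana2 documentation before relying on it.

```python
import os

def min_psana_ranks(srv_nodes=None, eb_nodes=None):
    """Smallest MPI world size that still leaves one big-data (BD) rank.

    Assumed layout: 1 smd0 + eb_nodes EB + >=1 BD + srv_nodes SRV ranks.
    Defaults (assumptions): PS_SRV_NODES=0, PS_EB_NODES=1.
    """
    if srv_nodes is None:
        srv_nodes = int(os.environ.get("PS_SRV_NODES", "0"))
    if eb_nodes is None:
        eb_nodes = int(os.environ.get("PS_EB_NODES", "1"))
    smd0, bd_min = 1, 1
    return smd0 + eb_nodes + bd_min + srv_nodes

# With PS_SRV_NODES=2, as exported in example.sub:
print(min_psana_ranks(srv_nodes=2, eb_nodes=1))  # 5
```

Under this assumption, `srun -n 6` leaves two big-data ranks, while a 3-rank run cannot host two server ranks.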


In [None]:
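# Remove outputs of previous runs (scheduler file, logs, input files, Dask worker space).
# Comment out the line below to keep them.
!rm -rf *.json *.log .tmp *.out dask-worker-space/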

Load the `slurm_magic` extension and list our jobs in the queue (replace `melkhati` with your own username).

In [None]:
%load_ext slurm_magic
%squeue -u melkhati

Next, we start a Dask scheduler on the login node to act as the bridge for sharing data.

In [None]:
from dask.distributed import Client, LocalCluster, Queue

In [None]:
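scheduler_file = 'scheduler.json'
# Listen on all network interfaces so the batch job on the compute nodes can reach the scheduler
cluster = LocalCluster(n_workers=1, threads_per_worker=1, host="0.0.0.0")
client = Client(cluster)
# Record the scheduler address so that other clients (notebooks, example.py) can connect
client.write_scheduler_file(scheduler_file)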

We can now verify that the scheduler is up and running. The `scheduler.json` file written above can be used to connect as many notebooks (and jobs) as needed.

In [None]:
client

In [None]:
import os 

if os.path.isfile(scheduler_file):
    print("Scheduler file with name {} exists!".format(scheduler_file))
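Any process that can read `scheduler.json` (another notebook, or the batch job through `Client(scheduler_file=...)`) locates the scheduler through the address stored in that file. The snippet below reads that address with the standard library only. It assumes the file is JSON with an `"address"` key, which is how Dask records the scheduler identity, and it writes a made-up sample file (hypothetical address) so that it runs anywhere.

```python
import json
import os
import tempfile

def scheduler_address(path):
    """Return the scheduler address recorded in a Dask scheduler file."""
    with open(path) as f:
        info = json.load(f)
    return info["address"]

# Self-contained demo with a hypothetical file; in the notebook, pass
# scheduler_file ("scheduler.json") instead.
with tempfile.TemporaryDirectory() as tmp:
    demo_file = os.path.join(tmp, "scheduler.json")
    with open(demo_file, "w") as f:
        json.dump({"type": "Scheduler", "address": "tcp://10.0.0.1:8786"}, f)
    addr = scheduler_address(demo_file)

print(addr)  # tcp://10.0.0.1:8786
```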

Now that the client is up and running, there are two possible ways to gather data from a PSANA job:

1. Using a Dask `Queue`.
2. Using a publish/subscribe (Pub/Sub) scheme.
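The practical difference between the two options is delivery semantics: a Dask `Queue` hands each item to exactly one consumer, whereas with Pub/Sub every subscriber receives its own copy of each message. The toy below illustrates that difference in-process with the standard library only; `ToyPubSub` and `drain` are hypothetical helpers, not Dask's `Pub`/`Sub` API.

```python
import queue

class ToyPubSub:
    """Minimal in-process publish/subscribe: one inbox per subscriber."""

    def __init__(self):
        self._inboxes = []

    def subscribe(self):
        inbox = queue.Queue()
        self._inboxes.append(inbox)
        return inbox

    def publish(self, msg):
        # Every subscriber gets its own copy of the message.
        for inbox in self._inboxes:
            inbox.put(msg)

def drain(q):
    """Collect everything currently waiting in a queue.Queue."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items

# Queue semantics: two consumers share one queue, so the items are split.
shared = queue.Queue()
for x in range(4):
    shared.put(x)
consumer_a = [shared.get() for _ in range(2)]
consumer_b = drain(shared)
print(consumer_a, consumer_b)  # [0, 1] [2, 3]

# Pub/Sub semantics: each subscriber sees every message.
bus = ToyPubSub()
sub_a, sub_b = bus.subscribe(), bus.subscribe()
for x in range(4):
    bus.publish(x)
seen_a, seen_b = drain(sub_a), drain(sub_b)
print(seen_a, seen_b)  # [0, 1, 2, 3] [0, 1, 2, 3]
```

In this notebook's setting, that means two notebooks reading the same `Queue("psana")` would split the results between them, while Pub/Sub would let each notebook see the full stream.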

## Queue example

In [None]:
# Generate the psana test input files used by example.py
!python /global/common/software/lcls/psana/lcls2/psana/psana/tests/setup_input_files.py

In [None]:
# Alternative to sbatch (interactive run):
# %env PS_SRV_NODES=2
# %srun -n 6 -A m3384 -C haswell -J smallD -q debug -o example.log python example.py

In [None]:
%sbatch example.sub
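`%sbatch` wraps the regular `sbatch` command. Without `slurm_magic`, a plain-Python sketch along these lines does the same submission and keeps the job ID for later `squeue` or `scancel` calls. It relies only on `sbatch` printing `Submitted batch job <id>` on success; `submit` and `parse_job_id` are hypothetical helpers, and only the parsing step runs without a Slurm installation.

```python
import re
import subprocess

def parse_job_id(sbatch_stdout):
    """Extract the job ID from sbatch's 'Submitted batch job <id>' line."""
    match = re.search(r"Submitted batch job (\d+)", sbatch_stdout)
    if match is None:
        raise ValueError("unexpected sbatch output: {!r}".format(sbatch_stdout))
    return int(match.group(1))

def submit(script="example.sub"):
    """Submit a batch script with sbatch and return its job ID (requires Slurm)."""
    out = subprocess.run(["sbatch", script], capture_output=True, text=True, check=True)
    return parse_job_id(out.stdout)

# Parsing works without a cluster; the job number here is made up.
print(parse_job_id("Submitted batch job 123456\n"))  # 123456
```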

In [None]:
%squeue -u melkhati

In [None]:
# Attach to the queue that example.py fills; the name must match Queue("psana") there
q = Queue("psana")

In [None]:
q.qsize()

In [None]:
import numpy as np

results = []
for _ in range(q.qsize()):  # qsize() is a snapshot; items that arrive later are not read
    results.append(np.array(q.get()))

In [None]:
results = np.array(results)
results
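Because `q.qsize()` is only a snapshot, the collection loop in the cells above misses anything the job pushes after that call. A more defensive pattern is to keep calling `get` with a timeout until nothing new arrives; alternatively, Dask's `Queue.get(batch=True)` returns all items currently waiting in one call. The sketch below uses the standard library's `queue.Queue` as a stand-in so it runs without a scheduler. With the Dask queue we assume that `get(timeout=...)` raises `TimeoutError` when the queue stays empty, which plays the role of `queue.Empty` here; `drain_with_timeout` is a hypothetical helper.

```python
import queue

def drain_with_timeout(q, timeout=1.0, max_items=None):
    """Pull items until the queue stays empty for `timeout` seconds."""
    items = []
    while max_items is None or len(items) < max_items:
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:  # for a Dask Queue, catch TimeoutError instead (assumption)
            break
    return items

# Stand-in data shaped like what example.py pushes: running sums as lists.
demo_q = queue.Queue()
for n in range(1, 4):
    demo_q.put([n, n])

demo_results = drain_with_timeout(demo_q, timeout=0.1)
print(demo_results)  # [[1, 1], [2, 2], [3, 3]]
```

The `max_items` cap is optional; it bounds the loop if the job keeps producing data faster than the timeout expires.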