## [Demo] Working with PSANA inside a JupyterLab notebook

This notebook starts a Dask scheduler that acts as a bridge for sharing data between PSANA and JupyterLab. With the `slurm_magic` package, users can submit a PSANA job and bring its results back into the notebook for further manipulation.

**Note**: `%srun` is a blocking magic command. The notebook waits until the job finishes before it runs the next cell.

In [1]:
# Uncomment the rm line to delete scheduler files, logs, and temp data left by earlier runs.
print("remove # to clean")
# !rm -rf *.json *.log .tmp

remove # to clean


Load the `slurm_magic` extension, which makes Slurm commands such as `%squeue` and `%srun` available as notebook magics.

In [2]:
%load_ext slurm_magic

Before starting a scheduler on the login node to act as our data-sharing bridge, we check which jobs are currently queued for the user.

In [3]:
%squeue -u melkhati

| JOBID | PARTITION | NAME | USER | ST | TIME | NODES | NODELIST(REASON) |
|---|---|---|---|---|---|---|---|
| 27557189 | regular_k | FW_job | melkhati | PD | 0:00 | 1 | (Nodes required for job are DOWN, DRAINED or reserved for jobs in higher priority partitions) |
| 27568198 | regular_k | FW_job | melkhati | PD | 0:00 | 1 | (Nodes required for job are DOWN, DRAINED or reserved for jobs in higher priority partitions) |
| 27568212 | regular_h | FW_job | melkhati | PD | 0:00 | 1 | (Priority) |
| 27568214 | regular_h | FW_job | melkhati | PD | 0:00 | 1 | (Priority) |
| 27568220 | regular_h | FW_job | melkhati | PD | 0:00 | 1 | (Priority) |
| 27568227 | regular_k | FW_job | melkhati | PD | 0:00 | 1 | (Nodes required for job are DOWN, DRAINED or reserved for jobs in higher priority partitions) |
| 27568228 | regular_h | FW_job | melkhati | PD | 0:00 | 1 | (Priority) |
| 27568229 | regular_k | FW_job | melkhati | PD | 0:00 | 1 | (Nodes required for job are DOWN, DRAINED or reserved for jobs in higher priority partitions) |
| 27568231 | regular_h | FW_job | melkhati | PD | 0:00 | 1 | (Priority) |
| 27568232 | regular_k | FW_job | melkhati | PD | 0:00 | 1 | (Nodes required for job are DOWN, DRAINED or reserved for jobs in higher priority partitions) |


In [4]:
from dask.distributed import Client, LocalCluster, Queue

In [5]:
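scheduler_file = 'scheduler.json'
# host="0.0.0.0" makes the scheduler listen on all network interfaces of the login node.
cluster = LocalCluster(n_workers=1, threads_per_worker=1, host="0.0.0.0")
client = Client(cluster)
# Save the scheduler's connection details as JSON so other clients can attach to it.
client.write_scheduler_file(scheduler_file)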

We can now verify that the scheduler is up and running. We have also written a `scheduler.json` file that any number of notebooks can use to connect to the same scheduler.

In [6]:
client

| Client | Cluster |
|---|---|
| Scheduler: tcp://128.55.224.44:43763 | Workers: 1 |
| Dashboard: http://128.55.224.44:8787/status | Cores: 1 |
| | Memory: 540.15 GB |


In [7]:
import os 

if os.path.isfile(scheduler_file):
    print("Scheduler file with name {} exists!".format(scheduler_file))

Scheduler file with name scheduler.json exists!
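A second notebook (or any Python process with network access to the login node, which is an assumption about your site's setup) can attach to this scheduler using only the file. The minimal sketch below illustrates this; `connect` is a hypothetical helper name, while `Client(scheduler_file=...)` is the Dask call that reads the scheduler's `"address"` entry from the JSON file.

```python
import json

from dask.distributed import Client


def connect(scheduler_file="scheduler.json"):
    """Hypothetical helper: attach a new client to an already-running scheduler.

    Returns the client and the scheduler address recorded in the file.
    """
    with open(scheduler_file) as f:
        info = json.load(f)  # the scheduler's identity, saved as JSON
    client = Client(scheduler_file=scheduler_file)  # Dask reads info["address"] itself
    return client, info["address"]
```

Usage from another notebook: `other_client, address = connect("scheduler.json")`. Both clients then talk to the same scheduler, so they can share named queues and other coordination objects.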


Now that the client is up and running, there are two possible ways to submit a PSANA job and gather its data:

1. Using a Dask `Queue`.
2. Using a publish/subscribe (Pub/Sub) scheme.
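To make the difference between these two options concrete, here is a plain-Python analogy that uses only the standard library's `queue` module. It is not Dask's API, and the `Topic` class is a hypothetical toy. With a queue, each item is consumed by exactly one reader. With publish/subscribe, every current subscriber receives its own copy of each message, and in this toy a subscriber that joins late sees none of the earlier messages.

```python
import queue

# Queue pattern (analogy): each item is delivered to exactly one reader.
work = queue.Queue()
for i in range(4):
    work.put(i)
reader_a = [work.get() for _ in range(2)]
reader_b = [work.get() for _ in range(2)]
print(reader_a, reader_b)  # prints [0, 1] [2, 3]: items are split, not copied


class Topic:
    """Toy in-process pub/sub: every subscriber gets its own inbox."""

    def __init__(self):
        self.inboxes = []

    def subscribe(self):
        inbox = queue.Queue()
        self.inboxes.append(inbox)
        return inbox

    def publish(self, message):
        # Fan out: copy the message into every current subscriber's inbox.
        for inbox in self.inboxes:
            inbox.put(message)


topic = Topic()
sub_a, sub_b = topic.subscribe(), topic.subscribe()
for i in range(2):
    topic.publish(i)
late = topic.subscribe()  # joined after publishing, so its inbox stays empty
```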

## Queue example

In [8]:
!python /global/common/software/lcls/psana/lcls2/psana/psana/tests/setup_input_files.py


Start writing offsets.
Finished writing smd for 9 L1 events and 8 update events. Big data file has 19 events with size (B): 37120

Start writing offsets.
Finished writing smd for 9 L1 events and 8 update events. Big data file has 19 events with size (B): 32728


Unlike `%srun`, the helper below calls `srun` from the notebook without blocking: `subprocess.Popen` launches the job in the background and returns immediately.

In [9]:
import subprocess

In [10]:
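def run_command(ntasks, account, constraint, queue, jobname, output, filename):
    # Build the srun argument list directly so each value is passed through as-is.
    command = ["srun", "-n", str(ntasks), "-A", account, "-C", constraint,
               "-q", queue, "-J", jobname, "-o", output, "python", filename]
    # Popen returns immediately; return the handle so the job can be checked later.
    return subprocess.Popen(command)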

In [12]:
run_command(3, "m3384", "haswell", "debug", "psana", "test.log", "test.py")
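Because `subprocess.Popen` returns immediately, the notebook does not automatically learn when the job ends. If the `Popen` handle is kept, the job can be monitored with `poll()` and `wait()`. The sketch below shows this pattern with a hypothetical stand-in command (a short Python `sleep`, so it runs without Slurm) and contrasts it with `subprocess.run`, which blocks in the same way `%srun` does.

```python
import subprocess
import sys

# Hypothetical stand-in for the srun command line: a short Python sleep,
# so this sketch runs on any machine without Slurm.
command = [sys.executable, "-c", "import time; time.sleep(0.5)"]

# Non-blocking: Popen starts the process and returns at once.
proc = subprocess.Popen(command)
print("job still running:", proc.poll() is None)  # poll() is None while the child runs

# ... other notebook cells could execute here while the job runs ...

exit_code = proc.wait()  # block only when the result is actually needed
print("job finished with exit code", exit_code)

# Blocking: run() returns only after the process has exited.
finished = subprocess.run(command)
print("blocking call returned", finished.returncode)
```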

In [13]:
%squeue -u melkhati

| JOBID | PARTITION | NAME | USER | ST | TIME | NODES | NODELIST(REASON) |
|---|---|---|---|---|---|---|---|
| 27557189 | regular_k | FW_job | melkhati | PD | 0:00 | 1 | (Nodes required for job are DOWN, DRAINED or reserved for jobs in higher priority partitions) |
| 27568198 | regular_k | FW_job | melkhati | PD | 0:00 | 1 | (Nodes required for job are DOWN, DRAINED or reserved for jobs in higher priority partitions) |
| 27568212 | regular_h | FW_job | melkhati | PD | 0:00 | 1 | (Priority) |
| 27568214 | regular_h | FW_job | melkhati | PD | 0:00 | 1 | (Priority) |
| 27568220 | regular_h | FW_job | melkhati | PD | 0:00 | 1 | (Priority) |
| 27568227 | regular_k | FW_job | melkhati | PD | 0:00 | 1 | (Nodes required for job are DOWN, DRAINED or reserved for jobs in higher priority partitions) |
| 27568228 | regular_h | FW_job | melkhati | PD | 0:00 | 1 | (Priority) |
| 27568229 | regular_k | FW_job | melkhati | PD | 0:00 | 1 | (Nodes required for job are DOWN, DRAINED or reserved for jobs in higher priority partitions) |
| 27568231 | regular_h | FW_job | melkhati | PD | 0:00 | 1 | (Priority) |
| 27568232 | regular_k | FW_job | melkhati | PD | 0:00 | 1 | (Nodes required for job are DOWN, DRAINED or reserved for jobs in higher priority partitions) |


In [14]:
q = Queue("psana")
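The script `test.py` that was launched above is not shown in this notebook. The following is a hypothetical sketch of the producer side: the job attaches to the same scheduler through `scheduler.json` and puts one item per event on the queue named `"psana"`. Dask queues accept futures or msgpack-encodable data (numbers, strings, lists, dicts), so the sketch converts each array with `.tolist()`. This is consistent with the notebook wrapping each `q.get()` result in `np.array`. The helper name `publish_events` and its arguments are illustrative assumptions.

```python
import numpy as np
from dask.distributed import Client, Queue


def publish_events(scheduler_file, events, queue_name="psana"):
    """Hypothetical producer: push each event's array onto a named Dask Queue.

    Returns the number of items put on the queue.
    """
    client = Client(scheduler_file=scheduler_file)  # attach to the notebook's scheduler
    try:
        q = Queue(queue_name, client=client)
        count = 0
        for event in events:
            # Send plain nested lists rather than ndarrays (msgpack-friendly data).
            q.put(np.asarray(event).tolist())
            count += 1
        return count
    finally:
        client.close()
```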

In [36]:
q.qsize()

10

In [37]:
import numpy as np

results = []
# Read every item currently in the queue and convert each one to a NumPy array.
for _ in range(q.qsize()):
    results.append(np.array(q.get()))
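Instead of calling `q.get()` once per item, Dask's `Queue.get` also accepts `batch=True`, which returns every element currently waiting in a single call. A minimal sketch follows; `drain` is a hypothetical helper name, and `np.stack` assumes that every event array has the same shape, as the results in this notebook do. If the job is still running, more items may arrive after the call returns.

```python
import numpy as np


def drain(q):
    """Hypothetical helper: return all items waiting in a Dask Queue as one array.

    q.get(batch=True) fetches every item that is currently queued in one call.
    """
    items = q.get(batch=True)
    if not items:
        return np.empty((0,))  # nothing was waiting
    # np.stack needs every item to have the same shape.
    return np.stack([np.asarray(item) for item in items])
```

Usage: `results = drain(q)` yields a single array with one leading entry per event.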

In [38]:
results = np.array(results)
results

array([[[[ 0,  1,  2,  3,  4,  5],
         [ 6,  7,  8,  9, 10, 11],
         [12, 13, 14, 15, 16, 17]],

        [[ 0,  1,  2,  3,  4,  5],
         [ 6,  7,  8,  9, 10, 11],
         [12, 13, 14, 15, 16, 17]],

        [[ 0,  1,  2,  3,  4,  5],
         [ 6,  7,  8,  9, 10, 11],
         [12, 13, 14, 15, 16, 17]],

        [[ 0,  1,  2,  3,  4,  5],
         [ 6,  7,  8,  9, 10, 11],
         [12, 13, 14, 15, 16, 17]]],


       [[[ 0,  1,  2,  3,  4,  5],
         [ 6,  7,  8,  9, 10, 11],
         [12, 13, 14, 15, 16, 17]],

        [[ 0,  1,  2,  3,  4,  5],
         [ 6,  7,  8,  9, 10, 11],
         [12, 13, 14, 15, 16, 17]],

        [[ 0,  1,  2,  3,  4,  5],
         [ 6,  7,  8,  9, 10, 11],
         [12, 13, 14, 15, 16, 17]],

        [[ 0,  1,  2,  3,  4,  5],
         [ 6,  7,  8,  9, 10, 11],
         [12, 13, 14, 15, 16, 17]]],


       [[[ 0,  1,  2,  3,  4,  5],
         [ 6,  7,  8,  9, 10, 11],
         [12, 13, 14, 15, 16, 17]],

        [[ 0,  1,  2,  3,  4,  5]