In [1]:
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, progress
partition = 'milano'  # For LCLS II staff

cluster = SLURMCluster(
    queue=partition,
    account="lcls:data",
    local_directory='/sdf/home/m/monarin/tmp/',  # Local disk space for workers to use

    # Resources per SLURM job (per node, the way SLURM is configured on Roma)
    # processes=16 runs 16 Dask workers in a job, so each worker has 1 core & 32 GB RAM.
    processes=4, cores=4, memory='512GB',
)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 34171 instead


In [2]:
cluster.scale(jobs=1)
cluster.job_script()

'#!/usr/bin/env bash\n\n#SBATCH -J dask-worker\n#SBATCH -p milano\n#SBATCH -A lcls:data\n#SBATCH -n 1\n#SBATCH --cpus-per-task=4\n#SBATCH --mem=477G\n#SBATCH -t 00:30:00\n\n/sdf/group/lcls/ds/ana/sw/conda2/inst/envs/ps-4.6.1/bin/python -m distributed.cli.dask_worker tcp://172.24.49.11:34239 --nthreads 1 --nworkers 4 --memory-limit 119.21GiB --name dummy-name --nanny --death-timeout 60 --local-directory /sdf/home/m/monarin/tmp/\n'

In [3]:
client = Client(cluster)

In [4]:
%%bash
hostname

sdfiana001


In [5]:
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://172.24.49.11:34171/status,

0,1
Dashboard: http://172.24.49.11:34171/status,Workers: 4
Total threads: 4,Total memory: 476.84 GiB

0,1
Comm: tcp://172.24.49.11:34239,Workers: 4
Dashboard: http://172.24.49.11:34171/status,Total threads: 4
Started: Just now,Total memory: 476.84 GiB

0,1
Comm: tcp://172.24.48.127:42293,Total threads: 1
Dashboard: http://172.24.48.127:32771/status,Memory: 119.21 GiB
Nanny: tcp://172.24.48.127:36565,
Local directory: /sdf/home/m/monarin/tmp/dask-scratch-space/worker-o6rht42c,Local directory: /sdf/home/m/monarin/tmp/dask-scratch-space/worker-o6rht42c

0,1
Comm: tcp://172.24.48.127:34393,Total threads: 1
Dashboard: http://172.24.48.127:35965/status,Memory: 119.21 GiB
Nanny: tcp://172.24.48.127:40521,
Local directory: /sdf/home/m/monarin/tmp/dask-scratch-space/worker-sa3ql13x,Local directory: /sdf/home/m/monarin/tmp/dask-scratch-space/worker-sa3ql13x

0,1
Comm: tcp://172.24.48.127:32881,Total threads: 1
Dashboard: http://172.24.48.127:34547/status,Memory: 119.21 GiB
Nanny: tcp://172.24.48.127:42657,
Local directory: /sdf/home/m/monarin/tmp/dask-scratch-space/worker-z8f5tynu,Local directory: /sdf/home/m/monarin/tmp/dask-scratch-space/worker-z8f5tynu

0,1
Comm: tcp://172.24.48.127:44269,Total threads: 1
Dashboard: http://172.24.48.127:33315/status,Memory: 119.21 GiB
Nanny: tcp://172.24.48.127:42391,
Local directory: /sdf/home/m/monarin/tmp/dask-scratch-space/worker-skpnpk6_,Local directory: /sdf/home/m/monarin/tmp/dask-scratch-space/worker-skpnpk6_


In [6]:
import h5py
import dask
import dask.array as da
import numpy as np
f=h5py.File('/sdf/data/lcls/drpsrcf/ffb/users/monarin/h5/mysmallh5.h5', 'r')

In [7]:
%%time
da_ts = da.from_array(f['timestamp'], chunks='auto')

CPU times: user 1.21 ms, sys: 1.91 ms, total: 3.12 ms
Wall time: 2.72 ms


In [8]:
%%time
inds = da_ts.argtopk(-da_ts.shape[0]).compute()

CPU times: user 227 ms, sys: 120 ms, total: 348 ms
Wall time: 12.4 s


In [11]:
da_arr = da.from_array(f['calib'], chunks=tuple([10000000]+list(f['calib'].shape[1:])))

In [12]:
da_arr

Unnamed: 0,Array,Chunk
Bytes,457.76 MiB,457.76 MiB
Shape,"(10000000, 6)","(10000000, 6)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 457.76 MiB 457.76 MiB Shape (10000000, 6) (10000000, 6) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",6  10000000,

Unnamed: 0,Array,Chunk
Bytes,457.76 MiB,457.76 MiB
Shape,"(10000000, 6)","(10000000, 6)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [9]:
import dask.dataframe as dd
import pandas as pd

In [10]:
d = {'col1': [1, 2, 3, 4], 'col2': [5, 6, 7, 8]}

In [11]:
df = dd.from_pandas(pd.DataFrame(data=d), npartitions=2)

In [12]:
df.to_hdf('/sdf/data/lcls/drpsrcf/ffb/users/monarin/h5/output.hdf', '/data')  

['/sdf/data/lcls/drpsrcf/ffb/users/monarin/h5/output.hdf',
 '/sdf/data/lcls/drpsrcf/ffb/users/monarin/h5/output.hdf']

In [27]:
df_arr = dd.from_dask_array(da_arr)

In [28]:
df_arr

Unnamed: 0_level_0,0,1,2,3,4,5
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
0,float64,float64,float64,float64,float64,float64
9999999,...,...,...,...,...,...


In [31]:
from distributed.protocol import serialize

In [32]:
serialize(da_arr)

({'serializer': 'pickle', 'writeable': (True,)},
 [b'\x80\x05\x95_\x05\x00\x00\x00\x00\x00\x00\x8c\x0fdask.array.core\x94\x8c\x05Array\x94\x93\x94(\x8c\x13dask.highlevelgraph\x94\x8c\x0eHighLevelGraph\x94\x93\x94)\x81\x94}\x94(\x8c\x0cdependencies\x94}\x94(\x8c/original-array-34a219a9e3535b7b9ba13fad14698627\x94\x8f\x94\x8c&array-34a219a9e3535b7b9ba13fad14698627\x94\x8f\x94(h\n\x90u\x8c\x10key_dependencies\x94}\x94\x8c\x06layers\x94}\x94(h\nh\x03\x8c\x11MaterializedLayer\x94\x93\x94)\x81\x94}\x94(\x8c\x0bannotations\x94N\x8c\x16collection_annotations\x94N\x8c\x07mapping\x94}\x94h\n\x8c\x19distributed.protocol.h5py\x94\x8c\x18deserialize_h5py_dataset\x94\x93\x94}\x94(\x8c\x08filename\x94\x8c8/sdf/data/lcls/drpsrcf/ffb/users/monarin/h5/mysmallh5.h5\x94\x8c\x04name\x94\x8c\x06/calib\x94u]\x94\x86\x94R\x94subh\x0c\x8c\x0edask.blockwise\x94\x8c\tBlockwise\x94\x93\x94)\x81\x94}\x94(h\x16Nh\x17}\x94(\x8c\x05shape\x94J\x80\x96\x98\x00K\x06\x86\x94\x8c\x05dtype\x94\x8c\x05numpy\x94h-\x93\x94\x8

In [33]:
serialize(df_arr)

({'serializer': 'pickle', 'writeable': (True, True)},
 [b'\x80\x05\x95h\n\x00\x00\x00\x00\x00\x00\x8c\x13dask.dataframe.core\x94\x8c\tDataFrame\x94\x93\x94)\x81\x94(\x8c\x13dask.highlevelgraph\x94\x8c\x0eHighLevelGraph\x94\x93\x94)\x81\x94}\x94(\x8c\x0cdependencies\x94}\x94(\x8c/original-array-34a219a9e3535b7b9ba13fad14698627\x94\x8f\x94\x8c&array-34a219a9e3535b7b9ba13fad14698627\x94\x8f\x94(h\x0b\x90\x8c0from-dask-array-ffa8d98e84138813fe45d79f65d2bf13\x94\x8f\x94(h\r\x90u\x8c\x10key_dependencies\x94}\x94\x8c\x06layers\x94}\x94(h\x0bh\x04\x8c\x11MaterializedLayer\x94\x93\x94)\x81\x94}\x94(\x8c\x0bannotations\x94N\x8c\x16collection_annotations\x94N\x8c\x07mapping\x94}\x94h\x0b\x8c\x19distributed.protocol.h5py\x94\x8c\x18deserialize_h5py_dataset\x94\x93\x94}\x94(\x8c\x08filename\x94\x8c8/sdf/data/lcls/drpsrcf/ffb/users/monarin/h5/mysmallh5.h5\x94\x8c\x04name\x94\x8c\x06/calib\x94u]\x94\x86\x94R\x94subh\r\x8c\x0edask.blockwise\x94\x8c\tBlockwise\x94\x93\x94)\x81\x94}\x94(h\x19Nh\x1a}\x94

In [30]:
df_arr.to_hdf('/sdf/data/lcls/drpsrcf/ffb/users/monarin/h5/output-*.hdf', '/data', scheduler='processes') 



type: h5py objects cannot be pickled

In [13]:
arr = da_arr[inds].compute()

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


In [14]:
arr

array([[0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0.],
       ...,
       [0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0.]])

In [16]:
arr.shape

(10000000, 6)

In [21]:
data = {'calib': arr}

In [23]:
da.to_hdf5('/sdf/data/lcls/drpsrcf/ffb/users/monarin/h5/tmp_out.h5', data, chunks=False)

type: All sources must be dask array objects