In [1]:
on_pm = False
on_cori_gpu = True
async_on = True
large_data = True
io_workers = False

# Import/Configuration

## Set up rmm and cupy
- Want to use rmm for memory management (this may be changed later to enable NVLink).
- This should be performed before client setup (I think? - TODO: test RMM).

## Start up dask distributed client 
- Could call the startup script from this jupyter notebook, eventually.
- The startup script is in ~/utility/start_dask.sh

In [19]:
import dask
import cupy as cp
import rmm
from dask.distributed import Client
import os

if on_pm or on_cori_gpu:
    scheduler_file = os.path.join(os.environ["SCRATCH"], "scheduler_file.json")
    dask.config.config["distributed"]["dashboard"]["link"] = "{JUPYTERHUB_SERVICE_PREFIX}proxy/{host}:{port}/status" 
else:
    scheduler_file = os.path.join("/documents/stempy-jupyter-notebooks/", "scheduler_file.json")

if async_on:
    client = await Client(scheduler_file=scheduler_file, asynchronous=True)
else:
    client = Client(scheduler_file=scheduler_file)
    
if on_pm:
    await client.run(cp.cuda.set_allocator,
           rmm.rmm_cupy_allocator
          )
workers_info = client.scheduler_info()["workers"]
worker_ids = []
for key in workers_info:
    worker_info = workers_info[key]
    worker_ids.append(worker_info["id"]) # Gets the worker ID
if on_pm:
    gpu_workers = [x for x in worker_ids if "GPU" in x]
    head_gpu_worker = gpu_workers[0]
elif on_cori_gpu:
    gpu_workers = [x for x in worker_ids]
    head_gpu_worker = gpu_workers[0]
print(f"Total workers: {len(worker_ids)}")
print(f"Head GPU worker: {head_gpu_worker}")
print(f"GPU workers: {gpu_workers}")
if io_workers:
    io_workers = [x for x in worker_ids if "IO" in x]
    head_io_worker = io_workers[0]
    gpu_workers_to_io_workers = len(gpu_workers) // len(io_workers)
    gpu_workers_divvied = [[gpu_workers[i], gpu_workers[i+1]] for i in range(0, len(gpu_workers), 2)]
    print(f"Determined head io node: {head_io_worker}")
    print(f"Head IO worker: {head_io_worker}")
    print(f"IO workers: {io_workers}")
    print(f"IO -> GPU worker map: {worker_map}")
    worker_map = {k : v for k, v in zip((io_workers), gpu_workers_divvied)}
client

Total workers: 8
Head GPU worker: tcp://128.55.224.184:33963
GPU workers: ['tcp://128.55.224.184:33963', 'tcp://128.55.224.184:34001', 'tcp://128.55.224.184:35379', 'tcp://128.55.224.184:36785', 'tcp://128.55.224.184:39729', 'tcp://128.55.224.184:39763', 'tcp://128.55.224.184:43695', 'tcp://128.55.224.184:44387']


0,1
Connection method: Scheduler file,Scheduler file: /global/cscratch1/sd/swelborn/scheduler_file.json
Dashboard: /user/swelborn/cori-configurable-gpu/proxy/128.55.224.184:8787/status,

0,1
Comm: tcp://128.55.224.184:8786,Workers: 8
Dashboard: /user/swelborn/cori-configurable-gpu/proxy/128.55.224.184:8787/status,Total threads: 8
Started: 5 minutes ago,Total memory: 2.84 TiB

0,1
Comm: tcp://128.55.224.184:33963,Total threads: 1
Dashboard: /user/swelborn/cori-configurable-gpu/proxy/128.55.224.184:35907/status,Memory: 363.98 GiB
Nanny: tcp://128.55.224.184:40187,
Local directory: /tmp/dask-worker-space/worker-29z01tcp,Local directory: /tmp/dask-worker-space/worker-29z01tcp
GPU: Tesla V100-SXM2-16GB,GPU memory: 15.78 GiB
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 2.0%,Last seen: Just now
Memory usage: 438.53 MiB,Spilled bytes: 0 B
Read bytes: 27.49 kiB,Write bytes: 26.99 kiB

0,1
Comm: tcp://128.55.224.184:34001,Total threads: 1
Dashboard: /user/swelborn/cori-configurable-gpu/proxy/128.55.224.184:38985/status,Memory: 363.98 GiB
Nanny: tcp://128.55.224.184:41395,
Local directory: /tmp/dask-worker-space/worker-i3annqfe,Local directory: /tmp/dask-worker-space/worker-i3annqfe
GPU: Tesla V100-SXM2-16GB,GPU memory: 15.78 GiB
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 4.0%,Last seen: Just now
Memory usage: 436.75 MiB,Spilled bytes: 0 B
Read bytes: 25.53 kiB,Write bytes: 15.82 kiB

0,1
Comm: tcp://128.55.224.184:35379,Total threads: 1
Dashboard: /user/swelborn/cori-configurable-gpu/proxy/128.55.224.184:34113/status,Memory: 363.98 GiB
Nanny: tcp://128.55.224.184:40945,
Local directory: /tmp/dask-worker-space/worker-yhqkc6r7,Local directory: /tmp/dask-worker-space/worker-yhqkc6r7
GPU: Tesla V100-SXM2-16GB,GPU memory: 15.78 GiB
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 4.0%,Last seen: Just now
Memory usage: 438.45 MiB,Spilled bytes: 0 B
Read bytes: 26.13 kiB,Write bytes: 25.39 kiB

0,1
Comm: tcp://128.55.224.184:36785,Total threads: 1
Dashboard: /user/swelborn/cori-configurable-gpu/proxy/128.55.224.184:39571/status,Memory: 363.98 GiB
Nanny: tcp://128.55.224.184:35361,
Local directory: /tmp/dask-worker-space/worker-q5wtw303,Local directory: /tmp/dask-worker-space/worker-q5wtw303
GPU: Tesla V100-SXM2-16GB,GPU memory: 15.78 GiB
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 4.0%,Last seen: Just now
Memory usage: 437.87 MiB,Spilled bytes: 0 B
Read bytes: 51.26 kiB,Write bytes: 306.09 kiB

0,1
Comm: tcp://128.55.224.184:39729,Total threads: 1
Dashboard: /user/swelborn/cori-configurable-gpu/proxy/128.55.224.184:45275/status,Memory: 363.98 GiB
Nanny: tcp://128.55.224.184:35101,
Local directory: /tmp/dask-worker-space/worker-vw3o35um,Local directory: /tmp/dask-worker-space/worker-vw3o35um
GPU: Tesla V100-SXM2-16GB,GPU memory: 15.78 GiB
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 4.0%,Last seen: Just now
Memory usage: 438.11 MiB,Spilled bytes: 0 B
Read bytes: 25.52 kiB,Write bytes: 26.68 kiB

0,1
Comm: tcp://128.55.224.184:39763,Total threads: 1
Dashboard: /user/swelborn/cori-configurable-gpu/proxy/128.55.224.184:35903/status,Memory: 363.98 GiB
Nanny: tcp://128.55.224.184:38527,
Local directory: /tmp/dask-worker-space/worker-brxf6813,Local directory: /tmp/dask-worker-space/worker-brxf6813
GPU: Tesla V100-SXM2-16GB,GPU memory: 15.78 GiB
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 6.0%,Last seen: Just now
Memory usage: 438.37 MiB,Spilled bytes: 0 B
Read bytes: 26.93 kiB,Write bytes: 17.24 kiB

0,1
Comm: tcp://128.55.224.184:43695,Total threads: 1
Dashboard: /user/swelborn/cori-configurable-gpu/proxy/128.55.224.184:42695/status,Memory: 363.98 GiB
Nanny: tcp://128.55.224.184:41189,
Local directory: /tmp/dask-worker-space/worker-s4gcyw3a,Local directory: /tmp/dask-worker-space/worker-s4gcyw3a
GPU: Tesla V100-SXM2-16GB,GPU memory: 15.78 GiB
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 4.0%,Last seen: Just now
Memory usage: 438.77 MiB,Spilled bytes: 0 B
Read bytes: 27.49 kiB,Write bytes: 26.98 kiB

0,1
Comm: tcp://128.55.224.184:44387,Total threads: 1
Dashboard: /user/swelborn/cori-configurable-gpu/proxy/128.55.224.184:38011/status,Memory: 363.98 GiB
Nanny: tcp://128.55.224.184:40855,
Local directory: /tmp/dask-worker-space/worker-h7_2bxwk,Local directory: /tmp/dask-worker-space/worker-h7_2bxwk
GPU: Tesla V100-SXM2-16GB,GPU memory: 15.78 GiB
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 2.0%,Last seen: Just now
Memory usage: 437.29 MiB,Spilled bytes: 0 B
Read bytes: 25.37 kiB,Write bytes: 15.83 kiB


Extracting some information about the workers

## Import modules

In [4]:
import dask.array as da
import numpy as np
import sys
import os
import time
import stempy.io as stio
import stempy.image as stim
from pathlib import Path
from functools import partial
from time import perf_counter
if on_pm or on_cori_gpu:
    sys.path.append("/global/homes/s/swelborn/utility/")

# Counting (Synchronous)

## Set up loading functions:

In [11]:
def create_reader_on_worker():
    """
    Function to create the reader on worker. The reader is created by submitting this function
    as an actor. Then, the actor can call the get_block_from_image_number method and load in/scatter
    data onto the workers.
    
    """
    
    # Empty gain
    gain0 = None

    # Setup file name and path
    scanNum = 22
    if on_pm and large_data:
        drive = Path("/pscratch/sd/s/swelborn/2022.10.17/") # image 192 is messed up 
        scanNum = 26
    elif on_pm and not large_data:
        drive = Path("/pscratch/sd/s/swelborn/20230103_testing_stempy_reader/")
        scanNum = 22
    elif on_cori_gpu:
        drive = Path("/global/cscratch1/sd/swelborn/2022.03.08/")
        scanNum = 22
    else:
        drive = Path("2022.03.08/")
        scanNum = 22

    # Padded data (?) don't know what this is...
    pad = True
    if pad:
        scanName = 'data_scan{:010}_*.data'.format(scanNum)
    else:
        scanName = 'data_scan{}_*.data'.format(scanNum)
        
    print('Using files in {}'.format(drive))
    print('scan name = {}'.format(scanName))
    
    # Get file names
    files = drive.glob(scanName)
    iFiles = [str(f) for f in files]
    iFiles = sorted(iFiles)
    
    # Create reader
    reader = stio.reader(iFiles, stio.FileVersion.VERSION5, backend="multi-pass")
    reader.create_scan_map()
    return reader

def load_stem_image(reader, image_number=0, with_header=False):
    """
    Given a reader, one can load an image number with this function. 
    
    Args
    ----
    reader: stempy.io.SectorThreadedMultiPassReader
        This assumes that the reader has already created a map.
    image_number: int
        The image number to load (from the map).
    with_header: bool
        Return either a stempy._io._block object (with_header=True), 
        or a numpy array.
    """
    
    block = reader.get_block_from_image_number(image_number).result()
    
    # Depending on whether we want stempy._io._block object, or 
    # np.array object
    if with_header:
        return block
    else:
        return np.array(block, copy=False)

In [12]:
def doubleRoll_dask(image, vec):
    """
    doubleRoll (from ncempy) converted to handle dask arrays.
    """
    return np.roll(np.roll(image, vec[0], axis=0), vec[1], axis=1)

def peakFind2D_dask(image_arr, threshold=0.01):
    """
    peakFind2D (from ncempy) converted to handle dask arrays.
    
    Args
    ----
    image_arr: dask.array
        Has dimensions (x, frame_dimensions[0], frame_dimensions[1])
        where x is the number of images on this particular scan
        position.
        
    Returns
    -------
    positions: tuple(x_pos: da.array, y_pos: da.array)
        Peak positions returned as a tuple of x and y positions. 
    """
    if (threshold < 0) or (threshold >= 1):
        print('Error: Threshold must be 0 <= threshold < 1')
        return 0
    image = image_arr[0]
    # Could generate the set of coordinates using
    # list(set(permutations((0,1,1,-1,-1),2)))
    pLarge = (image > doubleRoll_dask(image, [-1, -1])) & \
             (image > doubleRoll_dask(image, [0, -1])) & \
             (image > doubleRoll_dask(image, [1, -1])) & \
             (image > doubleRoll_dask(image, [-1, 0])) & \
             (image > doubleRoll_dask(image, [1, 0])) & \
             (image > doubleRoll_dask(image, [-1, 1])) & \
             (image > doubleRoll_dask(image, [0, 1])) & \
             (image > doubleRoll_dask(image, [1, 1])) & \
             (image > threshold * np.max(image))
    
    positions = np.nonzero(pLarge * image)
    return positions

## Lazily create arrays

Script for lazy array creation.

A more effective stacking method can be here, but I didn't implement it:
- https://docs.dask.org/en/stable/array-creation.html

In [29]:
# Create a reader actor and scatter it across workers:
reader = client.submit(create_reader_on_worker, actor=True).result()
client.scatter([reader])

# Get some metadata from one of the blocks:
scan_dimensions = (128, 128) # Set manually (for now)
total_images = 10000

# Script to perform lazy operations on everything:
imread = dask.delayed(load_stem_image, pure=True)
lazy_images = [imread(reader, image_number=i, with_header=True) for i in range(total_images)]
sample = lazy_images[0].compute()
frame_dimensions = (sample.data.shape[1], sample.data.shape[2])

arrays = [da.from_delayed(lazy_image.data,           # Construct a small Dask array
                          dtype=sample.data.dtype,   # for every lazy value
                          shape=sample.data.shape)
          for lazy_image in lazy_images]
print(f"Lazily loaded {len(arrays)} stem images.")

Lazily loaded 10000 stem images.


## Run counting

In [30]:
# Persists the arrays in worker memory for fast access
arrays = [array.persist() for array in arrays]
print("Persisted the arrays on the workers.")
# Submits the futures
futures = client.map(peakFind2D_dask, arrays)
print("Submitted the counting algorithm to the cluster.")

Persisted the arrays on the workers.
Submitted the counting algorithm to the cluster.


# Counting (asynchronous)

We would like to do the above asynchronously, such that as the data is read in, it can be taken off of the queue.

To do this, we need to define asynchronous loading/counting functions.

Asynchronous loading:

In [13]:
async def a_load_stem_image(reader, image_number=0):
    """
    Given a reader, one can load an image number with this function. 
    
    Args
    ----
    reader: stempy.io.SectorThreadedMultiPassReader
        This assumes that the reader has already created a map.
    image_number: int
        The image number to load (from the map).
    with_header: bool
        Return either a stempy._io._block object (with_header=True), 
        or a numpy array.
    """
    
    block = reader.get_block_from_image_number(image_number)
    
    # Depending on whether we want stempy._io._block object, or 
    # np.array object
    return block

Asynchronous counting. 

In [14]:
def doubleRoll(image, vec):
    """
    doubleRoll (from ncempy) converted to handle dask arrays.
    """
    return cp.roll(cp.roll(image, vec[0], axis=0), vec[1], axis=1)

async def a_peakFind2D(image_arr, threshold=0.01):
    """
    peakFind2D (from ncempy) converted to handle dask arrays.
    
    Args
    ----
    image_arr: dask.array
        Has dimensions (x, frame_dimensions[0], frame_dimensions[1])
        where x is the number of images on this particular scan
        position.
        
    Returns
    -------
    positions: tuple(x_pos: da.array, y_pos: da.array)
        Peak positions returned as a tuple of x and y positions. 
    """
    if (threshold < 0) or (threshold >= 1):
        print('Error: Threshold must be 0 <= threshold < 1')
        return 0
    image_arr = await image_arr
    image = image_arr.data[0]
    image = cp.asarray(image)
    pLarge = (image > doubleRoll(image, [-1, -1])) & \
             (image > doubleRoll(image, [0, -1])) & \
             (image > doubleRoll(image, [1, -1])) & \
             (image > doubleRoll(image, [-1, 0])) & \
             (image > doubleRoll(image, [1, 0])) & \
             (image > doubleRoll(image, [-1, 1])) & \
             (image > doubleRoll(image, [0, 1])) & \
             (image > doubleRoll(image, [1, 1])) & \
             (image > threshold * cp.max(image))
    non_zero_arr = pLarge * image
    positions = np.nonzero(non_zero_arr)
    return positions

Asynchronous return function (to convert tuple):

In [9]:
import time
t0 = time.perf_counter()
total_images = 100
reader = await client.submit(create_reader_on_worker, actor=True)
arrays = client.map(partial(a_load_stem_image,reader), range(total_images))
b = client.map(a_peakFind2D,arrays)
b = await client.gather(b)
t1 = time.perf_counter()
print(f"Total time for {total_images} images = {t1-t0}s")

Total time for 100 images = 0.7857436020276509s


## Run in batches

In [15]:
def doubleRoll(image, vec):
    """
    doubleRoll (from ncempy) converted to handle dask arrays.
    """
    return cp.roll(cp.roll(image, vec[0], axis=0), vec[1], axis=1)

async def a_peakFind2D(image_arr, threshold=0.01):
    """
    peakFind2D (from ncempy) converted to handle dask arrays.
    
    Args
    ----
    image_arr: dask.array
        Has dimensions (x, frame_dimensions[0], frame_dimensions[1])
        where x is the number of images on this particular scan
        position.
        
    Returns
    -------
    positions: tuple(x_pos: da.array, y_pos: da.array)
        Peak positions returned as a tuple of x and y positions. 
    """
    if (threshold < 0) or (threshold >= 1):
        print('Error: Threshold must be 0 <= threshold < 1')
        return 0
    image_arr = await image_arr
    image = image_arr.data[0]
    image = cp.asarray(image)
    pLarge = (image > doubleRoll(image, [-1, -1])) & \
             (image > doubleRoll(image, [0, -1])) & \
             (image > doubleRoll(image, [1, -1])) & \
             (image > doubleRoll(image, [-1, 0])) & \
             (image > doubleRoll(image, [1, 0])) & \
             (image > doubleRoll(image, [-1, 1])) & \
             (image > doubleRoll(image, [0, 1])) & \
             (image > doubleRoll(image, [1, 1])) & \
             (image > threshold * cp.max(image))
    non_zero_arr = pLarge * image
    positions = np.nonzero(non_zero_arr)
    return positions

async def a_peakFind2D_batched(image_arrs, threshold=0.01):
    """
    peakFind2D (from ncempy) converted to handle dask arrays.
    
    Args
    ----
    image_arr: dask.array
        Has dimensions (x, frame_dimensions[0], frame_dimensions[1])
        where x is the number of images on this particular scan
        position.
        
    Returns
    -------
    positions: tuple(x_pos: da.array, y_pos: da.array)
        Peak positions returned as a tuple of x and y positions. 
    """
    if (threshold < 0) or (threshold >= 1):
        print('Error: Threshold must be 0 <= threshold < 1')
        return 0
    positions = []
    for image in image_arrs:
        pLarge = (image > doubleRoll(image, [-1, -1])) & \
                 (image > doubleRoll(image, [0, -1])) & \
                 (image > doubleRoll(image, [1, -1])) & \
                 (image > doubleRoll(image, [-1, 0])) & \
                 (image > doubleRoll(image, [1, 0])) & \
                 (image > doubleRoll(image, [-1, 1])) & \
                 (image > doubleRoll(image, [0, 1])) & \
                 (image > doubleRoll(image, [1, 1])) & \
                 (image > threshold * cp.max(image))
        non_zero_arr = pLarge * image
        d_pos = cp.nonzero(non_zero_arr)
        positions.append((cp.asnumpy(d_pos[0]), cp.asnumpy(d_pos[1])))
    return positions

async def concatenate_futures(image_arr):
    stack = []
    for image in image_arr:
        image = await image
        stack.append(image.data)
    return np.concatenate(stack)

### Reader should grab the image batch, rather than concatentation later

In [None]:
from distributed import as_completed, wait
import concurrent.futures
total_images = 128 * 128
bytes_per_pattern = 1024 * 1024
max_num_bytes = bytes_per_pattern * 1000 # totally arbitrary right now
batch_size = max_num_bytes // (bytes_per_pattern) # really just ends up being 1000
num_batches = total_images // batch_size + 1
arrays_batch_futures = []
d_arrays_batch_futures = []
counted_batch_futures = []
positions_futures = []
pools = []
div = num_batches // len(worker_ids) + 1
readers = {}
t0 = time.perf_counter()

    
async def a_load_stem_image_batch(reader, image_numbers=(0, 1)):
    """
    Given a reader, one can load an image number with this function. 
    
    Args
    ----
    reader: stempy.io.SectorThreadedMultiPassReader
        This assumes that the reader has already created a map.
    image_number: int
        The image number to load (from the map).
    with_header: bool
        Return either a stempy._io._block object (with_header=True), 
        or a numpy array.
    """
    stack_height = image_numbers[1] - image_numbers[0]
    arr = np.zeros((stack_height, 576, 576)) 
    with concurrent.futures.ThreadPoolExecutor(2) as executor:
        for i, result in enumerate(executor.map(reader.get_block_from_image_number, range(image_numbers[0], image_numbers[1]))):
            arr[i] = (await result).data
    return arr

for worker_id in worker_ids:
    readers[worker_id] = client.submit(create_reader_on_worker, 
                                             actor=True, 
                                             workers=worker_id)
    
for worker_id, batch_idx in zip(worker_ids * div, range(num_batches)):
    probes_remaining = total_images - (batch_idx * batch_size)
    this_batch_size = (
        probes_remaining if probes_remaining < batch_size else batch_size
    )
    first_batch_idx = batch_idx * batch_size
    second_batch_idx = this_batch_size + first_batch_idx
    concat_arr = client.submit(
        partial(a_load_stem_image_batch,await readers[worker_id]), 
        (first_batch_idx, second_batch_idx),
        workers=worker_id,
        )
    # arrays_batch_futures.append(concat_arr)

    d_concat_arr = client.submit(
        cp.asarray, 
        concat_arr,
        workers=worker_id
    )
    arrays_batch_futures.append(d_concat_arr)

    positions = client.submit(
        a_peakFind2D_batched, 
        d_concat_arr,
        workers=worker_id
    )
    # arrays_batch_futures.append(concat_arr)
    # d_arrays_batch_futures.append(d_concat_arr)
    positions_futures.append(positions)
    
await client.gather(positions_futures)
t1 = time.perf_counter()

print(f"Total time: {t1-t0}")
    


### This one creates a ton of tasks due to reading each block individually

In [None]:
# Persists the arrays in worker memory for fast accesstotal_images = 128(
from distributed import as_completed, wait
total_images = 128 * 128
bytes_per_pattern = 1024 * 1024
max_num_bytes = bytes_per_pattern * 1000 # totally arbitrary right now
batch_size = max_num_bytes // (bytes_per_pattern) # really just ends up being 1000
num_batches = total_images // batch_size + 1
arrays_batch_futures = []
d_arrays_batch_futures = []
counted_batch_futures = []
positions_futures = []
pools = []
div = num_batches // len(worker_ids) + 1
readers = {}
t0 = time.perf_counter()

for worker_id in worker_ids:
    readers[worker_id] = await client.submit(create_reader_on_worker, 
                                             actor=True, 
                                             workers=worker_id)
    
for worker_id, batch_idx in zip(worker_ids * div, range(num_batches)):
    probes_remaining = total_images - (batch_idx * batch_size)
    this_batch_size = (
        probes_remaining if probes_remaining < batch_size else batch_size
    )
    first_batch_idx = batch_idx * batch_size
    second_batch_idx = this_batch_size + first_batch_idx
    batched_arrays = client.map(
        partial(a_load_stem_image,readers[worker_id]), 
        range(first_batch_idx, second_batch_idx),
        workers=worker_id,
        )
    concat_arr = client.submit(
        concatenate_futures, 
        batched_arrays, 
        workers=worker_id
    )
    d_concat_arr = client.submit(
        cp.asarray, 
        concat_arr,
        dtype=uint16,
        workers=worker_id
    )
    positions = client.submit(
        a_peakFind2D_batched, 
        d_concat_arr,
        workers=worker_id
    )
    # arrays_batch_futures.append(concat_arr)
    # d_arrays_batch_futures.append(d_concat_arr)
    positions_futures.append(positions)
    
await client.gather(positions_futures)
t1 = time.perf_counter()

print(f"Total time: {t1-t0}")
    


## Batching + py4dstem get_maximal_points

Set up py4dstem kernels:

In [16]:
import cupy as cp

kernels = {}


############################# get_maximal_points ################################

"""
These kernels are approximately 50x faster than the np.roll approach used in the CPU version,
per my testing with 1024x1024 pixels and float64 on a Jetson Xavier NX.
The boundary conditions are slightly different in this version, in that pixels on the edge
of the frame are always false. This simplifies the indexing, and since in the Braggdisk
detection application an edgeBoundary is always applied in the case of subpixel detection,
this is not considered a problem.
"""

maximal_pts_uint16 = r'''
extern "C" __global__
void maximal_pts(const short *ar, bool *out, const double minAbsoluteIntensity, const long long sizex, const long long sizey, const long long N){
    int tid = blockDim.x * blockIdx.x + threadIdx.x;
    int x = tid / sizey;
    int y = tid % sizey;
    bool res = false;
    if (tid < N && x>0 && x<(sizex-1) && y>0 && y<(sizey-1)) {
        float val = ar[tid];

        out[tid] = ( val > ar[tid + sizey]) &&
                    (val > ar[tid - sizey]) &&
                    (val > ar[tid + 1]) &&
                    (val > ar[tid - 1]) &&
                    (val > ar[tid - sizey - 1]) &&
                    (val > ar[tid - sizey + 1]) &&
                    (val > ar[tid + sizey - 1]) &&
                    (val > ar[tid+sizey + 1] &&
                    (val >= minAbsoluteIntensity));
    }
}
'''

kernels['maximal_pts_uint16'] = cp.RawKernel(maximal_pts_uint16,'maximal_pts')


maximal_pts_float32 = r'''
extern "C" __global__
void maximal_pts(const float *ar, bool *out, const double minAbsoluteIntensity, const long long sizex, const long long sizey, const long long N){
    int tid = blockDim.x * blockIdx.x + threadIdx.x;
    int x = tid / sizey;
    int y = tid % sizey;
    bool res = false;
    if (tid < N && x>0 && x<(sizex-1) && y>0 && y<(sizey-1)) {
        float val = ar[tid];

        out[tid] = ( val > ar[tid + sizey]) &&
                    (val > ar[tid - sizey]) &&
                    (val > ar[tid + 1]) &&
                    (val > ar[tid - 1]) &&
                    (val > ar[tid - sizey - 1]) &&
                    (val > ar[tid - sizey + 1]) &&
                    (val > ar[tid + sizey - 1]) &&
                    (val > ar[tid+sizey + 1] &&
                    (val >= minAbsoluteIntensity));
    }
}
'''

kernels['maximal_pts_float32'] = cp.RawKernel(maximal_pts_float32,'maximal_pts')

maximal_pts_float64 = r'''
extern "C" __global__
void maximal_pts(const double *ar, bool *out, const double minAbsoluteIntensity, const long long sizex, const long long sizey, const long long N){
    int tid = blockDim.x * blockIdx.x + threadIdx.x;
    int x = tid / sizey;
    int y = tid % sizey;
    bool res = false;
    if (tid < N && x>0 && x<(sizex-1) && y>0 && y<(sizey-1)) {
        double val = ar[tid];

        out[tid] = ( val > ar[tid + sizey]) &&
                    (val > ar[tid - sizey]) &&
                    (val > ar[tid + 1]) &&
                    (val > ar[tid - 1]) &&
                    (val > ar[tid - sizey - 1]) &&
                    (val > ar[tid - sizey + 1]) &&
                    (val > ar[tid + sizey - 1]) &&
                    (val > ar[tid+sizey + 1] &&
                    (val >= minAbsoluteIntensity));
    }
}
'''

kernels['maximal_pts_float64'] = cp.RawKernel(maximal_pts_float64,'maximal_pts')

Get maxima of block

In [21]:
import time
from dask.distributed import get_worker
from dask.distributed import get_client

def get_maxima_2D(
    ar=None,
    sigma=0,
    edgeBoundary=0,
    minSpacing=60,
    minRelativeIntensity=0.005,
    minAbsoluteIntensity=0.0,
    relativeToPeak=0,
    maxNumPeaks=70,
    subpixel="poly",
    ar_FT=None,
    upsample_factor=16,
    get_maximal_points=None,
    # blocks=None,
    # threads=None,
):
    get_maximal_points = get_maximal_points
    blocks = (ar.shape[1],)
    threads = (ar.shape[2],)
    sizex = ar.shape[1]
    sizey = ar.shape[2]
    N = sizex * sizey
    positions_future = []
    maxima_bool = cp.zeros_like(ar, dtype=bool)
    for i in range(ar.shape[0]):
        # Get maxima
        get_maximal_points(blocks, threads, (ar[i], maxima_bool[i], minAbsoluteIntensity, sizex, sizey, N))
    # Remove edges
        if edgeBoundary > 0:
            maxima_bool[i, :edgeBoundary, :] = False
            maxima_bool[i, -edgeBoundary:, :] = False
            maxima_bool[i, :, :edgeBoundary] = False
            maxima_bool[i, :, -edgeBoundary:] = False
        elif subpixel is True:
            maxima_bool[i, :1, :] = False
            maxima_bool[i, -1:, :] = False
            maxima_bool[i, :, :1] = False
            maxima_bool[i, :, -1:] = False

        # Get indices, sorted by intensity
        maxima_x, maxima_y = cp.nonzero(maxima_bool[i])
        positions_future.append((maxima_x, maxima_y))
#         dtype = cp.dtype([("x", cp.uint16), ("y", cp.uint16), ("intensity", cp.uint16)])
#         maxima = cp.zeros(len(maxima_x), dtype=dtype)
#         maxima["x"] = maxima_x
#         maxima["y"] = maxima_y
#         maxima["intensity"] = ar[i, maxima_x, maxima_y]
#         maxima = cp.sort(maxima, order="intensity")[::-1]

#         positions_future.append((maxima["x"], maxima["y"], maxima["intensity"]))

    return positions_future

async def get_maxima_wrapper(image_number=None, arr=None, get_maximal_points=None, wid=None):
    
    blocks = (arr.shape[1],)
    threads = (arr.shape[2],)
    client = get_client()
    future = client.submit(get_maxima_2D, ar=arr, blocks=blocks, threads=threads, get_maximal_points=get_maximal_points, workers=wid)
    return future

In [22]:
from distributed import as_completed, wait
import distributed
import concurrent.futures
total_images = 128*128
bytes_per_pattern = 1024 * 1024
max_num_bytes = bytes_per_pattern * 100 # totally arbitrary right now
batch_size = max_num_bytes // (bytes_per_pattern) # really just ends up being 1000
num_batches = total_images // batch_size + 1
arrays_batch_futures = []
d_arrays_batch_futures = []
counted_batch_futures = []
positions_futures = []
pools = []
div = num_batches // len(gpu_workers) + 1
readers = {}
t0 = time.perf_counter()

    
async def a_load_stem_image_batch(reader, image_numbers=(0, 1)):
    """
    Given a reader, one can load an image number with this function. 
    
    Args
    ----
    reader: stempy.io.SectorThreadedMultiPassReader
        This assumes that the reader has already created a map.
    image_number: int
        The image number to load (from the map).
    with_header: bool
        Return either a stempy._io._block object (with_header=True), 
        or a numpy array.
    """
    stack_height = image_numbers[1] - image_numbers[0]
    arr = np.zeros((stack_height, 576, 576))
    for image_number in range(image_numbers[0], image_numbers[1]):
        stack_idx = image_number - image_numbers[0]
        if image_number != 192:
            img = (await reader.get_block_from_image_number(image_number)).data
            try:
                arr[stack_idx] = img
            except ValueError:
                continue
    return arr

# workers_info = client.scheduler_info()["workers"]
# worker_ids = []
# for key in workers_info:
#     worker_info = workers_info[key]
#     worker_ids.append(worker_info["id"]) # Gets the worker ID
# gpu_workers = [x for x in worker_ids if "GPU" in x]
# head_gpu_worker = gpu_workers[0] 

for worker_id in gpu_workers:
    readers[worker_id] = client.submit(create_reader_on_worker, 
                                             actor=True, 
                                             workers=worker_id)
# reader = client.submit(create_reader_on_worker, 
#                                              actor=True, 
#                                              workers=head_gpu_worker)
    
for worker_id, batch_idx in zip(gpu_workers * div, range(num_batches)):
    probes_remaining = total_images - (batch_idx * batch_size)
    this_batch_size = (
        probes_remaining if probes_remaining < batch_size else batch_size
    )
    first_batch_idx = batch_idx * batch_size
    second_batch_idx = this_batch_size + first_batch_idx
    concat_arr = client.submit(
        partial(a_load_stem_image_batch, await readers[worker_id]), 
        (first_batch_idx, second_batch_idx),
        workers=worker_id,
        )
    # arrays_batch_futures.append(concat_arr)

    d_concat_arr = client.submit(
        cp.asarray, 
        concat_arr,
        dtype=cp.uint16,
        workers=worker_id
    )
    get_maximal_points=kernels["maximal_pts_uint16"]
    positions = client.submit(get_maxima_2D, ar=d_concat_arr, get_maximal_points=get_maximal_points, workers=worker_id)
    positions_futures.append(positions)
    
# await client.gather(positions_futures)
await wait(positions_futures)
t1 = time.perf_counter()

print(f"Total time: {t1-t0}")

Total time: 45.5324051239877


In [23]:
await positions_futures[0]

[(array([  1,   1,   1, ..., 574, 574, 574]),
  array([  8,  16,  22, ..., 553, 560, 568])),
 (array([  1,   1,   1, ..., 574, 574, 574]),
  array([  8,  41,  47, ..., 560, 571, 574])),
 (array([  1,   1,   1, ..., 574, 574, 574]),
  array([  1,   8,  18, ..., 557, 560, 574])),
 (array([  1,   1,   1, ..., 574, 574, 574]),
  array([  5,  44,  68, ..., 547, 567, 569])),
 (array([  1,   1,   1, ..., 574, 574, 574]),
  array([  7,  25,  34, ..., 535, 559, 574])),
 (array([  1,   1,   1, ..., 574, 574, 574]),
  array([ 37,  48,  89, ..., 523, 536, 559])),
 (array([  1,   1,   1, ..., 574, 574, 574]),
  array([  8,  24,  41, ..., 565, 567, 573])),
 (array([  1,   1,   1, ..., 574, 574, 574]),
  array([  5,  11,  40, ..., 546, 557, 569])),
 (array([  1,   1,   1, ..., 574, 574, 574]),
  array([ 23,  41,  60, ..., 527, 564, 567])),
 (array([  1,   1,   1, ..., 574, 574, 574]),
  array([  7,  17,  24, ..., 559, 565, 567])),
 (array([  1,   1,   1, ..., 574, 574, 574]),
  array([ 11,  27, 110, 

## Reading from head IO node and sending work out to GPU nodes

In [None]:
from distributed import as_completed, wait
import distributed
import concurrent.futures
total_images = 512*512
bytes_per_pattern = 1024 * 1024
max_num_bytes = bytes_per_pattern * 100 # totally arbitrary right now
batch_size = max_num_bytes // (bytes_per_pattern) # really just ends up being 1000
num_batches = total_images // batch_size + 1
arrays_batch_futures = []
d_arrays_batch_futures = []
counted_batch_futures = []
positions_futures = []
pools = []
div = num_batches // len(io_workers) + 1
readers = {}
t0 = time.perf_counter()

    
async def a_load_stem_image_batch(reader, image_numbers=(0, 1), thread_pool=False):
    """
    Given a reader, one can load an image number with this function. 
    
    Args
    ----
    reader: stempy.io.SectorThreadedMultiPassReader
        This assumes that the reader has already created a map.
    image_number: int
        The image number to load (from the map).
    with_header: bool
        Return either a stempy._io._block object (with_header=True), 
        or a numpy array.
    """
    stack_height = image_numbers[1] - image_numbers[0]
    arr = np.zeros((stack_height, 576, 576))
        
    if thread_pool:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            for i, result in enumerate(executor.map(reader.get_block_from_image_number, range(image_numbers[0], image_numbers[1]))):
                arr[i] = (await result).data
                
    else:
        for image_number in range(image_numbers[0], image_numbers[1]):
            stack_idx = image_number - image_numbers[0]
            arr[stack_idx] = (await reader.get_block_from_image_number(image_number)).data
            
    return arr

for io_worker in io_workers:
# Create one reader on the head io node
    readers[io_worker] = await client.submit(create_reader_on_worker, 
                                         actor=True, 
                                         workers=io_worker)
for worker_id, batch_idx in zip(io_workers * div, range(num_batches)):
    probes_remaining = total_images - (batch_idx * batch_size)
    this_batch_size = (
        probes_remaining if probes_remaining < batch_size else batch_size
    )
    first_batch_idx = batch_idx * batch_size
    second_batch_idx = this_batch_size + first_batch_idx
    concat_arr = client.submit(
        partial(a_load_stem_image_batch, readers[worker_id], thread_pool=False), 
        (first_batch_idx, second_batch_idx),
        workers=worker_id,
        )
    d_concat_arr = client.submit(
        cp.asarray, 
        concat_arr,
        dtype=cp.uint16,
        workers=worker_map[worker_id][0]
    )
    get_maximal_points=kernels["maximal_pts_uint16"]
    positions = client.submit(get_maxima_2D, ar=d_concat_arr, get_maximal_points=get_maximal_points, workers=worker_map[worker_id])
    positions_futures.append(positions)
    
# await client.gather(positions_futures)
await wait(positions_futures)
t1 = time.perf_counter()
print(f"Total time: {t1-t0}")

In [13]:
[x.cancel() for x in positions_futures]

[<coroutine object Client._cancel at 0x7fa0dbbe87c0>,
 <coroutine object Client._cancel at 0x7fa0dade25c0>,
 <coroutine object Client._cancel at 0x7fa0dade2640>,
 <coroutine object Client._cancel at 0x7fa0dade26c0>,
 <coroutine object Client._cancel at 0x7fa0dade2740>,
 <coroutine object Client._cancel at 0x7fa0dade27c0>,
 <coroutine object Client._cancel at 0x7fa0dade2840>,
 <coroutine object Client._cancel at 0x7fa0dade28c0>,
 <coroutine object Client._cancel at 0x7fa0dade2940>,
 <coroutine object Client._cancel at 0x7fa0dade2a40>,
 <coroutine object Client._cancel at 0x7fa0dade2ac0>,
 <coroutine object Client._cancel at 0x7fa0dade2b40>,
 <coroutine object Client._cancel at 0x7fa0dade2bc0>,
 <coroutine object Client._cancel at 0x7fa0dade2c40>,
 <coroutine object Client._cancel at 0x7fa0dade2cc0>,
 <coroutine object Client._cancel at 0x7fa0dade2d40>,
 <coroutine object Client._cancel at 0x7fa0dade2dc0>,
 <coroutine object Client._cancel at 0x7fa0dade29c0>,
 <coroutine object Client._c

In [36]:
await positions_futures[-2]

OutOfMemoryError: Out of memory allocating 69,672,960 bytes (allocated so far: 1,086,856,704 bytes).