# Multiprocessing on GPU

This is an extreme case in which we try to use all the resources of the computer: several worker processes share the GPUs to decompress and integrate a large set of HDF5 files.

In [1]:
import os
import glob
import concurrent
import time
import multiprocessing
from multiprocessing import shared_memory, Process, Value
# Workers are started with 'spawn': each Process target is pickled by reference and
# re-imported in the child, hence the worker code lives in the importable worker.py.
# force=True keeps this cell re-runnable: a second plain set_start_method() call in the
# same kernel raises "RuntimeError: context has already been set".
multiprocessing.set_start_method('spawn', force=True)
mpctx = multiprocessing.get_context('spawn')
import numpy
import hdf5plugin
import h5py
import pyFAI

from silx.opencl import ocl
from silx.opencl.codec.bitshuffle_lz4 import BitshuffleLz4
import inspect
from worker import Item
# True in the interpreter that runs the notebook, False inside a child process.
MAIN_PROCESS = multiprocessing.parent_process() is None

print(ocl)

OpenCL devices:
[0] NVIDIA CUDA: (0,0) NVIDIA RTX A5000, (0,1) Quadro M2000
[1] Intel(R) OpenCL: (1,0) Intel(R) Xeon(R) CPU E5-1650 v4 @ 3.60GHz
[2] Intel(R) OpenCL: (2,0) Intel(R) Xeon(R) CPU E5-1650 v4 @ 3.60GHz
[3] Intel(R) FPGA Emulation Platform for OpenCL(TM): (3,0) Intel(R) FPGA Emulation Device


In [2]:
# param.py holds the parameters for all the processing; display its source.
import param
print(inspect.getsource(param))

# Parameters shared by the notebook and by the worker processes.

# --- compute resources -------------------------------------------------------
DEVICES = [(0, 0), (0, 1)]      # OpenCL (platform, device) pairs, picked as DEVICES[rank % len(DEVICES)]
NWORKERS = 4                    # number of worker processes in the pool

# --- dataset layout ----------------------------------------------------------
FRAME_PER_FILE = 100            # frames stored in each HDF5 file
NFILES = 100                    # number of input files
DETECTOR = "Eiger_4M"           # pyFAI detector name
pathname = "/tmp/big_%04d.h5"   # %-pattern of an input file, indexed by file number
pathmask = "/tmp/big_????.h5"   # glob pattern matching every file built from pathname

# --- results -----------------------------------------------------------------
NBINS = 1000                    # number of bins of each 1D integrated pattern
SHARED_NAME = 'shared_results'  # name of the shared-memory block receiving the results
dtype = "float32"               # dtype of the result array
array_shape = (NFILES, FRAME_PER_FILE, NBINS)  # one pattern of NBINS per frame of each file



In [3]:
# Show the source of build_integrator, defined in worker.py (the worker processes call it too).
from worker import build_integrator
print(inspect.getsource(build_integrator))

def build_integrator(detector=DETECTOR):
    """Return a pyFAI integrator for ``detector`` with a fixed dummy geometry.

    Only the detector and a wavelength of 1e-10 are set. ``rot3`` is given
    explicitly to work around https://github.com/silx-kit/pyFAI/pull/1749.
    """
    geometry = dict(detector=detector, wavelength=1e-10, rot3=0)
    return pyFAI.load(geometry)



In [4]:
# Generate a set of files
   
def generate_one_frame(ai, unit="q_nm^-1", dtype="uint32"):
    """Simulate one noisy detector frame with few counts, so that it compresses well.

    A 1D profile I(q) = 100 / (1 + q*q) is sampled on 100 points between 0 and the
    largest q seen by the detector. It is projected back onto the detector with
    ``ai.calcfrom1d``. Poisson noise is then drawn with numpy's global random generator.

    :param ai: pyFAI integrator describing the geometry.
    :param unit: radial unit in which q is expressed.
    :param dtype: dtype of the returned frame.
    :return: numpy array holding the simulated frame.
    """
    q_max = ai.array_from_unit(unit=unit).max()
    q_grid = numpy.linspace(0, q_max, 100)
    profile = 100 / (1 + q_grid * q_grid)
    expected = ai.calcfrom1d(q_grid, profile)
    return numpy.random.poisson(expected).astype(dtype)

def generate_files(img, pathname=None, nframes=None, nfiles=None):
    """Write one compressed HDF5 stack of frames and hard-link copies of it.

    The first file gets a dataset "data" of ``nframes`` frames, with Bitshuffle
    compression and one chunk per frame. Frame ``i`` is ``img + (i % 500)``, so
    every frame differs. Files ``1 .. nfiles-1`` are hard links to that first file,
    which avoids writing the data again.

    :param img: 2D frame used as the base image.
    :param pathname: %-style filename pattern taking the file index; defaults to ``param.pathname``.
    :param nframes: number of frames per file; defaults to ``param.FRAME_PER_FILE``.
    :param nfiles: total number of files; defaults to ``param.NFILES``.
    :return: list of the filenames, ordered by index.
    """
    # The notebook only does ``import param``: the bare names used previously were undefined.
    if pathname is None:
        pathname = param.pathname
    if nframes is None:
        nframes = param.FRAME_PER_FILE
    if nfiles is None:
        nfiles = param.NFILES

    compression = hdf5plugin.Bitshuffle()
    filename = pathname % 0
    shape = img.shape
    with h5py.File(filename, "w") as h:
        ds = h.create_dataset("data", shape=(nframes,) + shape, chunks=(1,) + shape,
                              dtype=img.dtype, **compression)
        for i in range(nframes):
            ds[i] = img + i % 500  # Each frame has a different value to prevent caching effects

    filenames = [filename]
    for i in range(1, nfiles):
        new_file = pathname % i
        # os.link() refuses to overwrite: drop a leftover from an interrupted run.
        if os.path.lexists(new_file):
            os.unlink(new_file)
        os.link(filename, new_file)
        filenames.append(new_file)
    return filenames

# Create a set of files with dummy data in them, unless a previous run already did.
# DETECTOR is only reachable through ``param`` here (a bare DETECTOR is undefined).
input_files = sorted(glob.glob(param.pathmask))
if not input_files:
    input_files = generate_files(generate_one_frame(build_integrator(param.DETECTOR)))

In [5]:
# Helpers to create (create_shared_array, from worker.py) and destroy (release_shared)
# numpy arrays backed by shared memory.
from worker import create_shared_array
print(inspect.getsource(create_shared_array))

def release_shared(name=param.SHARED_NAME):
    """Destroy the shared-memory block called ``name``.

    Attaches to the existing block, closes this handle, then unlinks the block so
    the system frees it. Raises if no block with that name exists.
    """
    segment = shared_memory.SharedMemory(name=name)
    segment.close()   # drop this process's mapping
    segment.unlink()  # Free and release the shared memory block

# Create the shared result array, only in the launching process.
# A block with the same name survives a previous run, since release_shared() is not
# called automatically. In that case release it and create it again, so this cell
# does not fail with FileExistsError when re-run.
if MAIN_PROCESS:
    try:
        result_array = create_shared_array(param.array_shape, param.dtype, param.SHARED_NAME, create=True)
    except FileExistsError:
        release_shared(param.SHARED_NAME)
        result_array = create_shared_array(param.array_shape, param.dtype, param.SHARED_NAME, create=True)

def create_shared_array(shape, dtype="float32", name=SHARED_NAME, create=False):
    """Return a numpy array whose data live in the shared-memory block ``name``.

    :param shape: shape of the array.
    :param dtype: numpy dtype of the array.
    :param name: name of the shared-memory block, used both to create and to attach.
    :param create: if True, create a new block of exactly the required size (the
        library raises FileExistsError if the name is taken); if False, attach to an
        existing block.
    :return: numpy.ndarray viewing the shared buffer.
    """
    d_size = int(numpy.prod(shape)) * numpy.dtype(dtype).itemsize
    # Honour ``name``: it used to be ignored in favour of the module-level SHARED_NAME.
    shm = shared_memory.SharedMemory(create=create, size=d_size, name=name)
    # NOTE(review): the SharedMemory handle is not kept; the array only holds the buffer.
    dst = numpy.ndarray(shape=shape, dtype=dtype, buffer=shm.buf)
    return dst



In [6]:
# Show the source of the function run by each process of the pool (defined in worker.py).
from worker import worker
print(inspect.getsource(worker))

def worker(rank, queue, shm_name, counter):
    """Function representing one worker, used in a pool of worker.
    
    :param rank: integer, index of the worker.
    :param queue: input queue, expects Item with index and name of the file to process
    :param shm_name: name of the output shared memory to put integrated intensities
    :param counter: decremented when quits
    :return: nothing, used in a process.
    """
    #Start up the integrator:
    ai = build_integrator(DETECTOR)
    blank = numpy.zeros(ai.detector.shape, dtype="uint32")
    method = ("full", "csr", "opencl", DEVICES[rank%len(DEVICES)])
    res = ai.integrate1d(blank, NBINS, method=method)
    omega = ai.solidAngleArray()
    engine = ai.engines[res.method].engine
    print(res.method)
    omega_crc = engine.on_device["solidangle"]
    engine = new_engine(engine, 512)
   
    gpu_decompressor = BitshuffleLz4(2000000, blank.size, dtype=blank.dtype, ctx=engine.ctx)
    gpu_decompressor.block_size = 128
    result_

In [7]:
def build_pool(nbprocess, queue, shm_name, counter):
    """Create ``nbprocess`` worker processes, start them all and return them.

    :param nbprocess: number of processes; process ``i`` gets rank ``i``.
    :param queue: input queue handed to every worker.
    :param shm_name: name of the shared-memory block receiving the results.
    :param counter: shared counter handed to every worker.
    :return: list of the started Process objects, ordered by rank.
    """
    pool = []
    for rank in range(nbprocess):
        proc = Process(target=worker, name=f"worker_{rank:02d}",
                       args=(rank, queue, shm_name, counter))
        pool.append(proc)
    for proc in pool:
        proc.start()
    return pool

def end_pool(pool, queue, join=False, timeout=None):
    """Ask every process of a pool to stop by sending it a "kill-pill".

    One ``Item(-1, None)`` sentinel is queued per process of ``pool``.

    :param pool: list of worker processes.
    :param queue: input queue the workers read from.
    :param join: if True, also wait for each process to exit. The default False
        keeps the previous non-blocking behaviour.
    :param timeout: per-process timeout in seconds passed to ``join``; None waits forever.
    """
    for _ in pool:
        queue.put(Item(-1, None))
    if join:
        for process in pool:
            process.join(timeout)


In [8]:
# Build the pool of workers, without starting it yet.
# Every object comes from the same 'spawn' context ``mpctx``, so the pool does not
# depend on the global start method.
# build_pool(param.NWORKERS, queue, param.SHARED_NAME, counter) would build *and* start it.
queue = mpctx.Queue()
counter = mpctx.Value("i", param.NWORKERS)  # the worker docstring says it is decremented when a worker quits
pool = [
    mpctx.Process(target=worker, name=f"worker_{i:02d}",
                  args=(i, queue, param.SHARED_NAME, counter))
    for i in range(param.NWORKERS)
]

In [9]:
# Start every worker of the pool, only from the launching process.
if MAIN_PROCESS:
    for p in pool:
        p.start()

In [10]:
# Queue one work item per input file: its index in input_files (presumably the row of
# the shared result array, whose shape starts with NFILES) and its path.
if MAIN_PROCESS:
    for idx, fn in enumerate(input_files): queue.put(Item(idx, fn))

In [11]:
# Send one "kill-pill" (index -1) per worker so that every worker stops.
# The same sentinel is used by end_pool. Guarded like the start/feed cells, so only
# the launching process sends them.
if MAIN_PROCESS:
    for i in range(param.NWORKERS):
        queue.put(Item(-1, None))

In [12]:
# Once the results are no longer needed, free the shared buffer with: release_shared(param.SHARED_NAME)

In [14]:
# Workers still alive: the worker docstring says this counter is decremented when a worker quits.
counter.value

4

In [12]:
# Items still waiting in the input queue.
# NOTE: multiprocessing documents Queue.qsize() as approximate; it raises NotImplementedError on macOS.
queue.qsize()

0

In [None]:
# Debug: grab the first worker of the pool for manual inspection.
p = pool[0]

In [None]:
# Debug: start the first worker by hand.
# Process.start() may only be called once per process object, and its pid stays None
# until then. Skip it if the pool was already started (e.g. by the `for p in pool: p.start()` cell).
if MAIN_PROCESS and p.pid is None:
    p.start()

In [None]:
# Debug: run one worker (rank 0) synchronously in the notebook process.
# Presumably it runs until it receives a kill-pill. Its docstring says it returns
# nothing, so the result is not kept. SHARED_NAME is only reachable through ``param`` here.
worker(0, queue, param.SHARED_NAME, counter)

q_nm^-1


In [14]:
# Why the worker lives in worker.py: with 'spawn', each Process target is pickled by
# reference and looked up again by name in the child. A lambda cannot even be
# pickled, as shown here; the error is caught so that the notebook runs to the end.
import pickle
try:
    pickle.dumps(lambda x: 2 * x)
except (pickle.PicklingError, AttributeError) as err:
    print(f"{type(err).__name__}: {err}")

PicklingError: Can't pickle <function <lambda> at 0x7fa2b5f7b670>: attribute lookup <lambda> on __main__ failed