# JobRunner for distributed computation

This tool enables running the same method over multiple inputs across one or more processes.

In [None]:
from time import time, sleep
from functools import partial
import numpy as np
import ecogdata.parallel.sharedmem as sm
import ecogdata.parallel.jobrunner as jobrunner
from ecogdata.parallel.mproc import parallel_context

## Caveat on fork & spawn multiprocessing in notebooks

**TL;DR** the ``JobRunner`` tool is Windows-compatible, but very few of these *demonstrations* will run on Windows without moving the new class/method definitions to a separate module.

On macOS and \*nix, processes can be "forked" at a point in runtime, cloning memory and process state. This is very fast and compatible with notebook usage. On Windows, new processes can only "spawn". The process state is recovered through pickling/unpickling the parent process. This is not entirely compatible with notebook usage, since any methods/classes defined within the notebook itself cannot be recovered. To operate in spawn mode, the method/class definitions below would need to be defined in a separate Python file and imported into this notebook.

In [None]:
try:
    # Will NOT work in Windows
    parallel_context.ctx = 'fork'
except Exception:
    # Fork is unavailable: keep the default context
    pass
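To check which start methods a platform offers before choosing one, the standard library can be queried directly. This is a minimal sketch using only `multiprocessing`; it does not touch `parallel_context`, and the fallback to `'spawn'` is an illustrative choice rather than what the library does.

```python
import multiprocessing as mp

# Start methods supported on this platform, e.g. ['fork', 'spawn', 'forkserver']
available = mp.get_all_start_methods()
print(available)

# Prefer fork when it exists, otherwise fall back to spawn (always available)
method = 'fork' if 'fork' in available else 'spawn'
ctx = mp.get_context(method)
print('Using start method:', ctx.get_start_method())
```

On Windows the list contains only `'spawn'`, which is why the demonstrations below need their definitions moved to an importable module there.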

## Defining parallel workers

The simplest `ParallelWorker` specifies a method to map arguments over, and a bit of logic to take a job specification and turn it into arguments for the method call.

Here is an example using the numpy variance method.

In [None]:
class ArrayVar(jobrunner.ParallelWorker):
    para_method = staticmethod(np.var)
    
    def map_job(self, job):
        """
        Create arguments and keywords to call self.para_method(*args, **kwargs)
        
        "job" is of the form (i, job_spec) where i is a place keeper.
        
        """
        i, arr = job
        # Do some helpful logging
        info = parallel_context.get_logger().info
        info('Got job {}'.format(i))
        # args must be a tuple, since the call is para_method(*args, **kwargs)
        return i, (arr,), dict()

Here are the salient ingredients:

* `para_method` is decorated as a `staticmethod` (**required**)
* `map_job` is overloaded to construct calling arguments for `para_method`
* the place-keeper variable `i` is kept alongside the calling arguments.

Decorating the method as a `staticmethod` is required to avoid "binding" the method to the worker object. The use of a place-keeper index is necessary because the worker processes are asynchronous and may return values out of order. 
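Both points can be seen without the library. The sketch below uses hypothetical names (`double`, `Unwrapped`, `Wrapped`, `slow_double`) and uses threads as a stand-in for worker processes, so it only illustrates the binding and ordering behavior, not how `JobRunner` is implemented.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def double(x):
    return 2 * x


class Unwrapped:
    method = double  # plain function: instances call it as double(self, x)


class Wrapped:
    method = staticmethod(double)  # no binding: instances call double(x)


try:
    Unwrapped().method(3)
    binding_failed = False
except TypeError:
    # double() received the instance as an extra first argument
    binding_failed = True

unbound_result = Wrapped().method(3)


def slow_double(i, x):
    # Earlier jobs sleep longer, so they finish last
    time.sleep(0.05 * (3 - i))
    return i, double(x)


inputs = [10, 20, 30]
ordered = [None] * len(inputs)
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(slow_double, i, x) for i, x in enumerate(inputs)]
    for fut in as_completed(futures):
        # Results arrive in completion order; the index restores input order
        i, value = fut.result()
        ordered[i] = value

print(binding_failed, unbound_result, ordered)
```

Without the returned index, the results could only be stored in completion order, which here is the reverse of the input order.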

Now create a `JobRunner` to run these workers. **Note: example of notebook code that will hang using "spawn."**

In [None]:
plain_arrays = [np.random.rand(100000) for _ in range(25)]
jobs = jobrunner.JobRunner(ArrayVar)
# For a lot of detail, run with loglevel='info'
res = jobs.run_jobs(inputs=plain_arrays, loglevel='error')
print(res[:5])

For **very** simple parallel dispatch, where each input can be passed directly to a method's call signature, you can give the method itself to `JobRunner`. (You can also turn off the progress bar.) **Note: example of notebook code that will run using "spawn," since everything is imported from external sources.**

In [None]:
jobs = jobrunner.JobRunner(np.var)
res = jobs.run_jobs(inputs=plain_arrays, loglevel='error', progress=False)
print(res[:5])

You can also get tricky using `partial` here. The jobbed-out method is now variance computed only on the 2nd axis (`axis=1`), so each result is a length-5 array.

In [None]:
plain_arrays = [np.random.rand(5, 100000) for _ in range(25)]
jobs = jobrunner.JobRunner(partial(np.var, axis=1))
res = jobs.run_jobs(inputs=plain_arrays, loglevel='error', progress=False)
print(res[:5])
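The role of `partial` is easier to see in a plain serial map. The following sketch uses a hypothetical pure-Python `variance` function with a `ddof` keyword (an assumption for illustration, not part of the library) and freezes that keyword before mapping over the inputs.

```python
from functools import partial


def variance(values, ddof=0):
    # Variance with a configurable degrees-of-freedom correction
    n = len(values)
    mean = sum(values) / n
    return sum((v - mean) ** 2 for v in values) / (n - ddof)


# Freeze the keyword so the mapped callable takes a single positional input
sample_variance = partial(variance, ddof=1)

inputs = [[1.0, 2.0, 3.0, 4.0], [2.0, 2.0, 2.0]]
results = list(map(sample_variance, inputs))
print(results)
```

A `partial` of an importable function is itself picklable, which is what lets this construction work with "spawn" as well.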

The only required argument to `JobRunner` is the `ParallelWorker` subclass (or, as shown above, a plain callable). Call `run_jobs` with the inputs to distribute. If the output has a known numerical type and shape, the `output_shape` and `output_dtype` can be specified. By default, the number of workers is set to `multiprocessing.cpu_count()`.

General return types are supported. By default, the output of `run_jobs` is an "object array" with `dtype=object` (though simple job returns can be turned back into numerical ndarrays). Here is a general example that returns variable-sized lists. (This takes advantage of the default `map_job` behavior, which just distributes the job sequence to the method.)

In [None]:
def rand_list(n):
    # Return a list of random numbers.
    # The length is equal to the job number (n).
    return np.random.rand(n).tolist()


class RanListWorker(jobrunner.ParallelWorker):
    para_method = staticmethod(rand_list)
    

jobs = jobrunner.JobRunner(RanListWorker)
# Call with n_jobs instead of inputs
res = jobs.run_jobs(n_jobs=10, loglevel='error', progress=False)

In [None]:
for r in res:
    print(type(r), 'len', len(r))

### Submitting jobs on the fly

In some cases it might be useful to submit jobs sequentially. The input batch can be assembled piecemeal using the ``submitting_jobs`` context. Currently, all jobs are run *after* leaving the context, and not at the time of submission.

In [None]:
jobs = jobrunner.JobRunner(np.var)
# provide the "run_jobs" arguments here
with jobs.submitting_jobs(progress=True):
    for _ in range(12):
        n = np.random.randint(low=100, high=1000)
        print('Adding len-{} random list'.format(n))
        jobs.submit(np.random.rand(n))
# jobs run now
res = jobs.output_from_submitted
print(res[:5])
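The deferred behavior (collect inside the context, run on exit) can be sketched with a small context manager. `BatchRunner` below is a hypothetical serial stand-in written for illustration; it is not the `JobRunner` implementation and its method names are assumptions.

```python
from contextlib import contextmanager


class BatchRunner:
    """Hypothetical stand-in that runs submitted inputs serially on exit."""

    def __init__(self, func):
        self.func = func
        self._pending = []
        self.output = None

    def submit(self, item):
        # Only queue the input; nothing is computed yet
        self._pending.append(item)

    @contextmanager
    def submitting(self):
        self._pending = []
        yield self
        # Reached only when the with-block exits without an exception
        self.output = [self.func(x) for x in self._pending]
        self._pending = []


runner = BatchRunner(len)
with runner.submitting():
    runner.submit('ab')
    runner.submit('abcd')
    ran_inside = runner.output is not None  # still False here

print(ran_inside, runner.output)
```

Deferring the run this way lets the whole batch be sized and distributed at once, at the cost of no results being available until the context closes.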

## Progress bar trick for nested loops

*Note that the progress bar is not visible in the HTML doc. To see this demonstration, run this notebook in Jupyter.*

If you want a progress bar inside an outer loop, then create one yourself and specify `leave=False`.

In [None]:
from tqdm.auto import trange, tqdm

In [None]:
def sleepy(*args):
    sleep(0.15)


jobs = jobrunner.JobRunner(sleepy)
for i in trange(4, desc='Running multiple stuffs'):
    pbar = tqdm(desc='Run {} progress'.format(i + 1), total=100, leave=False)
    jobs.run_jobs(n_jobs=100, loglevel='error', progress=pbar)

## Dealing with large arrays

Distributing bigger datasets to workers is slowed down by serialization.

In [None]:
plain_arrays = [np.random.rand(10000000) for _ in range(15)]
print('Size in MB:', plain_arrays[0].nbytes / 1e6)

In [None]:
tic = time()
jobs = jobrunner.JobRunner(ArrayVar)
res = jobs.run_jobs(inputs=plain_arrays, loglevel='error')
toc = time()
print(res[:5], '({:.2f} seconds)'.format(toc - tic))
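The serialization cost can be looked at in isolation with `pickle`, which is what `multiprocessing` uses to send objects between processes. This sketch uses a zero-filled `bytearray` as a stand-in for an array of one million float64 values; the payload size is an assumption chosen to keep the example quick.

```python
import pickle
from time import perf_counter

n = 1_000_000
data = bytearray(8 * n)  # same byte count as n float64 values

tic = perf_counter()
payload = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)
restored = pickle.loads(payload)  # the receiving side gets a full copy
toc = perf_counter()

print('Payload size in MB:', len(payload) / 1e6)
print('Round trip: {:.4f} seconds'.format(toc - tic))
```

Every input sent to a worker pays for at least one such copy, so the overhead grows with the size of the data.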

Instead, shared memory can be accessed by each process. The `SharedmemManager` holds a `sharedctypes` array that can present itself as a numpy ndarray. Make a shared copy of the previous arrays (this incurs a bit of overhead). Skip the acquisition lock, since each array will be accessed in only one process.

**NOTE:** it is a good idea to reference shared memory objects through the sharedmem module (e.g. `sm.shared_copy`), because these lookups change based on the parallel context. Alternatively, access them through the context itself (e.g. ``parallel_context.shared_copy``).

In [None]:
tic = time()
shm_arrays = [sm.SharedmemManager(sm.shared_copy(a), use_lock=False) for a in plain_arrays]
toc = time()
print('Array creation: {:.2f} seconds'.format(toc - tic))

Shared memory objects cannot be distributed in the same way as other objects, but they can be passed to the workers at instantiation (prior to running jobs).

This worker modifies the `ArrayVar` class. It has a pointer to the list of all memory managers. The job mapping uses the job number to index that list and converts the selected manager to a numpy array.

In [None]:
import multiprocessing as mp

class SharedarrayVar(ArrayVar):
    
    def __init__(self, shm_managers):
        self.shm_managers = shm_managers
    
    def map_job(self, job):
        # job is only the job number
        i = job
        info = parallel_context.get_logger().info
        info('Got job {}'.format(i))
        # use the get_ndarray() context manager to simply get the array
        with self.shm_managers[i].get_ndarray() as arr:
            pass
        return i, (arr,), dict()

Now the `JobRunner` needs arguments to instantiate the worker: use `w_args=(shm_arrays,)`. Call `run_jobs` with the number of jobs, rather than inputs.

In [None]:
tic = time()
jobs2 = jobrunner.JobRunner(SharedarrayVar, w_args=(shm_arrays,))
res2 = jobs2.run_jobs(n_jobs=len(shm_arrays), loglevel='error')
toc = time()
print(res2[:5], '({:.2f} seconds)'.format(toc - tic))

In [None]:
res2 == res
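The pattern of "state at instantiation, index as the job" can be reduced to a few lines. `IndexedSum` and its `run_one` method below are hypothetical and run serially; they mimic the `map_job` return convention shown above but are not the library's worker loop.

```python
class IndexedSum:
    para_method = staticmethod(sum)

    def __init__(self, tables):
        # The worker keeps a reference to all the data up front
        self.tables = tables

    def map_job(self, job):
        # The job is only an index into the data held by the worker
        i = job
        return i, (self.tables[i],), dict()

    def run_one(self, job):
        # Hypothetical driver: unpack the mapped job and call the method
        i, args, kwargs = self.map_job(job)
        return i, self.para_method(*args, **kwargs)


tables = [[1, 2, 3], [10, 20], [5]]
worker = IndexedSum(tables)
results = [None] * len(tables)
for job in range(len(tables)):
    i, value = worker.run_one(job)
    results[i] = value

print(results)
```

Only the small integer index crosses the job queue, which is why the large data never needs to be serialized per job.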

## Handling errors

Unexpected errors can pop up in the workers. Here's a cranky function to demonstrate.

In [None]:
def hates_eights(n):
    if n == 8:
        raise ValueError("n == 8, what did you think would happen?!")
    return n

Typically, it's good not to let a problem blow up the whole job. To do this, catch and suppress exceptions in the JobRunner. You can still see whether there was a problem by logging errors.

In [None]:
jobs = jobrunner.JobRunner(hates_eights)
results = jobs.run_jobs(np.arange(10), reraise_exceptions=False, loglevel='error')

The return value when there is an error is Not-A-Number (nan).

In [None]:
results

A better idea is to get the results AND the exceptions, and re-raise exceptions after the fact.

In [None]:
jobs = jobrunner.JobRunner(hates_eights)
results, exceptions = jobs.run_jobs(np.arange(10), reraise_exceptions=False, return_exceptions=True, loglevel='error')

All exceptions are returned as (exception-type, exception-instance, traceback).

In [None]:
exceptions

To re-raise one of these tuples, here called `e`:

```python
raise e[1].with_traceback(e[2])
```
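The capture-then-re-raise flow can be reproduced serially with `sys.exc_info()`, which returns the same three-element tuple layout. This is a sketch of the idea only; the loop below is an assumption for illustration and does not show how `JobRunner` collects exceptions internally.

```python
import sys


def hates_eights(n):
    if n == 8:
        raise ValueError("n == 8, what did you think would happen?!")
    return n


results, exceptions = [], []
for n in range(10):
    try:
        results.append(hates_eights(n))
    except Exception:
        # Keep a placeholder result and the (type, instance, traceback) tuple
        results.append(float('nan'))
        exceptions.append(sys.exc_info())

# Later: re-raise the first stored exception with its original traceback
e = exceptions[0]
try:
    raise e[1].with_traceback(e[2])
except ValueError as err:
    reraised = err

print(len(exceptions), repr(reraised))
```

Attaching the stored traceback means the re-raised error still points at the line inside `hates_eights` where the problem occurred.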

### Debugging worker errors

The catch-and-suppress techniques do not allow going into the call stack with a debugger. Instead, you can run in single-threaded mode and reraise exceptions immediately. If an error is thrown in this case, you can do a debugger post-mortem to get into the stack.

In [None]:
jobs = jobrunner.JobRunner(hates_eights, n_workers=1, single_job_in_thread=False)
results = jobs.run_jobs(np.arange(10), reraise_exceptions=True, loglevel='error')
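Once an exception is raised in the calling process, its traceback can be inspected or handed to a debugger. The sketch below runs the function serially (standing in for the single-worker configuration, which is an assumption here) and uses the standard `traceback` module to locate the failing frame.

```python
import sys
import traceback


def hates_eights(n):
    if n == 8:
        raise ValueError("n == 8, what did you think would happen?!")
    return n


captured_tb = None
try:
    for n in range(10):
        hates_eights(n)
except ValueError:
    captured_tb = sys.exc_info()[2]

# The innermost frame is where the exception was raised
frames = traceback.extract_tb(captured_tb)
print('Failed in:', frames[-1].name)

# In an interactive session, open the debugger at that frame with:
# import pdb; pdb.post_mortem(captured_tb)
```

In a notebook, the `%debug` magic provides the same post-mortem entry point after an uncaught exception.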