# Tutorial about multiprocessing using ray

We will describe how to set up an analysis pipeline to process multiple datasets in parallel using the framework [ray](https://ray.io/).

In [None]:
import sys
import logging

%matplotlib inline

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import ray

import locan as lc

In [None]:
# Show locan version information; the dependency listing and verbose output are switched off.
lc.show_versions(dependencies=False, verbose=False)

## Activate logging

In [None]:
# Route log records of level INFO and above to stdout, each prefixed with
# timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    stream=sys.stdout,
)
# getLogger() without a name returns the root logger.
logger = logging.getLogger()

To change the logging configuration, the logging module has to be reloaded or the kernel has to be restarted.

## Synthetic data

Simulate 3 datasets of localization data that is homogeneously Poisson distributed and treat them as files.

In [None]:
# Random number generator with a fixed seed so that the simulated datasets are reproducible.
rng = np.random.default_rng(seed=1)

In [None]:
# Draw three datasets one after another from the shared generator `rng`.
locdatas = [
    lc.simulate_Poisson(intensity=1e-3, region=((0, 1000), (0, 1000)), seed=rng)
    for _ in range(3)
]
# No files are read in this tutorial; the in-memory datasets stand in for them.
files = locdatas

print("Element_counts:", [item.meta.element_count for item in files])

## Analysis pipeline

Define an analysis pipeline. Typically a pipeline processes a single file, which in this example will be an element of locdatas.

Within the analysis procedure there will be more random number generation involved. Therefore a correctly generated seed has to be passed.

In [None]:
def computation(self, file, seed):
    """Nearest-neighbor analysis of one dataset against a simulated reference.

    A second Poisson dataset is simulated with a generator built from
    `seed`, and the nearest-neighbor distances between `file` and that
    dataset are computed.

    Parameters
    ----------
    self
        Object on which the result is stored as attribute `nn`.
    file
        Dataset to analyze; passed as `locdata` to the
        nearest-neighbor computation.
    seed
        Seed handed to `np.random.default_rng`.

    Returns
    -------
    self
        The same object that was passed in, now carrying `nn`.
    """
    logging.basicConfig(level=logging.INFO)
    logger.info(f'computation started for file: {file}')

    # Private generator for this call, derived from the seed that was passed in.
    generator = np.random.default_rng(seed=seed)
    reference = lc.simulate_Poisson(
        intensity=1e-3,
        region=((0, 1000), (0, 1000)),
        seed=generator,
    )

    nn_analysis = lc.NearestNeighborDistances()
    self.nn = nn_analysis.compute(locdata=file, other_locdata=reference)
    return self

## Run analysis in parallel

In [None]:
# Start ray with its default settings.
ray.init()
# Alternative: start ray with an explicit number of CPUs.
# ray.init(num_cpus = 4)

In [None]:
%%time
@ray.remote
def worker(file, seed):
    """Remote task: build a pipeline for one file and run it.

    Parameters
    ----------
    file
        Input forwarded to the pipeline as `file`.
    seed
        Seed forwarded to the pipeline as `seed`.

    Returns
    -------
    The object returned by the pipeline's `compute()` call.
    """
    pipeline = lc.Pipeline(computation=computation, file=file, seed=seed)
    return pipeline.compute()

# One task, and therefore one seed, per input file.
n_processes = len(files)
# Spawn one child seed per task from a single seed sequence so that every
# worker gets its own, independent random stream.
ss = np.random.SeedSequence()
child_seeds = ss.spawn(n_processes)

# Iterate over `files`, the same list n_processes was derived from, so that
# inputs and seeds stay paired even if `files` is later replaced by real inputs.
futures = [worker.remote(file=file, seed=seed) for file, seed in zip(files, child_seeds)]
# Wait for all tasks and collect their results in submission order.
pipes = ray.get(futures)

## Visualize the combined results

In [None]:
# Collect the `meta` attribute of every returned pipeline; as the last expression of the cell the list is displayed.
[pipe.meta for pipe in pipes]

In [None]:
# A single shared axes (subplots() defaults to a 1x1 grid) so that the
# histograms of all pipelines are overlaid in one plot.
fig, ax = plt.subplots()
for pipe in pipes:
    nn_result = pipe.nn
    nn_result.hist(ax=ax)
plt.show()