# Annotated celery examples
This document shows both basic and more real-world examples of celery usage.

Relevant pages from celery's documentation:

- https://docs.celeryproject.org/en/stable/userguide/calling.html
- https://docs.celeryproject.org/en/stable/userguide/canvas.html

## Asynchronous (i.e. remote and non-blocking) invocation of tasks

`orca.proj.boilerplate` is where the celery-ized tasks live. This section uses `add(x, y)`, a task that adds two numbers, to illustrate how celery tasks are invoked.

In [1]:
import glob
from orca.proj import boilerplate

Start with a basic one: adding numbers. `group` and `chain` are celery primitives that model, respectively, a group of independent tasks and a chain of tasks in which each one depends on the previous one.

In [2]:
from celery import group, chain

In [30]:
type(boilerplate.add)

celery.local.PromiseProxy

In [26]:
# This runs on the current node
boilerplate.add(1, 1)

2

The quickest way to run a task on a worker is the `delay()` method. It returns an `AsyncResult` immediately; you can assign it to a variable and call its `get()` method to retrieve the result once the worker is done.

In [33]:
# This sends the task to a worker and returns an AsyncResult right away.
boilerplate.add.delay(1, 1)

<AsyncResult: 4e7eba9a-619c-40dc-9de1-5be7a7dbb4f7>
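
`delay()` does not wait for the answer: the value only exists once a worker has finished, and `get()` blocks until then (it also accepts a `timeout` in seconds). A rough, purely local analogue of this submit-now, collect-later pattern is Python's `concurrent.futures`, where `submit()` plays the role of `delay()` and `Future.result()` the role of `AsyncResult.get()`. The sketch below is only that analogy: the plain `add` function is a stand-in for `boilerplate.add`, and no celery worker or broker is involved.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    # Stand-in for boilerplate.add; the real task runs on a celery worker.
    return x + y


with ThreadPoolExecutor(max_workers=2) as pool:
    # Analogous to boilerplate.add.delay(1, 1): returns a handle immediately.
    future = pool.submit(add, 1, 1)
    # Analogous to AsyncResult.get(timeout=10): waits up to 10 s for the value.
    value = future.result(timeout=10)

print(value)  # 2
```

With celery itself the equivalent is `result = boilerplate.add.delay(1, 1)` followed by `result.get(timeout=10)`, which requires the app to have a result backend configured.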

## Embarrassingly parallel operations: group

To add lots of pairs of numbers (an embarrassingly parallel operation), you can call:

In [32]:
group([boilerplate.add.s(a, a) for a in [1, 2, 3, 4, 5, 6, 7]])()

<GroupResult: bbba6cb9-7893-40de-ae91-65663f4e8d4f [5a125dea-9711-4860-bbeb-c2214e2a2e7a, fab2c2a7-7f37-4fa5-891c-7d52f055be29, e93e844e-92c3-4409-9462-df6728238690, f26d48d6-1f5f-4077-88ad-5577eec63149, b8802639-5c51-4c5e-a500-684df55096d4, 5bd224f7-e89c-4974-9ffe-2f897f8fd202, 8cd49e4f-fbf1-4a65-862a-659f39f87ee9]>

A couple of things happened there. `.s()` creates a signature: the task together with its arguments, packaged so that it can be executed later instead of running right away. `group()` bundles the signatures into a group whose members can run in parallel, and calling the group with `()` sends all of them to the queue.

You could get the same effect with a `for` loop that calls `delay()` once per pair, but `group` returns a single `GroupResult` that keeps track of all the results together.
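
To make the signature/group idea concrete, here is a toy model in plain Python. It is not celery's implementation: `Signature` and `run_group` are hypothetical names, and threads stand in for workers. A signature is just a deferred call (function plus arguments), and running a group executes every signature independently and returns the results in the same order as the signatures, which is the list that `GroupResult.get()` returns for a real group.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Signature:
    """Toy stand-in for a celery signature: a function plus its arguments."""
    func: Callable[..., Any]
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)

    def __call__(self):
        # Calling the signature runs it locally, like add.s(1, 1)().
        return self.func(*self.args, **self.kwargs)


def run_group(signatures, max_workers=4):
    """Run independent signatures in parallel; results keep signature order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(sig) for sig in signatures]
        return [f.result() for f in futures]


def add(x, y):
    return x + y


sigs = [Signature(add, (a, a)) for a in [1, 2, 3, 4, 5, 6, 7]]
results = run_group(sigs)
print(results)  # [2, 4, 6, 8, 10, 12, 14]
```

With the real API, `res = group([boilerplate.add.s(a, a) for a in ...])()` followed by `res.get()` should give the same list, again assuming a result backend is configured.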

In [36]:
# Sidenote: this runs on the current node
boilerplate.add.s(1, 1)()

2

In [37]:
# Sidenote 2: this runs on a worker and is equivalent to boilerplate.add.delay(1, 1)
boilerplate.add.s(1, 1).delay()

<AsyncResult: 8f7f5321-571c-49b0-bdeb-55cd43940058>

# Things that depend on each other: chain

The following examples execute one `add` at a time, each after the previous one has finished (not necessarily on the same worker!). Calling a chain returns the `AsyncResult` of the last task only.

## Example 1

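`.si()` creates an "immutable signature", which means "don't feed the previous task's result into me". The following chain evaluates `add(0, 1)`, `add(1, 1)` and `add(1, 2)` in turn; its result is 3, the result of the last task.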

In [39]:
chain(boilerplate.add.si(0,1), boilerplate.add.si(1,1), boilerplate.add.si(1,2))()

<AsyncResult: bd071377-c853-42e8-bb11-49c72b793039>

## Example 2
Using `.s()` means that the previous task's return value is passed into the next task. In the following code, `add(0, 1)` is evaluated first, and then `add(1, 2)`. Note the order of arguments: celery prepends the previous result to the arguments already in the signature, so `add.s(2)` is called as `add(1, 2)`. The chain's result is 3.

In [40]:
chain(boilerplate.add.s(0,1), boilerplate.add.s(2))()

<AsyncResult: 9274b9ad-8c5c-40c4-bbc1-a6f23061ab13>

You can also build chains with the pipe operator `|`, much like in bash:

In [42]:
(boilerplate.add.s(0,1) | boilerplate.add.s(2))()

<AsyncResult: 12dff6f1-c13b-4504-8494-0a78fc048034>
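
The difference between `.s()` and `.si()` inside a chain comes down to argument passing: a mutable signature gets the previous task's result prepended to its own arguments, while an immutable one ignores it. The toy model below (plain Python, run sequentially in one process; `Sig` and `Chain` are hypothetical names, not celery classes) reproduces both examples above, including the `|` syntax, and records the arguments each call actually received.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Sig:
    """Toy signature; immutable=True mirrors .si(), False mirrors .s()."""
    func: Callable[..., Any]
    args: tuple = ()
    immutable: bool = False

    def __or__(self, other):
        # sig1 | sig2 builds a chain of signatures, in order.
        return Chain([self]) | other


@dataclass
class Chain:
    sigs: list

    def __or__(self, other):
        tail = other.sigs if isinstance(other, Chain) else [other]
        return Chain(self.sigs + tail)

    def __call__(self):
        result = None
        for i, sig in enumerate(self.sigs):
            if i == 0 or sig.immutable:
                args = sig.args  # first task, or .si(): own arguments only
            else:
                args = (result,) + sig.args  # .s(): previous result prepended
            result = sig.func(*args)
        return result  # only the last task's result comes back


calls = []


def add(x, y):
    calls.append((x, y))  # record the actual argument order
    return x + y


# Example 1: immutable signatures, each call uses only its own arguments.
r1 = Chain([Sig(add, (0, 1), True), Sig(add, (1, 1), True), Sig(add, (1, 2), True)])()
# Example 2 with the pipe syntax: add(0, 1), then add(1, 2).
calls.clear()
r2 = (Sig(add, (0, 1)) | Sig(add, (2,)))()
print(r1, r2, calls)  # 3 3 [(0, 1), (1, 2)]
```

For addition the argument order does not change the answer, but for a task such as `subtract(x, y)` it would, which is why the prepend rule is worth keeping in mind when choosing between `.s()` and `.si()`.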

# A recent example

The following code merges the flags and corrects the scaling of pairs of measurement sets taken one sidereal day apart, and then integrates them.

In [3]:
DATA_DIR = '/lustre/yuping/0-100-hr-reduction/averaging-test/hh=03'

In [23]:
day1 = sorted(glob.glob(f'{DATA_DIR}/2018-03-22/2018*T*'))
day2 = sorted(glob.glob(f'{DATA_DIR}/2018-03-23/2018*T*'))

Each list covers about 40 minutes of data:

In [24]:
len(day1)

189

In [25]:
len(day2)

189

Check that elements with the same index in the two lists are one sidereal day (about 23 h 56 min 4 s) apart:

In [10]:
day1[-1]

'/lustre/yuping/0-100-hr-reduction/averaging-test/hh=03/2018-03-22/2018-03-22T03:50:49'

In [11]:
day2[-1]

'/lustre/yuping/0-100-hr-reduction/averaging-test/hh=03/2018-03-23/2018-03-23T03:46:53'
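
The same check can be done numerically: the difference between the two timestamps should be close to one sidereal day, about 86164.09 s. A minimal sketch, assuming the directory names end in an ISO-8601 timestamp as in the outputs above; `timestamp_of` and `sidereal_offset_ok` are hypothetical helpers, and the 1 s tolerance is an arbitrary choice.

```python
import os
from datetime import datetime

SIDEREAL_DAY_S = 86164.0905  # one sidereal day in seconds


def timestamp_of(path):
    # Directory names look like .../2018-03-22T03:50:49
    return datetime.fromisoformat(os.path.basename(path))


def sidereal_offset_ok(path1, path2, tol_s=1.0):
    """True if path2 is one sidereal day after path1, within tol_s seconds."""
    dt = (timestamp_of(path2) - timestamp_of(path1)).total_seconds()
    return abs(dt - SIDEREAL_DAY_S) <= tol_s


p1 = '/lustre/yuping/0-100-hr-reduction/averaging-test/hh=03/2018-03-22/2018-03-22T03:50:49'
p2 = '/lustre/yuping/0-100-hr-reduction/averaging-test/hh=03/2018-03-23/2018-03-23T03:46:53'
print((timestamp_of(p2) - timestamp_of(p1)).total_seconds())  # 86164.0
print(sidereal_offset_ok(p1, p2))  # True
```

Applied to the whole lists, `all(sidereal_offset_ok(a, b) for a, b in zip(day1, day2))` would check every pair at once.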

The following queues one chain per timestamp and spectral window (22 spectral windows per timestamp); each chain merges the flags first and then does the gain scaling. I started several workers with concurrency 10 for this.

In [13]:
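# One chain per (timestamp, spectral window): merge flags, then correct scaling.
# .si() is used so that run_correct_scaling does not receive the return value
# of run_merge_flags as an extra first argument.
for d1, d2 in zip(day1, day2):
    for spw in range(22):
        ms1 = glob.glob(f'{d1}/{spw:02d}_*ms')[0]
        ms2 = glob.glob(f'{d2}/{spw:02d}_*ms')[0]
        (boilerplate.run_merge_flags.si(ms1, ms2) | boilerplate.run_correct_scaling.si(ms1, ms2))()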
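
The loop above assumes that every timestamp directory contains exactly one measurement set per spectral window whose name starts with a two-digit prefix; if one is missing, `glob(...)[0]` fails with a bare `IndexError` partway through queuing. A sketch of a hypothetical helper, `pair_by_spw`, that builds the `(ms1, ms2)` pairs up front and reports which spectral windows are missing, so work is only queued once the inputs are known to exist. The naming pattern is taken from the loop; everything else is an assumption.

```python
import glob
import os
import tempfile


def pair_by_spw(dir1, dir2, n_spw=22):
    """Return [(ms1, ms2), ...] for spectral windows 0..n_spw-1.

    Hypothetical helper: assumes names like '{spw:02d}_*ms' in each directory.
    """
    pairs, missing = [], []
    for spw in range(n_spw):
        m1 = sorted(glob.glob(os.path.join(dir1, f'{spw:02d}_*ms')))
        m2 = sorted(glob.glob(os.path.join(dir2, f'{spw:02d}_*ms')))
        if m1 and m2:
            pairs.append((m1[0], m2[0]))
        else:
            missing.append(spw)
    if missing:
        raise FileNotFoundError(f'missing spectral windows {missing} in {dir1} or {dir2}')
    return pairs


# Usage on a throwaway directory tree (measurement sets are directories).
with tempfile.TemporaryDirectory() as root:
    d1, d2 = os.path.join(root, 'day1'), os.path.join(root, 'day2')
    for d in (d1, d2):
        for spw in range(2):
            os.makedirs(os.path.join(d, f'{spw:02d}_test.ms'))
    pairs = pair_by_spw(d1, d2, n_spw=2)
    names = [(os.path.basename(a), os.path.basename(b)) for a, b in pairs]
print(names)  # [('00_test.ms', '00_test.ms'), ('01_test.ms', '01_test.ms')]
```

In the notebook this would replace the two `glob` lines: `for d1, d2 in zip(day1, day2):` followed by `for ms1, ms2 in pair_by_spw(d1, d2):` and the same chain call.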

The following queues the concat runs, one per spectral window per day. Before running it, I changed the concurrency to 4 (see the next section) so that 4 concat runs execute on each node at a time.

In [19]:
for spw in range(22):
    s = f'{spw:02d}'
    for day in ('2018-03-22', '2018-03-23'):
        boilerplate.run_integrate_with_concat.delay(
            sorted(glob.glob(f'{DATA_DIR}/{day}/2018*T*/{s}_*ms')),
            out_ms=f'{DATA_DIR}/{day}/{s}_10min.ms')

# Worker control (still a work in progress)
Changing worker capacity, etc.

You should probably use the `start_workers.py` script to start workers across the cluster. The following commands show how to change the concurrency of the workers. Each node currently runs one worker, and each worker runs up to `concurrency` tasks in parallel in a pool of subprocesses.

In [14]:
from orca.proj.celery import app
from celery.app.control import Control
controller = Control(app)  # app.control is an equivalent, ready-made instance

You can increase the concurrency on each node with `pool_grow(n)`, where `n` is the number of pool processes to add to each worker:

In [35]:
controller.pool_grow(7)

Shrinking capacity is still a work in progress: only use the following command when the workers are idle.

In [18]:
controller.pool_shrink(7)