
Celery runner #235

Merged
merged 48 commits into develop from celery_runner on Jun 6, 2017

Conversation

@Kirill888 (Member) commented May 15, 2017

Overview

New executor that uses Celery (with Redis as broker and data backend).

This provides an alternative to the current setup (dask.distributed). The problem with dask.distributed is that it requires tasks to be idempotent, since it will sometimes schedule the same task in parallel on different nodes. With many tasks doing I/O this creates problems.

Celery, in comparison, has a much simpler execution model and doesn't have the same constraints.

Redis backend

Celery supports a number of backends; of these, two are fully supported: RabbitMQ and Redis. I have picked Redis as it is the simplest to get running without root access (NCI environment).

data_task_options

Adding a celery option to the --executor command line flag; the same host:port argument is used. The Celery executor will connect to the Redis instance at the given address; if the address is localhost and Redis is not running, it will be launched for the duration of the execution. Workers don't get launched however, so in most cases the app will stall until workers are added to the processing pool (see datacube-worker).
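A minimal sketch of that auto-launch behaviour (illustrative only, not the PR's actual code; assumes the redis-py client package, and `ensure_redis` is a hypothetical name):

```
# Sketch: connect to Redis at the given address and, if the address is localhost
# and nothing is listening, launch a redis-server for the duration of the run.
import subprocess

import redis


def ensure_redis(host='localhost', port=6379, password=None):
    """Return a Popen handle if redis-server had to be launched locally, else None."""
    client = redis.StrictRedis(host=host, port=port, password=password)
    try:
        client.ping()
        return None  # Redis already running at the given address
    except redis.ConnectionError:
        if host not in ('localhost', '127.0.0.1'):
            raise  # only auto-launch on the local machine
        # Launch redis-server for the duration of the execution; the caller
        # shuts it down once processing finishes.
        return subprocess.Popen(['redis-server', '--port', str(port)])
```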

$HOME/.datacube-redis contains the Redis password; if this file doesn't exist, it will be created with a randomly generated password when the Redis server is launched.
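The password-file handling might look roughly like the sketch below; the helper names (gen_password, write_user_secret_file) come from the commit messages further down this thread, but the bodies here are illustrative, not the PR's actual code:

```
import binascii
import os
import stat


def gen_password(num_random_bytes=12):
    return binascii.hexlify(os.urandom(num_random_bytes)).decode('ascii')


def write_user_secret_file(text, path):
    # Create the file with owner-only read/write permissions
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IRUSR | stat.S_IWUSR)
    with os.fdopen(fd, 'w') as f:
        f.write(text)


def redis_password(path=os.path.expanduser('~/.datacube-redis')):
    try:
        with open(path) as f:
            return f.read().strip()  # password already generated, re-use it
    except IOError:
        password = gen_password()
        write_user_secret_file(password, path)
        return password
```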

Also adding the executor alias dask, equivalent to distributed. However, now that we have two distributed backends, we should probably favour dask as the name for the dask.distributed backend.

datacube-worker

New app datacube-worker was added to support launching workers in either celery or dask mode. It accepts the same --executor option as the task app.

@coveralls
Coverage decreased (-1.6%) to 79.787% when pulling 5464640 on celery_runner into cdff103 on develop.

@coveralls
Coverage decreased (-1.6%) to 79.787% when pulling 7b41e17 on celery_runner into cdff103 on develop.

Convenience method for interactive development/debugging.
Sample app that uses executor to run tasks in parallel
Currently one cannot set the redis address on the command line, but it can be set via the
environment variable `REDIS=redis://<host>:<port>/<db>`.
Redis server IP/port can now be configured from the command line.
Also shuts down celery workers and redis when done, including the redis-server launched as
part of the environment.
When not supplied on the command line it should use the one from the config; instead it was
being set to None.
`datacube-worker` accepts the same `--executor` option as task apps and launches the
appropriate worker: a `celery::redis` or `dask::distributed` based worker.
This one launches the worker and app on the local machine only.
Allowing remote connections to redis without a password for testing, for now.
When launching `datacube-worker`, allow `dask` as an alias for `distributed`.
Also adding a shell script for launching on PBS.
2 spaces between functions
- gen_password
- slurp, to read the content of a file
- write_user_secret_file, to create files that are readable by the user only
By default the redis server will now be launched with a randomly generated password that
is stored in a text file in the user's home directory that only the user has permission
to read. Celery workers load the password from that file, and so need access to the home
directory (fine on NCI), but also need to be launched after the file has been created.

The password is generated once and then re-used unless the user manually deletes the
file where it is stored, `.datacube-redis` at the moment.
In this case IOError should be enough.
Removed the `params` option; just use `key=value` syntax. For redis config options that
have `-` in the name, replace it with `_` to fit Python syntax constraints (see the
sketch just below).
Also updated setup.py to include the celery+redis dependency.
conda doesn't have celery 4 yet.
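A sketch of the `key=value` handling described above (illustrative only; `parse_redis_options` is a hypothetical name, not the PR's code):

```
# Turn `key=value` pairs into redis config options, mapping the Python-friendly
# underscores back to the dashed redis option names.
def parse_redis_options(pairs):
    """['maxmemory=100mb', 'maxmemory_policy=noeviction'] ->
       {'maxmemory': '100mb', 'maxmemory-policy': 'noeviction'}
    """
    opts = {}
    for pair in pairs:
        key, _, value = pair.partition('=')
        opts[key.replace('_', '-')] = value
    return opts
```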
@coveralls
Coverage decreased (-1.6%) to 79.787% when pulling 76feeaa on celery_runner into cdff103 on develop.

@Kirill888 changed the title from "[almost ready] Celery runner" to "Celery runner" on May 16, 2017
@coveralls
Coverage decreased (-1.6%) to 79.787% when pulling 61d913b on celery_runner into 4bfdced on develop.

# If no change detected sleep for a bit
# TODO: this is sub-optimal, not sure what other options are
# though?
sleep(0.1)
Contributor:

It is a shame that celery.ResultSet doesn't have a collect() function like AsyncResult http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.AsyncResult.collect
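For context, the polling approach the snippet above implies might look like this (a sketch using ResultSet's documented ready() / completed_count() / join() methods, not the PR's exact code):

```
from time import sleep


def drain(result_set, poll_interval=0.1):
    done = 0
    while not result_set.ready():
        n = result_set.completed_count()
        if n == done:
            # If no change detected sleep for a bit
            sleep(poll_interval)
        done = n
    return result_set.join()  # gather all results, re-raising task errors
```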

}

EXECUTOR_TYPES['dask'] = EXECUTOR_TYPES['distributed'] # Add alias "dask" for distributed
Contributor:

This could make things more confusing, as dask also has synchronous, multi-threaded and multi-process schedulers/executors.

Member Author:

Ok, but 'distributed' is too generic as well, since celery is also "distributed" across machines, and 'dask[.-_]distributed' is hard to type, and it's hard to remember the separator token.

Contributor:

Good point!

def wrap_task(f, *args, **kwargs):
'turn function `f(task, *args, **kwargs)` into `g(task)` in pickle-able fashion'
return functools.partial(_wrap_impl, f, args, kwargs)

Contributor:

We should aim for PEP 257 with docstrings - Triple quotes are used even though the string fits on one line. This makes it easy to later expand it.
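For illustration, the quoted function in the suggested PEP 257 form (same code; only the docstring quoting changes, and `_wrap_impl` is the PR's existing helper):

```
import functools


def wrap_task(f, *args, **kwargs):
    """Turn function `f(task, *args, **kwargs)` into `g(task)` in a pickle-able fashion."""
    # _wrap_impl is the module-level helper from the PR that applies f to the task
    return functools.partial(_wrap_impl, f, args, kwargs)
```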

Addressing code review; also deleted an unused import.
chartostring now always returns a decoded string, so the numpy.char.decode step is no
longer needed. The code should still work with the older lib.
Two things changed in 1.2.8:
- chartostring
- the cython version dependency

The cython change somehow broke the compliance checkers: it cannot load any checkers, as
they fail with

```
DistributionNotFound: The 'cython>=0.19' distribution was not found and is required by netCDF4
```

in `load_all_available_checkers`, so no checkers are loaded, which then asserts in
the `run` method.
@coveralls
Coverage decreased (-0.6%) to 81.387% when pulling 0ce2711 on celery_runner into ee53c48 on develop.

Adding a new function `netcdf_extract_string` that takes care of the possible ways
strings can be stored in netcdf. This fixes test failures when using the 1.3.8
version, which switched to returning unicode from the `chartostring` method.
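Such a helper might look roughly like the sketch below (illustrative only; the PR's actual implementation may differ):

```
# Normalise the ways a string can come back from a netCDF file: already unicode,
# raw bytes, or a character array that needs chartostring.
import numpy
from netCDF4 import chartostring


def netcdf_extract_string(chars):
    if isinstance(chars, str):
        return chars
    if isinstance(chars, bytes):
        return chars.decode('utf-8')
    value = chartostring(chars)  # newer netCDF4 returns unicode, older returns bytes
    if isinstance(value, numpy.ndarray):
        value = value.item()
    return value if isinstance(value, str) else value.decode('utf-8')
```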
cython is a compulsory dependency of netcdf4 starting from 1.2.8, but since it's a
build-time dependency conda doesn't install it; netcdf4 is compiled externally. The
lack of cython doesn't break the netcdf4 library, but it does break the compliance
checker, because it uses the `pkg_resources` lib to check whether the dependencies
for plugins are installed, and `pkg_resources` reports that cython is needed to
import `netcdf4`, which isn't true.
@coveralls
Coverage decreased (-0.6%) to 81.383% when pulling 5f0fa4b on celery_runner into ee53c48 on develop.

@woodcockr (Member) commented:

Hi Folks,
I've been watching this branch with interest. Looks good; I like the switching between different execution models. When you are ready, I would love a chat/doc about how this design all comes together and where you might be heading next, to help with some of our future contributions.
Thanks for the effort!

@Kirill888 (Member Author) commented:

Hi Rob,

thank you for the feedback. In the very short term we are hoping to have this branch merged in its current state. The changes are backwards compatible with the processing apps we currently have, and we will likely leave them as they are for now. Longer term, I think we can all agree that writing "task apps" could be a lot simpler, and you can consider the work in this branch as an exploration of the pain points. I do not claim to address any of them just yet.

Ideally I'd like us to move away from writing custom launch scripts with hard-coded environment setup in bash and invoking qsub on the command line; I think all this logic can be done in Python and re-used by all processing apps with an import and a decorator.

I want to be able to write a processing app that can be easily tested locally and then, without writing any scripts, deployed on the NCI or any other compute provider we might support in the future.

  source activate my-env
  # run locally to test
  ./my-processing-app.py --my-app-option1 --my-option2=small-dataset

  # run on raijin using qsub under the hood
  ./my-processing-app.py --pbs="num-nodes=10,time=1h" --my-app-option1 --my-option2=large-dataset

  # run on AWS using docker or whatever
  ./my-processing-app.py --aws="num-nodes=10,region=oregon" --my-app-option1 --my-option2=large-dataset

I will be away in June and everyone else has their plate full with higher-priority tasks as it is, so I do not expect any more work to be done in that direction for a while.

Also removed the netcdf4 version peg in `setup.py`; should have done it in the last commit.
@coveralls
Coverage decreased (-0.2%) to 81.851% when pulling 30bcee5 on celery_runner into ee53c48 on develop.

@woodcockr (Member) commented:

Great. That is exactly what we wanted to achieve with the Execution Engine: write your app, run it on any of the target compute resource environments, preferably with it figuring out the best way to divide up the tasking.

I've made a note in our planning to link up with this once we clear the current development tasks.

somewhat higher coverage for executor classes.
@coveralls
Coverage increased (+0.1%) to 82.123% when pulling 543043f on celery_runner into ee53c48 on develop.

Running out of disk on the Travis test machines; see if this helps.
To be feature-compatible between the dask, celery and serial executors. Now you can
submit lambdas and inner functions, not just top-level functions.
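One common way to achieve this is to serialise the callable with something like cloudpickle before dispatch; a sketch under that assumption (the PR may use a different mechanism):

```
# Serialise an arbitrary callable so only a plain top-level function plus a
# bytes blob ever needs to cross the wire.
import cloudpickle


def run_pickled(blob):
    func, args, kwargs = cloudpickle.loads(blob)
    return func(*args, **kwargs)


def submit_any(submit, func, *args, **kwargs):
    """`submit` is the executor's usual submit(top_level_fn, *args) entry point."""
    blob = cloudpickle.dumps((func, args, kwargs))
    return submit(run_pickled, blob)


# e.g. submit_any(executor.submit, lambda x: x * 2, 21)
```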
@coveralls
Coverage increased (+0.1%) to 82.146% when pulling a9119f6 on celery_runner into ee53c48 on develop.

Fixing `ResourceWarning` (file handle leak) in `read_documents`
@coveralls
Coverage increased (+0.1%) to 82.136% when pulling 7d95ae8 on celery_runner into ee53c48 on develop.

Still running out of disk on Travis; conda clean should happen after setting up the
agdc environment.
@coveralls
Coverage increased (+0.1%) to 82.136% when pulling 1f54ee6 on celery_runner into ee53c48 on develop.

Fixing pylint complaints, also minor flake8 formatting corrections.
@coveralls
Coverage increased (+0.1%) to 82.136% when pulling eeda1f6 on celery_runner into ee53c48 on develop.

@omad (Member) left a comment:

Looks good enough to merge. Since Kirill is away for the rest of June, I'm not going to let this wait any longer.

@omad merged commit 12cbe5a into develop on Jun 6, 2017
@Kirill888 deleted the celery_runner branch on July 12, 2017