Celery runner #235
Changes from 36 commits
datacube/_celery_runner.py (new file)
@@ -0,0 +1,259 @@
from __future__ import print_function
from __future__ import absolute_import

from celery import Celery
from time import sleep
import redis
import os

# This can be changed via environment variable `REDIS`
REDIS_URL = 'redis://localhost:6379/0'


def mk_celery_app(addr=None):

    if addr is None:
        url = os.environ.get('REDIS', REDIS_URL)
    else:
        url = 'redis://{}:{}/0'.format(*addr)

    _app = Celery('datacube_task', broker=url, backend=url)

    _app.conf.update(
        task_serializer='pickle',
        result_serializer='pickle',
        accept_content=['pickle'])

    return _app


# Celery worker launch script expects to see app object at the top level
# pylint: disable=invalid-name
app = mk_celery_app()


def set_address(host, port=6379, db=0, password=None):
    if password is None:
        url = 'redis://{}:{}/{}'.format(host, port, db)
    else:
        url = 'redis://:{}@{}:{}/{}'.format(password, host, port, db)

    app.conf.update(result_backend=url,
                    broker_url=url)


@app.task()
def run_cloud_pickled_function(f_data, *args, **kwargs):
    from cloudpickle import loads
    func = loads(f_data)
    return func(*args, **kwargs)


def submit_cloud_pickled_function(f, *args, **kwargs):
    from cloudpickle import dumps
    f_data = dumps(f)
    return run_cloud_pickled_function.delay(f_data, *args, **kwargs)


def launch_worker(host, port=6379, password=None, nprocs=None):
    if password == '':
        password = get_redis_password(generate_if_missing=False)

    set_address(host, port, password=password)

    argv = ['worker', '-A', 'datacube._celery_runner']
    if nprocs is not None:
        argv.extend(['-c', str(nprocs)])

    app.worker_main(argv)


def get_redis_password(generate_if_missing=False):
    from .utils import write_user_secret_file, slurp, gen_password

    REDIS_PASSWORD_FILE = '.datacube-redis'

    password = slurp(REDIS_PASSWORD_FILE, in_home_dir=True)
    if password is not None:
        return password

    if generate_if_missing:
        password = gen_password(12)
        write_user_secret_file(password, REDIS_PASSWORD_FILE, in_home_dir=True)

    return password


class CeleryExecutor(object):
    def __init__(self, host=None, port=None, password=None):
        # print('Celery: {}:{}'.format(host, port))
        self._shutdown = None

        if port or host or password:
            if password == '':
                password = get_redis_password(generate_if_missing=True)

            set_address(host if host else 'localhost',
                        port if port else 6379,
                        password=password)

        host = host if host else 'localhost'
        port = port if port else 6379

        if not check_redis(host, port, password):
            if host in ['localhost', '127.0.0.1']:
                self._shutdown = launch_redis(port if port else 6379, password=password)
            else:
                raise IOError("Can't connect to redis server @ {}:{}".format(host, port))

    def __del__(self):
        if self._shutdown:
            app.control.shutdown()
            sleep(1)
            self._shutdown()

    def __repr__(self):
        return 'CeleryRunner'

    def submit(self, func, *args, **kwargs):
        return submit_cloud_pickled_function(func, *args, **kwargs)

    def map(self, func, iterable):
        return [self.submit(func, data) for data in iterable]

    @staticmethod
    def get_ready(futures):
        completed = []
        failed = []
        pending = []
        for f in futures:
            if f.ready():
                if f.failed():
                    failed.append(f)
                else:
                    completed.append(f)
            else:
                pending.append(f)
        return completed, failed, pending

    @staticmethod
    def as_completed(futures):
        while len(futures) > 0:
            pending = []

            for promise in futures:
                if promise.ready():
                    yield promise
                else:
                    pending.append(promise)

            if len(pending) == len(futures):
                # If no change detected sleep for a bit
                # TODO: this is sub-optimal, not sure what other options are
                # though?
                sleep(0.1)

            futures = pending

    @classmethod
    def next_completed(cls, futures, default):
        results = list(futures)
        if not results:
            return default, results
        result = next(cls.as_completed(results), default)
        results.remove(result)
        return result, results

    @staticmethod
    def results(futures):
        return [future.get() for future in futures]

    @staticmethod
    def result(future):
        return future.get()

    @staticmethod
    def release(future):
        future.forget()


def check_redis(host='localhost', port=6379, password=None):
    if password == '':
        password = get_redis_password()

    server = redis.Redis(host, port, password=password)
    try:
        server.ping()
    except redis.exceptions.ConnectionError:
        return False
    except redis.exceptions.ResponseError as error:
        print('Redis responded with an error: {}'.format(error))
        return False
    return True


def launch_redis(port=6379, password=None, **kwargs):
    import tempfile
    from os import path
    import subprocess
    import shutil
    from .utils import write_user_secret_file

    def stringify(v):
        if isinstance(v, str):
            return '"' + v + '"' if v.find(' ') >= 0 else v

        if isinstance(v, bool):
            return {True: 'yes', False: 'no'}[v]

        return str(v)

    def fix_key(k):
        return k.replace('_', '-')

    def write_config(params, cfgfile):
        lines = ['{} {}'.format(fix_key(k), stringify(v)) for k, v in params.items()]
        cfg_txt = '\n'.join(lines)
        write_user_secret_file(cfg_txt, cfgfile)

    workdir = tempfile.mkdtemp(prefix='redis-')

    defaults = dict(maxmemory_policy='noeviction',
                    daemonize=True,
                    port=port,
                    databases=4,
                    maxmemory="100mb",
                    hz=50,
                    loglevel='notice',
                    pidfile=path.join(workdir, 'redis.pid'),
                    logfile=path.join(workdir, 'redis.log'))

    if password is not None:
        if password == '':
            password = get_redis_password(generate_if_missing=True)

        defaults['requirepass'] = password
    else:
        password = defaults.get('requirepass', None)

    defaults.update(kwargs)

    cfgfile = path.join(workdir, 'redis.cfg')
    write_config(defaults, cfgfile)

    def cleanup():
        shutil.rmtree(workdir)

    def shutdown():
        server = redis.Redis('localhost', port, password=password)
        server.shutdown()
        sleep(1)
        cleanup()

    try:
        subprocess.check_call(['redis-server', cfgfile])
    except subprocess.CalledProcessError:
        cleanup()
        return False

    return shutdown
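
For reviewers trying the change out, a minimal usage sketch of the executor added above. It is illustrative only: it assumes a local redis-server binary is on PATH (so CeleryExecutor can auto-launch it) and that a worker has been started separately, e.g. via launch_worker('localhost'); the double function is a made-up example task.

from datacube._celery_runner import CeleryExecutor

def double(x):
    # Example task; any function that cloudpickle can serialise will do.
    return 2 * x

runner = CeleryExecutor(host='localhost', port=6379)  # launches redis locally if not reachable
futures = runner.map(double, range(10))               # one AsyncResult per input

for future in CeleryExecutor.as_completed(futures):   # polls until each is ready
    print(CeleryExecutor.result(future))              # .get() under the hood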
@@ -13,7 +13,7 @@
 import click

 from datacube import config, __version__
-from datacube.executor import get_executor
+from datacube.executor import get_executor, mk_celery_executor
 from datacube.index import index_connect
 from pathlib import Path

@@ -206,9 +206,12 @@ def parse_endpoint(value):
 EXECUTOR_TYPES = {
     'serial': lambda _: get_executor(None, None),
     'multiproc': lambda workers: get_executor(None, int(workers)),
-    'distributed': lambda addr: get_executor(parse_endpoint(addr), True)
+    'distributed': lambda addr: get_executor(parse_endpoint(addr), True),
+    'celery': lambda addr: mk_celery_executor(*parse_endpoint(addr))
 }

+EXECUTOR_TYPES['dask'] = EXECUTOR_TYPES['distributed']  # Add alias "dask" for distributed
[review comment] This could make things more confusing, as dask also has synchronous, multi-threaded and multi-process schedulers/executors.

[reply] OK, but 'distributed' is too generic as well: celery is also "distributed" across machines, and 'dask[.-_]distributed' is hard to type, with a separator token that is hard to remember.

[reply] Good point!


 def _setup_executor(ctx, param, value):
     try:
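
Tying the two hunks together: with the mapping above, the click machinery can now build the new executor from a 'celery' type and an endpoint string. A rough sketch of the dispatch, assuming parse_endpoint('host:port') returns a (host, port) tuple (the endpoint value here is illustrative):

executor_type, arg = 'celery', 'localhost:6379'  # as it would arrive from the CLI
executor = EXECUTOR_TYPES[executor_type](arg)    # i.e. mk_celery_executor('localhost', 6379)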
[review comment] It is a shame that celery.ResultSet doesn't have a collect() function like AsyncResult does:
http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.AsyncResult.collect
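
For what it's worth, celery.result.ResultSet does still offer ready() and join(), so a whole batch can be gathered without hand-rolled per-future bookkeeping; a small sketch, assuming futures is a list of AsyncResult objects as returned by submit above:

from time import sleep
from celery.result import ResultSet

rs = ResultSet(futures)  # group the AsyncResults into one handle
while not rs.ready():    # poll the batch as a whole
    sleep(0.1)
results = rs.join()      # list of return values; raises if any task failed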