Commit 567e642

Merge 7b41e17 into cdff103
2 parents cdff103 + 7b41e17
Kirill888 committed May 16, 2017

Showing 14 changed files with 609 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .travis/environment_py27.yaml

@@ -15,6 +15,8 @@ dependencies:
   - gdal
   - dask
   - xarray
+  - redis-py   # redis client lib, used by celery
+  - redis      # redis server
   - pylint     # testing
   - pep8       # testing
   - fiona      # movie generator app
@@ -28,6 +30,7 @@ dependencies:
   - sshtunnel  # for simple-replicas
   - tqdm       # for simple-replicas
   - pip:
+    - celery >= 4
     - pypeg2
     - pytest-cov # testing
     - pytest-logging
3 changes: 3 additions & 0 deletions .travis/environment_py35.yaml

@@ -16,6 +16,8 @@ dependencies:
   - gdal
   - dask
   - xarray
+  - redis-py   # redis client lib, used by celery
+  - redis      # redis server
   - pylint     # testing
   - pep8       # testing
   - fiona      # movie generator app
@@ -29,6 +31,7 @@
   - sshtunnel  # for simple-replicas
   - tqdm       # for simple-replicas
   - pip:
+    - celery >= 4
     - pypeg2
     - pytest-cov # testing
     - pytest-logging
253 changes: 253 additions & 0 deletions datacube/_celery_runner.py

@@ -0,0 +1,253 @@
from __future__ import print_function
from __future__ import absolute_import

from celery import Celery
from time import sleep
import redis
import os

# Default broker/backend; can be overridden via the `REDIS` environment variable
REDIS_URL = 'redis://localhost:6379/0'


def mk_celery_app(addr=None):
    if addr is None:
        url = os.environ.get('REDIS', REDIS_URL)
    else:
        url = 'redis://{}:{}/0'.format(*addr)

    _app = Celery('datacube_task', broker=url, backend=url)

    _app.conf.update(
        task_serializer='pickle',
        result_serializer='pickle',
        accept_content=['pickle'])

    return _app


# Celery worker launch script expects to see the app object at the top level
# pylint: disable=invalid-name
app = mk_celery_app()


def set_address(host, port=6379, db=0, password=None):
    if password is None:
        url = 'redis://{}:{}/{}'.format(host, port, db)
    else:
        url = 'redis://:{}@{}:{}/{}'.format(password, host, port, db)

    app.conf.update(result_backend=url,
                    broker_url=url)


@app.task()
def run_cloud_pickled_function(f_data, *args, **kwargs):
    from cloudpickle import loads
    func = loads(f_data)
    return func(*args, **kwargs)


def submit_cloud_pickled_function(f, *args, **kwargs):
    from cloudpickle import dumps
    f_data = dumps(f)
    return run_cloud_pickled_function.delay(f_data, *args, **kwargs)


def launch_worker(host, port=6379, password=None, nprocs=None):
    if password == '':
        password = get_redis_password(generate_if_missing=False)

    set_address(host, port, password=password)

    argv = ['worker', '-A', 'datacube._celery_runner']
    if nprocs is not None:
        argv.extend(['-c', str(nprocs)])

    app.worker_main(argv)


def get_redis_password(generate_if_missing=False):
    from .utils import write_user_secret_file, slurp, gen_password

    REDIS_PASSWORD_FILE = '.datacube-redis'

    password = slurp(REDIS_PASSWORD_FILE, in_home_dir=True)
    if password is not None:
        return password

    if generate_if_missing:
        password = gen_password(12)
        write_user_secret_file(password, REDIS_PASSWORD_FILE, in_home_dir=True)

    return password


class CeleryExecutor(object):
    def __init__(self, host=None, port=None, password=None):
        # print('Celery: {}:{}'.format(host, port))
        self._shutdown = None

        if port or host or password:
            if password == '':
                password = get_redis_password(generate_if_missing=True)

            set_address(host if host else 'localhost',
                        port if port else 6379,
                        password=password)

        host = host if host else 'localhost'
        port = port if port else 6379

        if not check_redis(host, port, password):
            if host in ['localhost', '127.0.0.1']:
                self._shutdown = launch_redis(port if port else 6379, password=password)
            else:
                raise IOError("Can't connect to redis server @ {}:{}".format(host, port))

    def __del__(self):
        if self._shutdown:
            app.control.shutdown()
            sleep(1)
            self._shutdown()

    def __repr__(self):
        return 'CeleryRunner'

    def submit(self, func, *args, **kwargs):
        return submit_cloud_pickled_function(func, *args, **kwargs)

    def map(self, func, iterable):
        return [self.submit(func, data) for data in iterable]

    @staticmethod
    def get_ready(futures):
        completed = []
        failed = []
        pending = []
        for f in futures:
            if f.ready():
                if f.failed():
                    failed.append(f)
                else:
                    completed.append(f)
            else:
                pending.append(f)
        return completed, failed, pending

    @staticmethod
    def as_completed(futures):
        while len(futures) > 0:
            pending = []

            for promise in futures:
                if promise.ready():
                    yield promise
                else:
                    pending.append(promise)

            if len(pending) == len(futures):
                # No change detected: sleep briefly before polling again.
                # TODO: polling is sub-optimal, but it's unclear what the
                # alternatives are
                sleep(0.1)

            futures = pending

    @classmethod
    def next_completed(cls, futures, default):
        results = list(futures)
        if not results:
            return default, results
        result = next(cls.as_completed(results), default)
        results.remove(result)
        return result, results

    @staticmethod
    def results(futures):
        return [future.get() for future in futures]

    @staticmethod
    def result(future):
        return future.get()

    @staticmethod
    def release(future):
        future.forget()


def check_redis(host='localhost', port=6379, password=None):
    server = redis.Redis(host, port, password=password)
    try:
        server.ping()
    except redis.exceptions.ConnectionError:
        return False
    except redis.exceptions.ResponseError as error:
        print('Redis responded with an error: {}'.format(error))
        return False
    return True


def launch_redis(port=6379, password=None, **kwargs):
    import tempfile
    from os import path
    import subprocess
    import shutil
    from .utils import write_user_secret_file

    def stringify(v):
        if isinstance(v, str):
            return '"' + v + '"' if v.find(' ') >= 0 else v

        if isinstance(v, bool):
            return {True: 'yes', False: 'no'}[v]

        return str(v)

    def fix_key(k):
        return k.replace('_', '-')

    def write_config(params, cfgfile):
        lines = ['{} {}'.format(fix_key(k), stringify(v)) for k, v in params.items()]
        cfg_txt = '\n'.join(lines)
        write_user_secret_file(cfg_txt, cfgfile)

    workdir = tempfile.mkdtemp(prefix='redis-')

    defaults = dict(maxmemory_policy='noeviction',
                    daemonize=True,
                    port=port,
                    databases=4,
                    maxmemory="100mb",
                    hz=50,
                    loglevel='notice',
                    pidfile=path.join(workdir, 'redis.pid'),
                    logfile=path.join(workdir, 'redis.log'))

    if password is not None:
        defaults['requirepass'] = password
    else:
        password = defaults.get('requirepass', None)

    defaults.update(kwargs)

    cfgfile = path.join(workdir, 'redis.cfg')
    write_config(defaults, cfgfile)

    def cleanup():
        shutil.rmtree(workdir)

    def shutdown():
        server = redis.Redis('localhost', port, password=password)
        server.shutdown()
        sleep(1)
        cleanup()

    try:
        subprocess.check_call(['redis-server', cfgfile])
    except subprocess.CalledProcessError:
        cleanup()
        return False

    return shutdown
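
This file gives datacube a celery-backed executor with the same submit/map/result surface as the other executors. A minimal usage sketch, not part of the diff, assuming a local redis-server binary is on PATH and a worker has been started separately (for example via launch_worker('localhost')):

from datacube._celery_runner import CeleryExecutor

def double(x):
    return x * 2

runner = CeleryExecutor()               # checks redis at localhost:6379, launching one if absent
futures = runner.map(double, range(4))  # one celery task per input value
for f in runner.as_completed(futures):
    print(runner.result(f))             # fetch the task's return value
    runner.release(f)                   # forget the result in the backend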
19 changes: 19 additions & 0 deletions datacube/executor.py

@@ -21,6 +21,9 @@
 class SerialExecutor(object):
+    def __repr__(self):
+        return 'SerialExecutor'
+
     @staticmethod
     def submit(func, *args, **kwargs):
         return func, args, kwargs
@@ -153,6 +156,10 @@ class MultiprocessingExecutor(object):
     def __init__(self, pool):
         self._pool = pool
 
+    def __repr__(self):
+        max_workers = self._pool.__dict__.get('_max_workers', '??')
+        return 'Multiprocessing ({})'.format(max_workers)
+
     def submit(self, func, *args, **kwargs):
         return self._pool.submit(func, *args, **kwargs)
 
@@ -222,3 +229,15 @@ def get_executor(scheduler, workers):
         return concurrent_exec
 
     return SerialExecutor()
+
+
+def mk_celery_executor(host, port, password=''):
+    """
+    :param host: Address of the redis database server
+    :param port: Port of the redis database server
+    :param password: Authentication for redis: '' means load from the
+        home folder (generating a password if missing), None means no
+        authentication
+    """
+    from ._celery_runner import CeleryExecutor
+    return CeleryExecutor(host, port, password=password)
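
For illustration, not part of the diff: the three password modes the docstring describes.

# password=None  -> connect without redis AUTH
# password=''    -> read ~/.datacube-redis, generating and saving a password if missing
# password='xyz' -> use the given password as-is
executor = mk_celery_executor('localhost', 6379, password='')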
4 changes: 2 additions & 2 deletions datacube/scripts/user.py

@@ -1,10 +1,10 @@
 from __future__ import absolute_import
 
 import os
-import base64
 import logging
 import click
 
+from datacube.utils import gen_password
 from datacube.config import LocalConfig
 from datacube.index._api import Index
 from datacube.ui import click as ui
@@ -55,7 +55,7 @@ def create_user(config, index, role, user, description):
     """
     Create a User
     """
-    password = base64.urlsafe_b64encode(os.urandom(12)).decode('utf-8')
+    password = gen_password(12)
     index.users.create_user(user, password, role, description=description)
 
     click.echo('{host}:{port}:*:{username}:{password}'.format(
7 changes: 5 additions & 2 deletions datacube/ui/click.py

@@ -13,7 +13,7 @@
 import click
 
 from datacube import config, __version__
-from datacube.executor import get_executor
+from datacube.executor import get_executor, mk_celery_executor
 from datacube.index import index_connect
 from pathlib import Path
 
@@ -206,9 +206,12 @@ def parse_endpoint(value):
 EXECUTOR_TYPES = {
     'serial': lambda _: get_executor(None, None),
     'multiproc': lambda workers: get_executor(None, int(workers)),
-    'distributed': lambda addr: get_executor(parse_endpoint(addr), True)
+    'distributed': lambda addr: get_executor(parse_endpoint(addr), True),
+    'celery': lambda addr: mk_celery_executor(*parse_endpoint(addr))
 }
 
+EXECUTOR_TYPES['dask'] = EXECUTOR_TYPES['distributed']  # Add alias "dask" for distributed
+
 
 def _setup_executor(ctx, param, value):
     try:
8 changes: 8 additions & 0 deletions datacube/ui/task_app.py

@@ -197,6 +197,14 @@ def add_dataset_to_db(index, datasets):
 def do_nothing(result):
     pass
 
+def _wrap_impl(f, args, kwargs, task):
+    'needs to be at the top level so it can be pickled'
+    return f(task, *args, **kwargs)
+
+def wrap_task(f, *args, **kwargs):
+    'turn function `f(task, *args, **kwargs)` into `g(task)` in a pickle-able fashion'
+    return functools.partial(_wrap_impl, f, args, kwargs)
+
 
 def run_tasks(tasks, executor, run_task, process_result=None, queue_size=50):
     """
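
wrap_task exists because closures and lambdas cannot be sent through pickle, while a functools.partial over module-level functions can. A hypothetical example (scale_task is illustrative, not from the diff):

import pickle
from datacube.ui.task_app import wrap_task

def scale_task(task, factor, offset=0):
    return task * factor + offset

g = wrap_task(scale_task, 3, offset=1)  # g(task) == scale_task(task, 3, offset=1)
assert g(10) == 31
pickle.dumps(g)  # works: g is a partial over top-level functions, safe to ship to workers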