Skip to content

Commit

Permalink
Use new-style config mechanism for the scheduler
Browse files Browse the repository at this point in the history
Also removed _create_scheduler
  • Loading branch information
Erik Bernhardsson committed Feb 8, 2015
1 parent 788f4de commit eb2de52
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 59 deletions.
68 changes: 41 additions & 27 deletions luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
import os
import time

import configuration
import notifications
import parameter
import task_history as history
from task_status import DISABLED, DONE, FAILED, PENDING, RUNNING, SUSPENDED, UNKNOWN
from task import Config

logger = logging.getLogger("luigi.server")

Expand Down Expand Up @@ -62,12 +65,30 @@ class Scheduler(object):
}


# We're passing around this config a lot, so let's put it on an object
SchedulerConfig = collections.namedtuple('SchedulerConfig', [
'retry_delay', 'remove_delay', 'worker_disconnect_delay',
'disable_failures', 'disable_window', 'disable_persist', 'disable_time',
'max_shown_tasks',
])
class scheduler(Config):
# TODO(erikbern): the config_path is needed for backwards compatilibity. We should drop the compatibility
# at some point (in particular this would force users to replace all dashes with underscores in the config)
retry_delay = parameter.FloatParameter(default=900.0,
config_path=dict(section='scheduler', name='retry-delay'))
remove_delay = parameter.FloatParameter(default=600.0,
config_path=dict(section='scheduler', name='remote-delay'))
worker_disconnect_delay = parameter.FloatParameter(default=60.0,
config_path=dict(section='scheduler', name='worker-disconnect-delay'))
state_path = parameter.Parameter(default='/var/lib/luigi-server/state.pickle',
config_path=dict(section='scheduler', name='state-path'))

# Jobs are disabled if we see more than disable_failures failures in disable_window seconds.
# These disables last for disable_persist seconds.
disable_window = parameter.IntParameter(default=3600,
config_path=dict(section='scheduler', name='disable-window-seconds'))
disable_failures = parameter.IntParameter(default=None,
config_path=dict(section='scheduler', name='disable-num-failures'))
disable_persist = parameter.IntParameter(default=86400,
config_path=dict(section='scheduler', name='disable-persist-seconds'))
max_shown_tasks = parameter.IntParameter(default=100000,
config_path=dict(section='scheduler', name='max-shown-task'))

record_task_history = parameter.BoolParameter(default=False)


def fix_time(x):
Expand Down Expand Up @@ -339,7 +360,7 @@ def prune(self, task, config):

# Re-enable task after the disable time expires
if task.status == DISABLED and task.scheduler_disable_time:
if time.time() - fix_time(task.scheduler_disable_time) > config.disable_time:
if time.time() - fix_time(task.scheduler_disable_time) > config.disable_persist:
self.re_enable(task, config)

# Remove tasks that have no stakeholders
Expand Down Expand Up @@ -391,10 +412,7 @@ class CentralPlannerScheduler(Scheduler):
Can be run locally or on a server (using RemoteScheduler + server.Server).
"""

def __init__(self, retry_delay=900.0, remove_delay=600.0, worker_disconnect_delay=60.0,
state_path='/var/lib/luigi-server/state.pickle', task_history=None,
resources=None, disable_persist=0, disable_window=0, disable_failures=None,
max_shown_tasks=100000):
def __init__(self, config=None, resources=None, task_history=None, **kwargs):
"""
(all arguments are in seconds)
Keyword Arguments:
Expand All @@ -403,23 +421,19 @@ def __init__(self, retry_delay=900.0, remove_delay=600.0, worker_disconnect_dela
:param state_path: path to state file (tasks and active workers).
:param worker_disconnect_delay: if a worker hasn't communicated for this long, remove it from active workers.
"""
self._config = SchedulerConfig(
retry_delay=retry_delay,
remove_delay=remove_delay,
worker_disconnect_delay=worker_disconnect_delay,
disable_failures=disable_failures,
disable_window=disable_window,
disable_persist=disable_persist,
disable_time=disable_persist,
max_shown_tasks=max_shown_tasks,
)

self._state = SimpleTaskState(state_path)
self._task_history = task_history or history.NopHistory()
self._resources = resources
self._config = config or scheduler(**kwargs)
self._state = SimpleTaskState(self._config.state_path)
if task_history:
self._task_history = task_history
elif self._config.record_task_history:
import db_task_history # Needs sqlalchemy, thus imported here
self._task_history = db_task_history.DbTaskHistory()
else:
self._task_history = history.NopHistory()
self._resources = resources or configuration.get_config().getintdict('resources') # TODO: Can we make this a Parameter?
self._make_task = functools.partial(
Task, disable_failures=disable_failures,
disable_window=disable_window)
Task, disable_failures=self._config.disable_failures,
disable_window=self._config.disable_window)

def load(self):
self._state.load()
Expand Down
32 changes: 2 additions & 30 deletions luigi/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,39 +31,11 @@
import tornado.netutil
import tornado.web

import configuration
import scheduler
import task_history

logger = logging.getLogger("luigi.server")


def _create_scheduler():
config = configuration.get_config()
retry_delay = config.getfloat('scheduler', 'retry-delay', 900.0)
remove_delay = config.getfloat('scheduler', 'remove-delay', 600.0)
worker_disconnect_delay = config.getfloat('scheduler', 'worker-disconnect-delay', 60.0)
state_path = config.get('scheduler', 'state-path', '/var/lib/luigi-server/state.pickle')

# Jobs are disabled if we see more than disable_failures failures in disable_window seconds.
# These disables last for disable_persist seconds.
disable_window = config.getint('scheduler', 'disable-window-seconds', 3600)
disable_failures = config.getint('scheduler', 'disable-num-failures', None)
disable_persist = config.getint('scheduler', 'disable-persist-seconds', 86400)
max_shown_tasks = config.getint('scheduler', 'max-shown-tasks', 100000)

resources = config.getintdict('resources')
if config.getboolean('scheduler', 'record_task_history', False):
import db_task_history # Needs sqlalchemy, thus imported here
task_history_impl = db_task_history.DbTaskHistory()
else:
task_history_impl = task_history.NopHistory()
return scheduler.CentralPlannerScheduler(
retry_delay, remove_delay, worker_disconnect_delay, state_path, task_history_impl,
resources, disable_persist, disable_window, disable_failures, max_shown_tasks,
)


class RPCHandler(tornado.web.RequestHandler):
"""
Handle remote scheduling calls using rpc.RemoteSchedulerResponder.
Expand Down Expand Up @@ -180,7 +152,7 @@ def run(api_port=8082, address=None, scheduler=None, responder=None):
"""
Runs one instance of the API server.
"""
sched = scheduler or _create_scheduler()
sched = scheduler or scheduler.CentralPlannerScheduler()
# load scheduler state
sched.load()

Expand Down Expand Up @@ -216,7 +188,7 @@ def run_api_threaded(api_port=8082, address=None):
:param address:
:return:
"""
sock_names = _init_api(_create_scheduler(), None, api_port, address)
sock_names = _init_api(scheduler.CentralPlannerScheduler(), None, api_port, address)

import threading

Expand Down
18 changes: 18 additions & 0 deletions test/scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import unittest

import luigi.scheduler
from helpers import with_config

luigi.notifications.DEBUG = True

Expand Down Expand Up @@ -54,6 +55,23 @@ def test_load_broken_state(self):

self.assertEqual(list(state.get_worker_ids()), [])

@with_config({'scheduler': {'disable-num-failures': '44', 'worker-disconnect-delay': '55'}})
def test_scheduler_with_config(self):
cps = luigi.scheduler.CentralPlannerScheduler()
self.assertEqual(44, cps._config.disable_failures)
self.assertEqual(55, cps._config.worker_disconnect_delay)

# Override
cps = luigi.scheduler.CentralPlannerScheduler(disable_failures=66,
worker_disconnect_delay=77)
self.assertEqual(66, cps._config.disable_failures)
self.assertEqual(77, cps._config.worker_disconnect_delay)

@with_config({'resources': {'a': '100', 'b': '200'}})
def test_scheduler_with_resources(self):
cps = luigi.scheduler.CentralPlannerScheduler()
self.assertEqual({'a': 100, 'b': 200}, cps._resources)


if __name__ == '__main__':
unittest.main()
4 changes: 2 additions & 2 deletions test/scheduler_visualisation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import luigi
import luigi.notifications
import luigi.server
import luigi.scheduler
import luigi.worker

luigi.notifications.DEBUG = True
Expand Down Expand Up @@ -90,7 +90,7 @@ def assertLessEqual(self, a, b):
self.assertTrue(a <= b)

def setUp(self):
self.scheduler = luigi.server._create_scheduler()
self.scheduler = luigi.scheduler.CentralPlannerScheduler()

def tearDown(self):
pass
Expand Down
1 change: 1 addition & 0 deletions test/server_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import urllib2

import luigi.server
from helpers import with_config


class ServerTestBase(unittest.TestCase):
Expand Down

0 comments on commit eb2de52

Please sign in to comment.