Skip to content

Commit

Permalink
Removed the RemoteSchedulerResponder class
Browse files Browse the repository at this point in the history
  • Loading branch information
Erik Bernhardsson committed Jan 26, 2015
1 parent b44ffaf commit 661dc62
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 97 deletions.
58 changes: 0 additions & 58 deletions luigi/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,61 +141,3 @@ def fetch_error(self, task_id):

def add_worker(self, worker, info):
return self._request('/api/add_worker', {'worker': worker, 'info': info})


class RemoteSchedulerResponder(object):
    """Server-side adapter that forwards RPC calls to a scheduler object.

    Every public method accepts ``**kwargs`` and silently discards them.
    The kwargs are there for forwards compatibility in case workers add
    new (optional) arguments. That way there's no dependency on the server
    component when upgrading Luigi on the worker side.

    TODO(erikbern): what is this class actually used for? Other than an
    unnecessary layer of indirection around central scheduler
    """

    def __init__(self, scheduler):
        """Wrap ``scheduler``; every call below is delegated to it."""
        self._scheduler = scheduler

    def add_task(self, worker, task_id, status, runnable, deps, new_deps, expl,
                 resources=None, priority=0, family='', params=None, **kwargs):
        """Forward a task registration to the scheduler and return its result.

        ``params`` defaults to ``None`` rather than ``{}`` so that calls which
        omit it do not all hand the scheduler the very same dict object; a
        fresh empty dict is forwarded instead.
        """
        if params is None:
            params = {}
        return self._scheduler.add_task(
            worker, task_id, status, runnable, deps, new_deps, expl,
            resources, priority, family, params)

    def add_worker(self, worker, info, **kwargs):
        """Forward worker registration info to the scheduler."""
        return self._scheduler.add_worker(worker, info)

    def get_work(self, worker, host=None, **kwargs):
        """Ask the scheduler for work on behalf of ``worker``."""
        return self._scheduler.get_work(worker, host)

    def ping(self, worker, **kwargs):
        """Forward a ping from ``worker`` to the scheduler."""
        return self._scheduler.ping(worker)

    def graph(self, **kwargs):
        """Return whatever the scheduler's ``graph()`` returns."""
        return self._scheduler.graph()

    # ``index`` is exposed as an alias of ``graph``.
    index = graph

    def dep_graph(self, task_id, **kwargs):
        """Return the scheduler's ``dep_graph`` result for ``task_id``."""
        return self._scheduler.dep_graph(task_id)

    def inverse_dep_graph(self, task_id, **kwargs):
        """Return the scheduler's ``inverse_dependencies`` result for ``task_id``.

        Note the name differs from the scheduler method it delegates to.
        """
        return self._scheduler.inverse_dependencies(task_id)

    def task_list(self, status, upstream_status, **kwargs):
        """Return the scheduler's ``task_list`` result for the given filters."""
        return self._scheduler.task_list(status, upstream_status)

    def worker_list(self, **kwargs):
        """Return the scheduler's ``worker_list`` result."""
        return self._scheduler.worker_list()

    def task_search(self, task_str, **kwargs):
        """Return the scheduler's ``task_search`` result for ``task_str``."""
        return self._scheduler.task_search(task_str)

    def fetch_error(self, task_id, **kwargs):
        """Return the scheduler's ``fetch_error`` result for ``task_id``."""
        return self._scheduler.fetch_error(task_id)

    @property
    def task_history(self):
        """The wrapped scheduler's ``task_history`` attribute."""
        return self._scheduler.task_history
22 changes: 11 additions & 11 deletions luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ def _update_priority(self, task, prio, worker):

def add_task(self, worker, task_id, status=PENDING, runnable=True,
deps=None, new_deps=None, expl=None, resources=None,
priority=0, family='', params={}):
priority=0, family='', params={}, **kwargs):
"""
* Add task identified by task_id if it doesn't exist
* If deps is not None, update dependency list
Expand Down Expand Up @@ -504,7 +504,7 @@ def add_task(self, worker, task_id, status=PENDING, runnable=True,
if expl is not None:
task.expl = expl

def add_worker(self, worker, info):
def add_worker(self, worker, info, **kwargs):
self._state.get_worker(worker).add_info(info)

def update_resources(self, **resources):
Expand Down Expand Up @@ -556,7 +556,7 @@ def _schedulable(self, task):
return False
return True

def get_work(self, worker, host=None):
def get_work(self, worker, host=None, **kwargs):
# TODO: remove any expired nodes

# Algo: iterate over all nodes, find the highest priority node with no dependencies and available
Expand Down Expand Up @@ -633,7 +633,7 @@ def get_work(self, worker, host=None):
'task_id': best_task_id,
'running_tasks': running_tasks}

def ping(self, worker):
def ping(self, worker, **kwargs):
self.update(worker)

def _upstream_status(self, task_id, upstream_status_table):
Expand Down Expand Up @@ -680,7 +680,7 @@ def _serialize_task(self, task_id, include_deps=True):
ret['deps'] = list(task.deps)
return ret

def graph(self):
def graph(self, **kwargs):
self.prune()
serialized = {}
for task in self._state.get_active_tasks():
Expand Down Expand Up @@ -713,14 +713,14 @@ def _recurse_deps(self, task_id, serialized):
for dep in task.deps:
self._recurse_deps(dep, serialized)

def dep_graph(self, task_id):
def dep_graph(self, task_id, **kwargs):
self.prune()
serialized = {}
if self._state.has_task(task_id):
self._recurse_deps(task_id, serialized)
return serialized

def task_list(self, status, upstream_status, limit=True):
def task_list(self, status, upstream_status, limit=True, **kwargs):
''' query for a subset of tasks by status '''
self.prune()
result = {}
Expand All @@ -734,7 +734,7 @@ def task_list(self, status, upstream_status, limit=True):
return {'num_tasks': len(result)}
return result

def worker_list(self, include_running=True):
def worker_list(self, include_running=True, **kwargs):
self.prune()
workers = [
dict(
Expand Down Expand Up @@ -764,7 +764,7 @@ def worker_list(self, include_running=True):
worker['running'] = tasks
return workers

def inverse_dependencies(self, task_id):
def inverse_dependencies(self, task_id, **kwargs):
self.prune()
serialized = {}
if self._state.has_task(task_id):
Expand All @@ -784,7 +784,7 @@ def _traverse_inverse_deps(self, task_id, serialized):
serialized[task.id]["deps"] = []
stack.append(task.id)

def task_search(self, task_str):
def task_search(self, task_str, **kwargs):
''' query for a subset of tasks by task_id '''
self.prune()
result = collections.defaultdict(dict)
Expand All @@ -802,7 +802,7 @@ def re_enable_task(self, task_id):
serialized = self._serialize_task(task_id)
return serialized

def fetch_error(self, task_id):
def fetch_error(self, task_id, **kwargs):
if self._state.has_task(task_id):
return {"taskId": task_id, "error": self._state.get_task(task_id).expl}
else:
Expand Down
52 changes: 24 additions & 28 deletions luigi/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import scheduler
import pkg_resources
import signal
from rpc import RemoteSchedulerResponder
import task_history
import logging
logger = logging.getLogger("luigi.server")
Expand Down Expand Up @@ -60,18 +59,20 @@ def _create_scheduler():


class RPCHandler(tornado.web.RequestHandler):

""" Handle remote scheduling calls by dispatching them to methods on the scheduler"""

def initialize(self, api):
self._api = api
def initialize(self, scheduler):
self._scheduler = scheduler

def get(self, method):
payload = self.get_argument('data', default="{}")
arguments = json.loads(payload)

if hasattr(self._api, method):
result = getattr(self._api, method)(**arguments)
# TODO: we should probably denote all methods on the scheduler that are "API-level"
# versus internal methods. Right now you can do a REST method call to any method
# defined on the scheduler, which is pretty bad from a security point of view.

if hasattr(self._scheduler, method):
result = getattr(self._scheduler, method)(**arguments)
self.write({"response": result}) # wrap all json response in a dictionary
else:
self.send_error(404)
Expand All @@ -80,9 +81,8 @@ def get(self, method):


class BaseTaskHistoryHandler(tornado.web.RequestHandler):

def initialize(self, api):
self._api = api
def initialize(self, scheduler):
self._scheduler = scheduler

def get_template_path(self):
return pkg_resources.resource_filename(__name__, 'templates')
Expand All @@ -91,35 +91,31 @@ def get_template_path(self):
class RecentRunHandler(BaseTaskHistoryHandler):

def get(self):
tasks = self._api.task_history.find_latest_runs()
tasks = self._scheduler.task_history.find_latest_runs()
self.render("recent.html", tasks=tasks)


class ByNameHandler(BaseTaskHistoryHandler):

def get(self, name):
tasks = self._api.task_history.find_all_by_name(name)
tasks = self._scheduler.task_history.find_all_by_name(name)
self.render("recent.html", tasks=tasks)


class ByIdHandler(BaseTaskHistoryHandler):

def get(self, id):
task = self._api.task_history.find_task_by_id(id)
task = self._scheduler.task_history.find_task_by_id(id)
self.render("show.html", task=task)


class ByParamsHandler(BaseTaskHistoryHandler):

def get(self, name):
payload = self.get_argument('data', default="{}")
arguments = json.loads(payload)
tasks = self._api.task_history.find_all_by_parameters(name, session=None, **arguments)
tasks = self._scheduler.task_history.find_all_by_parameters(name, session=None, **arguments)
self.render("recent.html", tasks=tasks)


class StaticFileHandler(tornado.web.RequestHandler):

def get(self, path):
# Path checking taken from Flask's safe_join function:
# https://github.com/mitsuhiko/flask/blob/1d55b8983/flask/helpers.py#L563-L587
Expand All @@ -135,28 +131,28 @@ def get(self, path):


class RootPathHandler(tornado.web.RequestHandler):

def get(self):
self.redirect("/static/visualiser/index.html")


def app(api):
def app(scheduler):
handlers = [
(r'/api/(.*)', RPCHandler, {"api": api}),
(r'/api/(.*)', RPCHandler, {"scheduler": scheduler}),
(r'/static/(.*)', StaticFileHandler),
(r'/', RootPathHandler),
(r'/history', RecentRunHandler, {'api': api}),
(r'/history/by_name/(.*?)', ByNameHandler, {'api': api}),
(r'/history/by_id/(.*?)', ByIdHandler, {'api': api}),
(r'/history/by_params/(.*?)', ByParamsHandler, {'api': api})
(r'/history', RecentRunHandler, {'scheduler': scheduler}),
(r'/history/by_name/(.*?)', ByNameHandler, {'scheduler': scheduler}),
(r'/history/by_id/(.*?)', ByIdHandler, {'scheduler': scheduler}),
(r'/history/by_params/(.*?)', ByParamsHandler, {'scheduler': scheduler})
]
api_app = tornado.web.Application(handlers)
return api_app


def _init_api(sched, responder, api_port, address):
api = responder or RemoteSchedulerResponder(sched)
api_app = app(api)
def _init_api(sched, responder=None, api_port=None, address=None):
if responder:
raise Exception('The "responder" argument is no longer supported')
api_app = app(sched)
api_sockets = tornado.netutil.bind_sockets(api_port, address=address)
server = tornado.httpserver.HTTPServer(api_app)
server.add_sockets(api_sockets)
Expand Down

0 comments on commit 661dc62

Please sign in to comment.