Skip to content

Commit

Permalink
Add the runner param.
Browse files Browse the repository at this point in the history
One of the recent request was to move the runner out, so you could write:
`create(..., runner=Sync(), tasks=[list of tasks])`

Still unsure if we should move forward with this implementation. The runner object will
always have a strong dependency on the tasks it runs. Another alternative would be to
have a `run` method instead that takes a runner and a list of tasks.
  • Loading branch information
xethorn committed Feb 17, 2015
1 parent 1661cf8 commit 2e33570
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 51 deletions.
13 changes: 7 additions & 6 deletions garcon/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ def execute_activity(self, context):
context (dict): The flow context.
"""

return self.tasks.execute(self, context)
return self.runner.execute(self, self.tasks, context)

def hydrate(self, data):
def hydrate(self, **data):
"""Hydrate the task with information provided.
Args:
Expand All @@ -170,6 +170,7 @@ def hydrate(self, data):
self.retry = getattr(self, 'retry', None) or data.get('retry', 0)
self.task_list = self.task_list or data.get('task_list')
self.tasks = getattr(self, 'tasks', []) or data.get('tasks')
self.runner = getattr(self, 'runner', None) or data.get('runner')
self.generators = getattr(
self, 'generators', None) or data.get('generators')

Expand Down Expand Up @@ -221,7 +222,7 @@ def timeout(self):
int: Task list timeout.
"""

return self.tasks.timeout
return self.runner.estimate_timeout(self.tasks)


class ActivityWorker():
Expand Down Expand Up @@ -282,15 +283,15 @@ def create(domain):

def wrapper(**options):
activity = Activity()
activity.hydrate(dict(
activity.hydrate(
domain=domain,
name=options.get('name'),
generators=options.get('generators', []),
requires=options.get('requires', []),
retry=options.get('retry'),
task_list=domain + '_' + options.get('name'),
tasks=options.get('tasks', [])
))
runner=options.get('runner'),
tasks=options.get('tasks', []))
return activity
return wrapper

Expand Down
33 changes: 16 additions & 17 deletions garcon/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,24 @@

class BaseRunner():

def __init__(self, *args):
self.tasks = args

@property
def timeout(self):
def estimate_timeout(self, tasks):
"""Calculate and return the timeout for an activity.
The calculation of the timeout is pessimistic: it takes the worse case
scenario (even for asynchronous task lists, it supposes there is only
one thread completed at a time.)
Args:
tasks (list): the list of tasks the runner needs to execute.
Return:
str: The timeout (boto requires the timeout to be a string and not
a regular number.)
"""

timeout = 0

for task in self.tasks:
for task in tasks:
task_timeout = DEFAULT_TASK_TIMEOUT
task_details = getattr(task, '__garcon__', None)

Expand All @@ -45,7 +44,7 @@ def timeout(self):

return str(timeout)

def execute(self, activity, context):
def execute(self, activity, tasks, context):
"""Execution of the tasks.
"""

Expand All @@ -54,9 +53,9 @@ def execute(self, activity, context):

class Sync(BaseRunner):

def execute(self, activity, context):
def execute(self, activity, tasks, context):
result = dict()
for task in self.tasks:
for task in tasks:
task_context = dict(list(result.items()) + list(context.items()))
resp = task(task_context, activity=activity)
result.update(resp or dict())
Expand All @@ -65,18 +64,18 @@ def execute(self, activity, context):

class Async(BaseRunner):

def __init__(self, *args, **kwargs):
self.tasks = args
self.max_workers = kwargs.get('max_workers', 3)
def __init__(self, max_workers=3):
self.max_workers = max_workers

def execute(self, activity, context):
def execute(self, activity, tasks, context):
result = dict()
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
tasks = []
for task in self.tasks:
tasks.append(executor.submit(task, context, activity=activity))
task_threads = []
for task in tasks:
task_threads.append(
executor.submit(task, context, activity=activity))

for future in futures.as_completed(tasks):
for future in futures.as_completed(task_threads):
data = future.result()
result.update(data or {})
return result
20 changes: 12 additions & 8 deletions tests/fixtures/flows/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,33 @@

activity_1 = create(
name='activity_1',
tasks=runner.Sync(
runner=runner.Sync(),
tasks=[
lambda activity, context:
print('activity_1')))
print('activity_1')])

activity_2 = create(
name='activity_2',
requires=[activity_1],
tasks=runner.Async(
runner=runner.Async(),
tasks=[
lambda activity, context:
print('activity_2_task_1'),
lambda activity, context:
print('activity_2_task_2')))
print('activity_2_task_2')])

activity_3 = create(
name='activity_3',
requires=[activity_1],
tasks=runner.Sync(
runner=runner.Sync(),
tasks=[
lambda activity, context:
print('activity_3')))
print('activity_3')])

activity_4 = create(
name='activity_4',
requires=[activity_3, activity_2],
tasks=runner.Sync(
runner=runner.Sync(),
tasks=[
lambda activity, context:
print('activity_4')))
print('activity_4')])
7 changes: 4 additions & 3 deletions tests/test_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ def test_execute_activity(monkeypatch):
custom_task = MagicMock(return_value=resp)

current_activity = activity.Activity()
current_activity.tasks = runner.Sync(custom_task)
current_activity.runner = runner.Sync()
current_activity.tasks = [custom_task]

val = current_activity.execute_activity(dict(foo='bar'))

Expand All @@ -166,11 +167,11 @@ def test_hydrate_activity(monkeypatch):

monkeypatch.setattr(activity.Activity, '__init__', lambda self: None)
current_activity = activity.Activity()
current_activity.hydrate(dict(
current_activity.hydrate(
name='activity',
domain='domain',
requires=[],
tasks=[lambda: dict('val')]))
tasks=[lambda: dict('val')])


def test_create_activity_worker(monkeypatch):
Expand Down
36 changes: 19 additions & 17 deletions tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ def test_synchronous_tasks():
"""

resp = dict(foo='bar')
current_runner = runner.Sync(
MagicMock(), MagicMock(return_value=resp))
result = current_runner.execute(None, dict())
tasks = [MagicMock(return_value=resp),]
current_runner = runner.Sync()
result = current_runner.execute(None, tasks, dict())

assert len(current_runner.tasks) == 2

for current_task in current_runner.tasks:
for current_task in tasks:
assert current_task.called

assert resp == result
Expand All @@ -39,13 +37,12 @@ def test_aynchronous_tasks():
list(tasks[4].return_value.items()))

workers = 2
current_runner = runner.Async(*tasks, max_workers=workers)
current_runner = runner.Async(max_workers=workers)

assert current_runner.max_workers == workers
assert len(current_runner.tasks) == len(tasks)

context = dict(hello='world')
resp = current_runner.execute(None, context)
resp = current_runner.execute(None, tasks, context)

for current_task in tasks:
assert current_task.called
Expand All @@ -57,16 +54,19 @@ def test_calculate_timeout_with_no_tasks():
"""Task list without task has no timeout.
"""

task_list = runner.BaseRunner()
assert task_list.timeout == '0'
tasks = []
current_runner = runner.BaseRunner()
assert current_runner.estimate_timeout(tasks) == '0'


def test_calculate_default_timeout():
"""Tasks that do not have a set timeout get the default timeout.
"""

task_list = runner.BaseRunner(lambda x: x)
assert task_list.timeout == str(runner.DEFAULT_TASK_TIMEOUT)
current_runner = runner.BaseRunner()
tasks = [lambda x: x]
estimate_timeout = current_runner.estimate_timeout(tasks)
assert estimate_timeout == str(runner.DEFAULT_TASK_TIMEOUT)


def test_calculate_timeout():
Expand All @@ -79,11 +79,13 @@ def test_calculate_timeout():
def task_a():
pass

current_runner = runner.BaseRunner(task_a)
assert current_runner.timeout == str(timeout)
current_runner = runner.BaseRunner()
assert current_runner.estimate_timeout([task_a]) == str(timeout)

def task_b():
pass

current_runner = runner.BaseRunner(task_a, task_b)
assert current_runner.timeout == str(timeout + runner.DEFAULT_TASK_TIMEOUT)
current_runner = runner.BaseRunner()
tasks = [task_a, task_b]
estimate_timeout = current_runner.estimate_timeout(tasks)
assert estimate_timeout == str(timeout + runner.DEFAULT_TASK_TIMEOUT)

0 comments on commit 2e33570

Please sign in to comment.