Skip to content

Commit

Permalink
Bugfixes for failed tasks & consistent pop behavior.
Browse files Browse the repository at this point in the history
  • Loading branch information
toastdriven committed Sep 7, 2020
1 parent 51cf8ee commit 9d91309
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 34 deletions.
13 changes: 9 additions & 4 deletions alligator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def __init__(
self.on_error = on_error
self.depends_on = depends_on
self.delay_until = delay_until
self.result = None

if self.delay_until is not None:
if isinstance(
Expand Down Expand Up @@ -267,13 +268,17 @@ def run(self):
hook function is called, passing both the task & the exception to it.
Then the exception is re-raised.
Finally, the result is returned.
Finally, it returns itself, with `Task.result` set to the result
from the target function's execution.
Returns:
Task: The processed Task.
"""
if self.on_start:
self.on_start(self)

try:
result = self.func(*self.func_args, **self.func_kwargs)
self.result = self.func(*self.func_args, **self.func_kwargs)
except Exception as err:
self.to_failed()

Expand All @@ -285,6 +290,6 @@ def run(self):
self.to_success()

if self.on_success:
self.on_success(self, result)
self.on_success(self, self.result)

return result
return self
68 changes: 56 additions & 12 deletions alligator/workers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import os
import signal
import time
import traceback

from alligator.constants import ALL

Expand Down Expand Up @@ -36,6 +38,20 @@ def __init__(self, gator, max_tasks=0, to_consume=ALL, nap_time=0.1):
self.nap_time = nap_time
self.tasks_complete = 0
self.keep_running = False
self.log = self.get_log()

def get_log(self):
"""
Sets up logging for the instance.
Returns:
logging.Logger: The log instance.
"""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
return logging.getLogger(__name__)

def ident(self):
"""
Expand All @@ -51,20 +67,24 @@ def starting(self):
"""
self.keep_running = True
ident = self.ident()
print('{} starting & consuming "{}".'.format(ident, self.to_consume))
self.log.info(
'{} starting & consuming "{}".'.format(ident, self.to_consume)
)

if self.max_tasks:
print("{} will die after {} tasks.".format(ident, self.max_tasks))
self.log.info(
"{} will die after {} tasks.".format(ident, self.max_tasks)
)
else:
print("{} will never die.".format(ident))
self.log.info("{} will never die.".format(ident))

def interrupt(self):
"""
Prints an interrupt message to stdout.
"""
self.keep_running = False
ident = self.ident()
print(
self.log.info(
'{} for "{}" saw interrupt. Finishing in-progress task.'.format(
ident, self.to_consume
)
Expand All @@ -76,7 +96,7 @@ def stopping(self):
"""
self.keep_running = False
ident = self.ident()
print(
self.log.info(
'{} for "{}" shutting down. Consumed {} tasks.'.format(
ident, self.to_consume, self.tasks_complete
)
Expand All @@ -88,7 +108,36 @@ def result(self, result):
:param result: The result of the task
"""
print(result)
if result is not None:
self.log.info(result)

def check_and_run_task(self):
"""
Handles the logic of checking for & executing a task.
`Worker.run_forever` uses this in a loop to actually handle the main
logic, though you can call this on your own if you have different
needs.
Returns:
bool: `True` if a task was run successfully, `False` if there was
no task to process or executing the task failed.
"""
if self.gator.len():
try:
task = self.gator.pop()
except Exception as err:
self.log.exception(err)
return False

if task is None:
return False

self.tasks_complete += 1
self.result(task.result)
return True

return False

def run_forever(self):
"""
Expand All @@ -107,12 +156,7 @@ def handle(signum, frame):
self.stopping()
break

if self.gator.len():
result = self.gator.pop()

if result:
self.tasks_complete += 1
self.result(result)
self.check_and_run_task()

if self.nap_time >= 0:
time.sleep(self.nap_time)
Expand Down
2 changes: 1 addition & 1 deletion tests/run_all.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/bash
# export ALLIGATOR_SLOW=true
export ALLIGATOR_SLOW=true

echo 'Locmem & SQLite Tests'
export ALLIGATOR_CONN='locmem://'
Expand Down
10 changes: 5 additions & 5 deletions tests/test_custom_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ def test_everything(self):
self.assertEqual(self.gator.backend.len(ALL), 4)

task_1 = self.gator.pop()
self.assertEqual(task_1, 4)
self.assertEqual(task_1.result, 4)

task_3 = self.gator.get(t3.task_id)
self.assertEqual(task_3, 16)
self.assertEqual(task_3.result, 16)

task_2 = self.gator.pop()
self.assertEqual(task_2, 12)
self.assertEqual(task_2.result, 12)

self.assertEqual(self.gator.backend.len(ALL), 1)

Expand Down Expand Up @@ -70,8 +70,8 @@ def test_delay_until(self, mock_time):
self.assertEqual(self.gator.backend.len(ALL), 4)

task_1 = self.gator.pop()
self.assertEqual(task_1, 4)
self.assertEqual(task_1.result, 4)

mock_time.return_value = 123499999
task_2 = self.gator.pop()
self.assertEqual(task_2, 11)
self.assertEqual(task_2.result, 11)
20 changes: 10 additions & 10 deletions tests/test_gator.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ def test_pop(self):
self.gator.push(task, so_computationally_expensive, 1, 1)
self.assertEqual(self.gator.backend.len(ALL), 1)

res = self.gator.pop()
self.assertEqual(res, 2)
complete = self.gator.pop()
self.assertEqual(complete.result, 2)

def test_get(self):
self.assertEqual(self.gator.backend.len(ALL), 0)
Expand All @@ -115,11 +115,11 @@ def test_get(self):
self.gator.push(task_2, so_computationally_expensive, 3, 5)
self.assertEqual(self.gator.backend.len(ALL), 2)

res = self.gator.get(task_2.task_id)
self.assertEqual(res, 8)
complete = self.gator.get(task_2.task_id)
self.assertEqual(complete.result, 8)

res = self.gator.get(task_1.task_id)
self.assertEqual(res, 2)
complete = self.gator.get(task_1.task_id)
self.assertEqual(complete.result, 2)

def test_cancel(self):
self.assertEqual(self.gator.backend.len(ALL), 0)
Expand All @@ -138,8 +138,8 @@ def test_execute_success(self):
task = Task(retries=3, is_async=True)
task.to_call(so_computationally_expensive, 2, 7)

res = self.gator.execute(task)
self.assertEqual(res, 9)
complete = self.gator.execute(task)
self.assertEqual(complete.result, 9)
self.assertEqual(task.retries, 3)
self.assertEqual(task.status, SUCCESS)

Expand Down Expand Up @@ -173,8 +173,8 @@ def test_execute_retries(self):
except IOError:
pass

res = self.gator.execute(task)
self.assertEqual(res, 9)
complete = self.gator.execute(task)
self.assertEqual(complete.result, 9)
self.assertEqual(task.retries, 1)

def test_task(self):
Expand Down
4 changes: 2 additions & 2 deletions tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ def success_task(initial, incr_by=1):

# Should succeed.
task.to_call(success_task, 12, 3)
result = task.run()
self.assertEqual(result, 15)
task.run()
self.assertEqual(task.result, 15)
self.assertTrue(task.started)
self.assertEqual(task.success_result, 15)
49 changes: 49 additions & 0 deletions tests/test_workers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import unittest
from unittest import mock

from alligator.gator import Gator
from alligator.workers import Worker
Expand Down Expand Up @@ -35,6 +36,10 @@ def rm_file():
pass


def raise_error(val):
raise ValueError("You've chosen... poorly.")


@unittest.skipIf(not ALLOW_SLOW, "Skipping slow worker tests")
class WorkerTestCase(unittest.TestCase):
def setUp(self):
Expand All @@ -57,6 +62,50 @@ def test_ident(self):
ident = self.worker.ident()
self.assertTrue(ident.startswith("Alligator Worker (#"))

def test_check_and_run_task(self):
self.assertEqual(read_file(), 0)

self.gator.task(incr_file, 2)
self.gator.task(incr_file, 3)

self.assertEqual(self.gator.backend.len("all"), 2)
self.assertEqual(self.worker.tasks_complete, 0)

self.worker.check_and_run_task()

self.assertEqual(self.gator.backend.len("all"), 1)
self.assertEqual(self.worker.tasks_complete, 1)
self.assertEqual(read_file(), 2)

def test_check_and_run_task_trap_exception(self):
self.assertEqual(read_file(), 0)

self.gator.task(incr_file, 2)
self.gator.task(incr_file, 3)
self.gator.task(raise_error, 75)
self.gator.task(incr_file, 4)

self.assertEqual(self.gator.backend.len("all"), 4)
self.assertEqual(self.worker.tasks_complete, 0)

with mock.patch.object(
self.worker.log, "exception"
) as mock_exception:
self.assertTrue(self.worker.check_and_run_task())
self.assertTrue(self.worker.check_and_run_task())

# Here, it should hit the exception BUT not stop execution.
self.assertFalse(self.worker.check_and_run_task())

self.assertTrue(self.worker.check_and_run_task())

# Make sure we tried to log that exception.
mock_exception.called_once()

self.assertEqual(self.gator.backend.len("all"), 0)
self.assertEqual(self.worker.tasks_complete, 3)
self.assertEqual(read_file(), 9)

def test_run_forever(self):
self.assertEqual(read_file(), 0)

Expand Down

0 comments on commit 9d91309

Please sign in to comment.