Skip to content

Commit

Permalink
fix !103 (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
keotl committed Jun 17, 2020
1 parent e22760f commit 86ff0c5
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 6 deletions.
15 changes: 9 additions & 6 deletions jivago/scheduling/scheduled_task_runner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import threading
import time
from datetime import datetime
Expand All @@ -17,22 +18,24 @@ def __init__(self, runner_class: type, schedule: Schedule, service_locator: Serv
self.thread_stop_event = threading.Event()
self.thread = threading.Thread(target=self.run, daemon=True)
self.run_lock = threading.Lock()
self.logger = logging.getLogger(ScheduledTaskRunner.__name__)

@Override
def run(self):
while not self.thread_stop_event.is_set():
sleep_time = self.schedule.next_start_time() - datetime.utcnow()
if sleep_time.total_seconds() > 0:
time.sleep(sleep_time.total_seconds())
self.run_lock.acquire()
self.service_locator.get(self.runner_class).run()
self.run_lock.release()
with self.run_lock:
try:
self.service_locator.get(self.runner_class).run()
except Exception as e:
self.logger.warning(f"Uncaught exception while executing scheduled task {self.runner_class}: {e}.")

def stop(self):
self.thread_stop_event.set()
self.run_lock.acquire()
self.service_locator.get(self.runner_class).cleanup()
self.run_lock.release()
with self.run_lock:
self.service_locator.get(self.runner_class).cleanup()

def start(self):
self.thread.start()
57 changes: 57 additions & 0 deletions test/scheduling/test_scheduled_task_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import unittest
import time
from datetime import datetime
from unittest import mock

from jivago.inject.service_locator import ServiceLocator
from jivago.lang.annotations import Override
from jivago.lang.runnable import Runnable
from jivago.scheduling.schedule import Schedule
from jivago.scheduling.scheduled_task_runner import ScheduledTaskRunner


class ScheduledTaskRunnerTests(unittest.TestCase):

def setUp(self):
self.service_locator = ServiceLocator()
self.service_locator.bind(SomeScheduledTask, SomeScheduledTask)
self.service_locator.bind(SomeCrashingTask, SomeCrashingTask)
self.schedule_mock: Schedule = mock.create_autospec(Schedule)
self.schedule_mock.next_start_time.return_value = datetime.now()

def test_run_scheduled_task(self):
self.runner = ScheduledTaskRunner(SomeScheduledTask, self.schedule_mock, self.service_locator)

self.runner.start()

self.assertTrue(SomeScheduledTask.was_called)

def test_givenUncaughtException_whenExecuting_shouldNotCrash(self):
self.runner = ScheduledTaskRunner(SomeCrashingTask, self.schedule_mock, self.service_locator)

self.runner.start()
time.sleep(0.05)

self.assertTrue(SomeCrashingTask.times_called > 1)

def tearDown(self):
self.runner.stop()


class SomeScheduledTask(Runnable):
was_called = False

@Override
def run(self):
SomeScheduledTask.was_called = True


class SomeCrashingTask(Runnable):
times_called = 0

@Override
def run(self):
SomeCrashingTask.times_called += 1

if SomeCrashingTask.times_called == 1:
raise Exception("Error!!")

0 comments on commit 86ff0c5

Please sign in to comment.