diff --git a/src/eascheduler/__init__.py b/src/eascheduler/__init__.py index 820699d..cca6e1c 100644 --- a/src/eascheduler/__init__.py +++ b/src/eascheduler/__init__.py @@ -3,6 +3,8 @@ from eascheduler.const import SKIP_EXECUTION from eascheduler import errors +from eascheduler.errors.handler import set_exception_handler + from eascheduler import jobs, schedulers, executors from eascheduler.scheduler_view import SchedulerView diff --git a/src/eascheduler/errors/__init__.py b/src/eascheduler/errors/__init__.py new file mode 100644 index 0000000..6f89809 --- /dev/null +++ b/src/eascheduler/errors/__init__.py @@ -0,0 +1 @@ +from .errors import FirstRunInThePastError, JobAlreadyCanceledException, OneTimeJobCanNotBeSkipped, UnknownWeekdayError diff --git a/src/eascheduler/errors.py b/src/eascheduler/errors/errors.py similarity index 78% rename from src/eascheduler/errors.py rename to src/eascheduler/errors/errors.py index e906eb6..26cbe2f 100644 --- a/src/eascheduler/errors.py +++ b/src/eascheduler/errors/errors.py @@ -11,5 +11,5 @@ class UnknownWeekdayError(Exception): pass -class FirstRunNotInTheFutureError(Exception): +class FirstRunInThePastError(Exception): pass diff --git a/src/eascheduler/errors/handler.py b/src/eascheduler/errors/handler.py new file mode 100644 index 0000000..6258fb6 --- /dev/null +++ b/src/eascheduler/errors/handler.py @@ -0,0 +1,14 @@ +import logging +from typing import Any, Callable + + +HANDLER: Callable[[Exception], Any] = lambda x: logging.getLogger('EAScheduler').error(x) + + +def set_exception_handler(handler: Callable[[Exception], Any]): + global HANDLER + HANDLER = handler + + +def process_exception(e: Exception): + HANDLER(e) diff --git a/src/eascheduler/executors/executor.py b/src/eascheduler/executors/executor.py index 4c499af..03e9bd3 100644 --- a/src/eascheduler/executors/executor.py +++ b/src/eascheduler/executors/executor.py @@ -1,12 +1,6 @@ -import logging -from asyncio import create_task, run_coroutine_threadsafe, AbstractEventLoop -from typing import Any, Callable +from asyncio import AbstractEventLoop, create_task, run_coroutine_threadsafe -log = logging.getLogger('AsyncScheduler') - - -def default_exception_handler(e: Exception): - log.error(str(e)) +from eascheduler.errors.handler import process_exception class ExecutorBase: @@ -25,14 +19,11 @@ def execute(self): class AsyncExecutor(ExecutorBase): - #: Function which will be called when an ``Exception`` occurs during the execution of the job - EXCEPTION_HANDLER: Callable[[Exception], Any] = default_exception_handler - async def _execute(self): try: await self._func(*self._args, **self._kwargs) except Exception as e: - default_exception_handler(e) + process_exception(e) def execute(self): create_task(self._execute()) @@ -43,4 +34,4 @@ class AsyncThreadSafeExecutor(AsyncExecutor): LOOP: AbstractEventLoop def execute(self): - run_coroutine_threadsafe(self.execute(), AsyncThreadSafeExecutor.LOOP) + run_coroutine_threadsafe(self._execute(), AsyncThreadSafeExecutor.LOOP) diff --git a/src/eascheduler/jobs/job_base.py b/src/eascheduler/jobs/job_base.py index 586b901..b672e50 100644 --- a/src/eascheduler/jobs/job_base.py +++ b/src/eascheduler/jobs/job_base.py @@ -1,10 +1,9 @@ - from datetime import datetime from typing import Optional, TYPE_CHECKING from pendulum import from_timestamp -from eascheduler.const import local_tz, FAR_FUTURE +from eascheduler.const import FAR_FUTURE, local_tz from eascheduler.errors import JobAlreadyCanceledException from eascheduler.executors import ExecutorBase @@ -24,8 +23,9 @@ def __init__(self, parent: 'AsyncScheduler', func: ExecutorBase): def _set_next_run(self, next_run: float): assert isinstance(next_run, (float, int)) - next_run = round(next_run, 3) # ms accuracy - if self._next_run == next_run: + + next_run = round(next_run, 3) # ms accuracy is enough + if self._next_run == next_run: # only set and subsequently reschedule if the timestamp changed return None self._next_run = next_run @@ -50,4 +50,4 @@ def __repr__(self): def get_next_run(self) -> datetime: """Return the next execution timestamp.""" - return from_timestamp(self._next_run).in_timezone(local_tz).naive() + return from_timestamp(self._next_run, local_tz).naive() diff --git a/src/eascheduler/jobs/job_countdown.py b/src/eascheduler/jobs/job_countdown.py index 67f91ff..fd6363b 100644 --- a/src/eascheduler/jobs/job_countdown.py +++ b/src/eascheduler/jobs/job_countdown.py @@ -17,15 +17,15 @@ def __init__(self, parent: AsyncScheduler, func: ExecutorBase): super().__init__(parent, func) self._expire: float = 0.0 - def expire(self, expire_time: Union[timedelta, float, int]) -> CountdownJob: + def countdown(self, time: Union[timedelta, float, int]) -> CountdownJob: """Set the time after which the job will be executed. - :param expire_time: time + :param time: time """ if self._parent is None: raise JobAlreadyCanceledException() - secs = expire_time.total_seconds() if isinstance(expire_time, timedelta) else expire_time + secs = time.total_seconds() if isinstance(time, timedelta) else time assert secs > 0, secs self._expire = float(secs) diff --git a/src/eascheduler/jobs/job_datetime_base.py b/src/eascheduler/jobs/job_datetime_base.py index 03d48c1..27129ca 100644 --- a/src/eascheduler/jobs/job_datetime_base.py +++ b/src/eascheduler/jobs/job_datetime_base.py @@ -9,7 +9,7 @@ from eascheduler.const import SKIP_EXECUTION, _Execution from eascheduler.const import local_tz, FAR_FUTURE -from eascheduler.errors import JobAlreadyCanceledException, FirstRunNotInTheFutureError +from eascheduler.errors import JobAlreadyCanceledException, FirstRunInThePastError from eascheduler.executors.executor import ExecutorBase from eascheduler.jobs.job_base import ScheduledJobBase from eascheduler.schedulers import AsyncScheduler @@ -140,7 +140,7 @@ def _update_run_time(self, dt_start: Optional[DateTime] = None) -> Union[DateTim custom_obj = self._boundary_func(next_run) if custom_obj is SKIP_EXECUTION: return SKIP_EXECUTION - next_run = instance(custom_obj).astimezone(local_tz).in_timezone(UTC) + next_run = instance(custom_obj, local_tz).astimezone(local_tz).in_timezone(UTC) if self._offset is not None: next_run += self._offset # offset doesn't have to be localized @@ -191,11 +191,11 @@ def _initialize_base_time(self, base_time: Union[None, int, float, timedelta, ti new_base = new_base.add(days=1) else: assert isinstance(base_time, datetime) - new_base = instance(base_time).astimezone(local_tz) + new_base = instance(base_time, tz=local_tz).astimezone(local_tz) assert isinstance(new_base, DateTime), type(new_base) if new_base <= now: - raise FirstRunNotInTheFutureError(f'First run must be in the future! Now: {now}, run: {new_base}') + raise FirstRunInThePastError(f'First run must be in the future! Now: {now}, run: {new_base}') new_base = new_base.in_timezone(UTC) self._next_base = new_base.timestamp() diff --git a/src/eascheduler/jobs/job_day_of_week.py b/src/eascheduler/jobs/job_day_of_week.py index a8a2887..9830256 100644 --- a/src/eascheduler/jobs/job_day_of_week.py +++ b/src/eascheduler/jobs/job_day_of_week.py @@ -4,7 +4,7 @@ from datetime import time as dt_time from typing import Union, Set, Iterable, Optional, Dict -from pendulum import DateTime, from_timestamp +from pendulum import DateTime, from_timestamp, UTC from pendulum import now as get_now from eascheduler.const import SKIP_EXECUTION, local_tz @@ -44,7 +44,7 @@ def _update_run_time(self, next_run: Optional[DateTime] = None) -> DateTime: while not next_run.isoweekday() in self._weekdays: next_run = next_run.add(days=1) - next_run = next_run.in_timezone('UTC') + next_run = next_run.in_timezone(UTC) res = update_run_time(next_run) return next_run @@ -56,10 +56,10 @@ def _update_base_time(self): while next_run < now: next_run = next_run.add(days=1) - while not next_run.isoweekday() in self._weekdays: + while next_run.isoweekday() not in self._weekdays: next_run = next_run.add(days=1) - next_run = next_run.in_timezone('UTC') + next_run = next_run.in_timezone(UTC) self._next_base = next_run.timestamp() self._update_run_time(next_run) return self diff --git a/src/eascheduler/scheduler_view.py b/src/eascheduler/scheduler_view.py index 5829ba3..db87d13 100644 --- a/src/eascheduler/scheduler_view.py +++ b/src/eascheduler/scheduler_view.py @@ -40,7 +40,7 @@ def countdown(self, expire_time: Union[dt_timedelta, float, int], callback, *arg :return: Created job """ job = CountdownJob(self._scheduler, self._executor(callback, *args, **kwargs)) - job.expire(expire_time) + job.countdown(expire_time) self._scheduler.add_job(job) return job diff --git a/src/eascheduler/schedulers/scheduler_async.py b/src/eascheduler/schedulers/scheduler_async.py index 0820c15..ab6329d 100644 --- a/src/eascheduler/schedulers/scheduler_async.py +++ b/src/eascheduler/schedulers/scheduler_async.py @@ -3,11 +3,14 @@ from asyncio import create_task from bisect import insort from collections import deque -from typing import Optional, Deque, Set +from typing import Deque, Optional, Set +from pendulum import UTC from pendulum import now as get_now +from eascheduler.const import FAR_FUTURE from eascheduler.jobs.job_base import ScheduledJobBase +from eascheduler.errors.handler import process_exception class AsyncScheduler: @@ -61,29 +64,40 @@ def cancel_all(self): job._parent = None async def __run_next(self): - while self.jobs: - job = self.jobs[0] + try: + while self.jobs: + job = self.jobs[0] - now = get_now().timestamp() + # Don't schedule these jobs + if job._next_run >= FAR_FUTURE: + break - diff = job._next_run - now - while diff > 0: - await asyncio.sleep(diff) - now = get_now().timestamp() + now = get_now(UTC).timestamp() diff = job._next_run - now - old_job = job - assert old_job is job, 'Job changed unexpectedly' + while diff > 0: + await asyncio.sleep(diff) + now = get_now(UTC).timestamp() + diff = job._next_run - now - job = self.jobs.popleft() - self.job_objs.remove(job) + old_job = job + assert old_job is job, 'Job changed unexpectedly' - # If it's a reoccurring job it has this function - if hasattr(job, '_update_base_time'): - job._update_base_time() - else: - job._parent = None + job = self.jobs.popleft() + self.job_objs.remove(job) - job._func.execute() + try: + # If it's a reoccurring job it has this function + if hasattr(job, '_update_base_time'): + job._update_base_time() + else: + job._parent = None - self.worker = None + job._func.execute() + except Exception as e: + process_exception(e) + + except Exception as e: + process_exception(e) + finally: + self.worker = None diff --git a/src/eascheduler/schedulers/scheduler_asyncthread.py b/src/eascheduler/schedulers/scheduler_asyncthread.py index 2a8da83..aea6b32 100644 --- a/src/eascheduler/schedulers/scheduler_asyncthread.py +++ b/src/eascheduler/schedulers/scheduler_asyncthread.py @@ -1,6 +1,7 @@ import logging from asyncio import AbstractEventLoop, run_coroutine_threadsafe -from threading import _MainThread, current_thread +from threading import current_thread +from threading import _MainThread # type: ignore from eascheduler.jobs.job_base import ScheduledJobBase from eascheduler.schedulers import AsyncScheduler diff --git a/tests/helper.py b/tests/helper.py index 34f20ff..8743627 100644 --- a/tests/helper.py +++ b/tests/helper.py @@ -1,9 +1,12 @@ +from typing import Union + try: from unittest.mock import AsyncMock except ImportError: from mock import AsyncMock -from pendulum import datetime +from datetime import datetime as dt_datetime +from pendulum import datetime, instance, from_timestamp, UTC, DateTime from pendulum import set_test_now as __set_test_now from eascheduler.const import local_tz @@ -30,3 +33,25 @@ def __init__(self, *args, **kwargs): def mocked_executor(*args, **kwargs) -> MockedAsyncExecutor: return MockedAsyncExecutor(AsyncMock(*args, **kwargs)) + + +def cmp_local(obj: Union[float, int, dt_datetime], local_dt: dt_datetime): + if isinstance(obj, DateTime): + assert obj.timezone + cmp_dt = obj.in_tz(local_tz).naive() + elif isinstance(obj, dt_datetime): + cmp_dt = instance(obj, local_tz).astimezone(local_tz).naive() + else: + cmp_dt = from_timestamp(obj, tz=local_tz).naive() + assert cmp_dt == local_dt, f'{cmp_dt}\n{local_dt}' + + +def cmp_utc(obj: Union[float, int, dt_datetime], local_dt: dt_datetime): + if isinstance(obj, DateTime): + assert obj.timezone + cmp_dt = obj.in_tz(UTC).naive() + elif isinstance(obj, dt_datetime): + cmp_dt = instance(obj, local_tz).astimezone(local_tz).in_tz(UTC).naive() + else: + cmp_dt = from_timestamp(obj, tz=UTC).naive() + assert cmp_dt == local_dt, f'{cmp_dt}\n{local_dt}' diff --git a/tests/jobs/test_job_countdown.py b/tests/jobs/test_job_countdown.py index 56c2385..01720aa 100644 --- a/tests/jobs/test_job_countdown.py +++ b/tests/jobs/test_job_countdown.py @@ -17,7 +17,7 @@ async def a_dummy(): calls.append(time.time()) j1 = CountdownJob(s, AsyncExecutor(a_dummy)) - j1.expire(0.2) + j1.countdown(0.2) s.add_job(j1) # check that adding the trigger doesn't execute the job await asyncio.sleep(0.25) diff --git a/tests/jobs/test_job_datetime.py b/tests/jobs/test_job_datetime.py index 2e3176f..78bb5b0 100644 --- a/tests/jobs/test_job_datetime.py +++ b/tests/jobs/test_job_datetime.py @@ -1,13 +1,15 @@ -from datetime import time, datetime, timedelta +from datetime import datetime, time, timedelta + import pytest +from pendulum import from_timestamp + +from eascheduler.const import local_tz +from eascheduler.errors import FirstRunInThePastError +from eascheduler.executors import AsyncExecutor from eascheduler.jobs.job_datetime_base import DateTimeJobBase -from tests.helper import utc_ts from eascheduler.schedulers import AsyncScheduler -from eascheduler.executors import AsyncExecutor -from eascheduler.const import local_tz -from eascheduler.errors import FirstRunNotInTheFutureError from tests.helper import set_now -from pendulum import from_timestamp +from tests.helper import utc_ts @pytest.mark.asyncio @@ -70,7 +72,7 @@ def test_func(obj): @pytest.mark.asyncio -async def test_func_boundary(): +async def test_func_boundary_changes_self(): async def bla(): pass @@ -80,6 +82,7 @@ async def bla(): set_now(2001, 1, 1, 7, 10) j._initialize_base_time(None) + assert from_timestamp(j._next_base).in_tz(local_tz).naive() == datetime(2001, 1, 1, 7, 10, 0, 1) # Boundary function test def test_func(obj): @@ -90,11 +93,35 @@ def test_func(obj): return obj j.boundary_func(test_func) - assert j.get_next_run() == datetime(2001, 1, 1, 8, 10) + assert from_timestamp(j._next_run).in_tz(local_tz).naive() == datetime(2001, 1, 1, 8, 10) + + j.cancel() + + +@pytest.mark.asyncio +async def test_func_boundary(): + async def bla(): + pass + + s = AsyncScheduler() + j = DateTimeJobBase(s, AsyncExecutor(bla)) + s.add_job(j) + + set_now(2001, 1, 1, 7, 10) + j._initialize_base_time(None) + + # Boundary function test + def test_func(obj): + assert obj == datetime(2001, 1, 1, 7, 10, 0, 1) + return obj + + j.boundary_func(test_func) + assert j.get_next_run() == datetime(2001, 1, 1, 7, 10) j.cancel() + @pytest.mark.asyncio async def test_initialize(): s = AsyncScheduler() @@ -105,6 +132,7 @@ async def test_initialize(): # Now j._initialize_base_time(None) assert from_timestamp(j._next_base).in_tz(local_tz).naive() == datetime(2001, 1, 1, 12, 0, 0, 1) + assert j._next_base == 978346800.000001 # Diff from now j._initialize_base_time(timedelta(days=1, minutes=30)) @@ -120,9 +148,17 @@ async def test_initialize(): j._initialize_base_time(time(1, 20, 30)) assert from_timestamp(j._next_base).in_tz(local_tz).naive() == datetime(2001, 1, 2, 1, 20, 30) - with pytest.raises(FirstRunNotInTheFutureError) as e: + # Specified time + j._initialize_base_time(datetime(2001, 1, 1, 12, 20, 30)) + j._update_run_time() + assert from_timestamp(j._next_base).in_tz(local_tz).naive() == datetime(2001, 1, 1, 12, 20, 30) + assert j.get_next_run() == datetime(2001, 1, 1, 12, 20, 30) + + with pytest.raises(FirstRunInThePastError) as e: j._initialize_base_time(datetime(2001, 1, 1, 1, 20, 30)) assert str(e.value) in ( - 'First run must be in the future! Now: 2001-01-01T12:00:00+01:00, run: 2001-01-01T02:20:30+01:00', - 'First run must be in the future! Now: 2001-01-01T12:00:00+00:00, run: 2001-01-01T01:20:30+00:00', + 'First run must be in the future! Now: 2001-01-01T12:00:00+01:00, run: 2001-01-01T01:20:30+01:00', + 'First run must be in the future! Now: 2001-01-01T12:00:00+00:00, run: 2001-01-01T00:20:30+00:00', ) + + j.cancel() diff --git a/tests/jobs/test_job_onetime.py b/tests/jobs/test_job_onetime.py index 31b6601..33949f9 100644 --- a/tests/jobs/test_job_onetime.py +++ b/tests/jobs/test_job_onetime.py @@ -1,5 +1,4 @@ import asyncio -import time import pytest @@ -15,6 +14,7 @@ def test_exception(): s = AsyncScheduler() j = OneTimeJob(s, lambda x: x) j._next_base = utc_ts(2001, 1, 1, 12) + with pytest.raises(OneTimeJobCanNotBeSkipped): j.boundary_func(lambda x: SKIP_EXECUTION) @@ -48,11 +48,19 @@ async def test_remove(): @pytest.mark.asyncio async def test_init(): + set_now(2001, 1, 1, 12, 0, 0) + s = AsyncScheduler() j = OneTimeJob(s, lambda x: x) j._initialize_base_time(None) - assert j._next_base <= round(time.time(), 3) + 0.01 + j._update_run_time() + assert int(j._next_base) == 978346800 + assert int(j._next_run) == 978346800 j._initialize_base_time(3) - assert j._next_base <= round(time.time() + 3, 3) + 0.01 + j._update_run_time() + assert int(j._next_base) == 978346803 + assert int(j._next_run) == 978346803 + + j.cancel() diff --git a/tests/jobs/test_job_reoccuring.py b/tests/jobs/test_job_reoccuring.py index 5175c3e..9f22c73 100644 --- a/tests/jobs/test_job_reoccuring.py +++ b/tests/jobs/test_job_reoccuring.py @@ -1,3 +1,4 @@ +from asyncio import sleep from datetime import datetime from functools import partial @@ -6,8 +7,7 @@ from eascheduler.const import SKIP_EXECUTION from eascheduler.jobs import ReoccurringJob from eascheduler.schedulers import AsyncScheduler -from tests.helper import set_now -from tests.helper import utc_ts +from tests.helper import cmp_local, set_now, utc_ts @pytest.mark.asyncio @@ -25,15 +25,15 @@ async def test_remove(): now(11) j._update_base_time() - assert j.get_next_run() == datetime(2001, 1, 1, 12) + cmp_local(j._next_run, datetime(2001, 1, 1, 12)) now(12) j._update_base_time() - assert j.get_next_run() == datetime(2001, 1, 1, 12, 0, 5) + cmp_local(j._next_run, datetime(2001, 1, 1, 12, 0, 5)) now(12, 0, 5) j._update_base_time() - assert j.get_next_run() == datetime(2001, 1, 1, 12, 0, 10) + cmp_local(j._next_run, datetime(2001, 1, 1, 12, 0, 10)) s.cancel_all() @@ -59,18 +59,59 @@ def skip_func(dt: datetime): now(11) j._update_base_time() - assert j.get_next_run() == datetime(2001, 1, 1, 12) + cmp_local(j._next_run, datetime(2001, 1, 1, 12)) now(12) j._update_base_time() - assert j.get_next_run() == datetime(2001, 1, 1, 12, 0, 5) + cmp_local(j._next_run, datetime(2001, 1, 1, 12, 0, 5)) now(12, 0, 5) j._update_base_time() - assert j.get_next_run() == datetime(2001, 1, 1, 12, 0, 15) + cmp_local(j._next_run, datetime(2001, 1, 1, 12, 0, 15)) now(12, 0, 15) j._update_base_time() - assert j.get_next_run() == datetime(2001, 1, 1, 12, 0, 20) + cmp_local(j._next_run, datetime(2001, 1, 1, 12, 0, 20)) + + s.cancel_all() + + +@pytest.mark.asyncio +async def test_func_exception(caplog): + async def bla(): + pass + + calls = 0 + + def func(arg): + nonlocal calls + if calls: + 1 / 0 + calls += 1 + return arg + + set_now(2001, 1, 1, 7, 10) + + s = AsyncScheduler() + j1 = ReoccurringJob(s, lambda x: x) + j1._interval = 999 + j1._initialize_base_time(999) + j1._update_run_time() + s.add_job(j1) + + j = ReoccurringJob(s, lambda x: x) + j._interval = 999 + s.add_job(j) + + j._initialize_base_time(None) + j.boundary_func(func) + + await sleep(0.3) + + # ensure that the worker is still running + assert s.worker is not None + + # ensure that the exception got caught + assert caplog.records[0].message == 'division by zero' s.cancel_all() diff --git a/tests/test_time_conversions.py b/tests/test_time_conversions.py new file mode 100644 index 0000000..a7f38aa --- /dev/null +++ b/tests/test_time_conversions.py @@ -0,0 +1,36 @@ +from datetime import datetime + +from pendulum import DateTime, UTC, from_timestamp, instance + +from eascheduler.const import local_tz + + +def test_to_timestamp(): + # Timestamps are always in UTC + assert DateTime(2001, 1, 1, 12, tzinfo=local_tz).timestamp() == 978346800 + assert DateTime(2001, 1, 1, 11, tzinfo=UTC).timestamp() == 978346800 + + +def test_from_timestamp(): + # Loading from timestamps always works correct + assert from_timestamp(978346800, tz=local_tz) == DateTime(2001, 1, 1, 12, tzinfo=local_tz) + assert from_timestamp(978346800, tz=UTC) == DateTime(2001, 1, 1, 11, tzinfo=UTC) + + +def test_from_instance(): + dt = datetime(2001, 1, 1, 12) + + # creating a DateTime from dt works like this + aware_obj = instance(dt, tz=local_tz) + assert aware_obj == DateTime(2001, 1, 1, 12, tzinfo=local_tz) + assert aware_obj.naive() == dt + + # back and forth conversation from naive to aware + naive = DateTime(2001, 1, 1, 11, tzinfo=UTC).in_timezone(local_tz).naive() + assert naive == DateTime(2001, 1, 1, 12) + aware = instance(naive, local_tz).astimezone(local_tz) + assert aware == DateTime(2001, 1, 1, 12, tzinfo=local_tz) + + # creating a DateTime from dt works like this + aware = instance(dt, local_tz).astimezone(local_tz) + assert aware == DateTime(2001, 1, 1, 12, tzinfo=local_tz)