Skip to content

Commit

Permalink
Updates and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
spacemanspiff2007 committed Apr 19, 2021
1 parent 2c8cbcf commit b73a835
Show file tree
Hide file tree
Showing 18 changed files with 245 additions and 76 deletions.
2 changes: 2 additions & 0 deletions src/eascheduler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from eascheduler.const import SKIP_EXECUTION

from eascheduler import errors
from eascheduler.errors.handler import set_exception_handler

from eascheduler import jobs, schedulers, executors
from eascheduler.scheduler_view import SchedulerView

Expand Down
1 change: 1 addition & 0 deletions src/eascheduler/errors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .errors import FirstRunInThePastError, JobAlreadyCanceledException, OneTimeJobCanNotBeSkipped, UnknownWeekdayError
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ class UnknownWeekdayError(Exception):
pass


class FirstRunNotInTheFutureError(Exception):
class FirstRunInThePastError(Exception):
pass
14 changes: 14 additions & 0 deletions src/eascheduler/errors/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import logging
from typing import Any, Callable


HANDLER: Callable[[Exception], Any] = lambda x: logging.getLogger('EAScheduler').error(x)


def set_exception_handler(handler: Callable[[Exception], Any]):
global HANDLER
HANDLER = handler


def process_exception(e: Exception):
HANDLER(e)
17 changes: 4 additions & 13 deletions src/eascheduler/executors/executor.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
import logging
from asyncio import create_task, run_coroutine_threadsafe, AbstractEventLoop
from typing import Any, Callable
from asyncio import AbstractEventLoop, create_task, run_coroutine_threadsafe

log = logging.getLogger('AsyncScheduler')


def default_exception_handler(e: Exception):
log.error(str(e))
from eascheduler.errors.handler import process_exception


class ExecutorBase:
Expand All @@ -25,14 +19,11 @@ def execute(self):


class AsyncExecutor(ExecutorBase):
#: Function which will be called when an ``Exception`` occurs during the execution of the job
EXCEPTION_HANDLER: Callable[[Exception], Any] = default_exception_handler

async def _execute(self):
try:
await self._func(*self._args, **self._kwargs)
except Exception as e:
default_exception_handler(e)
process_exception(e)

def execute(self):
create_task(self._execute())
Expand All @@ -43,4 +34,4 @@ class AsyncThreadSafeExecutor(AsyncExecutor):
LOOP: AbstractEventLoop

def execute(self):
run_coroutine_threadsafe(self.execute(), AsyncThreadSafeExecutor.LOOP)
run_coroutine_threadsafe(self._execute(), AsyncThreadSafeExecutor.LOOP)
10 changes: 5 additions & 5 deletions src/eascheduler/jobs/job_base.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@

from datetime import datetime
from typing import Optional, TYPE_CHECKING

from pendulum import from_timestamp

from eascheduler.const import local_tz, FAR_FUTURE
from eascheduler.const import FAR_FUTURE, local_tz
from eascheduler.errors import JobAlreadyCanceledException
from eascheduler.executors import ExecutorBase

Expand All @@ -24,8 +23,9 @@ def __init__(self, parent: 'AsyncScheduler', func: ExecutorBase):

def _set_next_run(self, next_run: float):
assert isinstance(next_run, (float, int))
next_run = round(next_run, 3) # ms accuracy
if self._next_run == next_run:

next_run = round(next_run, 3) # ms accuracy is enough
if self._next_run == next_run: # only set and subsequently reschedule if the timestamp changed
return None

self._next_run = next_run
Expand All @@ -50,4 +50,4 @@ def __repr__(self):

def get_next_run(self) -> datetime:
"""Return the next execution timestamp."""
return from_timestamp(self._next_run).in_timezone(local_tz).naive()
return from_timestamp(self._next_run, local_tz).naive()
6 changes: 3 additions & 3 deletions src/eascheduler/jobs/job_countdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ def __init__(self, parent: AsyncScheduler, func: ExecutorBase):
super().__init__(parent, func)
self._expire: float = 0.0

def expire(self, expire_time: Union[timedelta, float, int]) -> CountdownJob:
def countdown(self, time: Union[timedelta, float, int]) -> CountdownJob:
"""Set the time after which the job will be executed.
:param expire_time: time
:param time: time
"""
if self._parent is None:
raise JobAlreadyCanceledException()

secs = expire_time.total_seconds() if isinstance(expire_time, timedelta) else expire_time
secs = time.total_seconds() if isinstance(time, timedelta) else time
assert secs > 0, secs

self._expire = float(secs)
Expand Down
8 changes: 4 additions & 4 deletions src/eascheduler/jobs/job_datetime_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from eascheduler.const import SKIP_EXECUTION, _Execution
from eascheduler.const import local_tz, FAR_FUTURE
from eascheduler.errors import JobAlreadyCanceledException, FirstRunNotInTheFutureError
from eascheduler.errors import JobAlreadyCanceledException, FirstRunInThePastError
from eascheduler.executors.executor import ExecutorBase
from eascheduler.jobs.job_base import ScheduledJobBase
from eascheduler.schedulers import AsyncScheduler
Expand Down Expand Up @@ -140,7 +140,7 @@ def _update_run_time(self, dt_start: Optional[DateTime] = None) -> Union[DateTim
custom_obj = self._boundary_func(next_run)
if custom_obj is SKIP_EXECUTION:
return SKIP_EXECUTION
next_run = instance(custom_obj).astimezone(local_tz).in_timezone(UTC)
next_run = instance(custom_obj, local_tz).astimezone(local_tz).in_timezone(UTC)

if self._offset is not None:
next_run += self._offset # offset doesn't have to be localized
Expand Down Expand Up @@ -191,11 +191,11 @@ def _initialize_base_time(self, base_time: Union[None, int, float, timedelta, ti
new_base = new_base.add(days=1)
else:
assert isinstance(base_time, datetime)
new_base = instance(base_time).astimezone(local_tz)
new_base = instance(base_time, tz=local_tz).astimezone(local_tz)

assert isinstance(new_base, DateTime), type(new_base)
if new_base <= now:
raise FirstRunNotInTheFutureError(f'First run must be in the future! Now: {now}, run: {new_base}')
raise FirstRunInThePastError(f'First run must be in the future! Now: {now}, run: {new_base}')

new_base = new_base.in_timezone(UTC)
self._next_base = new_base.timestamp()
Expand Down
8 changes: 4 additions & 4 deletions src/eascheduler/jobs/job_day_of_week.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datetime import time as dt_time
from typing import Union, Set, Iterable, Optional, Dict

from pendulum import DateTime, from_timestamp
from pendulum import DateTime, from_timestamp, UTC
from pendulum import now as get_now

from eascheduler.const import SKIP_EXECUTION, local_tz
Expand Down Expand Up @@ -44,7 +44,7 @@ def _update_run_time(self, next_run: Optional[DateTime] = None) -> DateTime:

while not next_run.isoweekday() in self._weekdays:
next_run = next_run.add(days=1)
next_run = next_run.in_timezone('UTC')
next_run = next_run.in_timezone(UTC)

res = update_run_time(next_run)
return next_run
Expand All @@ -56,10 +56,10 @@ def _update_base_time(self):
while next_run < now:
next_run = next_run.add(days=1)

while not next_run.isoweekday() in self._weekdays:
while next_run.isoweekday() not in self._weekdays:
next_run = next_run.add(days=1)

next_run = next_run.in_timezone('UTC')
next_run = next_run.in_timezone(UTC)
self._next_base = next_run.timestamp()
self._update_run_time(next_run)
return self
Expand Down
2 changes: 1 addition & 1 deletion src/eascheduler/scheduler_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def countdown(self, expire_time: Union[dt_timedelta, float, int], callback, *arg
:return: Created job
"""
job = CountdownJob(self._scheduler, self._executor(callback, *args, **kwargs))
job.expire(expire_time)
job.countdown(expire_time)
self._scheduler.add_job(job)
return job

Expand Down
52 changes: 33 additions & 19 deletions src/eascheduler/schedulers/scheduler_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
from asyncio import create_task
from bisect import insort
from collections import deque
from typing import Optional, Deque, Set
from typing import Deque, Optional, Set

from pendulum import UTC
from pendulum import now as get_now

from eascheduler.const import FAR_FUTURE
from eascheduler.jobs.job_base import ScheduledJobBase
from eascheduler.errors.handler import process_exception


class AsyncScheduler:
Expand Down Expand Up @@ -61,29 +64,40 @@ def cancel_all(self):
job._parent = None

async def __run_next(self):
while self.jobs:
job = self.jobs[0]
try:
while self.jobs:
job = self.jobs[0]

now = get_now().timestamp()
# Don't schedule these jobs
if job._next_run >= FAR_FUTURE:
break

diff = job._next_run - now
while diff > 0:
await asyncio.sleep(diff)
now = get_now().timestamp()
now = get_now(UTC).timestamp()
diff = job._next_run - now

old_job = job
assert old_job is job, 'Job changed unexpectedly'
while diff > 0:
await asyncio.sleep(diff)
now = get_now(UTC).timestamp()
diff = job._next_run - now

job = self.jobs.popleft()
self.job_objs.remove(job)
old_job = job
assert old_job is job, 'Job changed unexpectedly'

# If it's a reoccurring job it has this function
if hasattr(job, '_update_base_time'):
job._update_base_time()
else:
job._parent = None
job = self.jobs.popleft()
self.job_objs.remove(job)

job._func.execute()
try:
# If it's a reoccurring job it has this function
if hasattr(job, '_update_base_time'):
job._update_base_time()
else:
job._parent = None

self.worker = None
job._func.execute()
except Exception as e:
process_exception(e)

except Exception as e:
process_exception(e)
finally:
self.worker = None
3 changes: 2 additions & 1 deletion src/eascheduler/schedulers/scheduler_asyncthread.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from asyncio import AbstractEventLoop, run_coroutine_threadsafe
from threading import _MainThread, current_thread
from threading import current_thread
from threading import _MainThread # type: ignore

from eascheduler.jobs.job_base import ScheduledJobBase
from eascheduler.schedulers import AsyncScheduler
Expand Down
27 changes: 26 additions & 1 deletion tests/helper.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from typing import Union

try:
from unittest.mock import AsyncMock
except ImportError:
from mock import AsyncMock

from pendulum import datetime
from datetime import datetime as dt_datetime
from pendulum import datetime, instance, from_timestamp, UTC, DateTime
from pendulum import set_test_now as __set_test_now

from eascheduler.const import local_tz
Expand All @@ -30,3 +33,25 @@ def __init__(self, *args, **kwargs):

def mocked_executor(*args, **kwargs) -> MockedAsyncExecutor:
return MockedAsyncExecutor(AsyncMock(*args, **kwargs))


def cmp_local(obj: Union[float, int, dt_datetime], local_dt: dt_datetime):
if isinstance(obj, DateTime):
assert obj.timezone
cmp_dt = obj.in_tz(local_tz).naive()
elif isinstance(obj, dt_datetime):
cmp_dt = instance(obj, local_tz).astimezone(local_tz).naive()
else:
cmp_dt = from_timestamp(obj, tz=local_tz).naive()
assert cmp_dt == local_dt, f'{cmp_dt}\n{local_dt}'


def cmp_utc(obj: Union[float, int, dt_datetime], local_dt: dt_datetime):
if isinstance(obj, DateTime):
assert obj.timezone
cmp_dt = obj.in_tz(UTC).naive()
elif isinstance(obj, dt_datetime):
cmp_dt = instance(obj, local_tz).astimezone(local_tz).in_tz(UTC).naive()
else:
cmp_dt = from_timestamp(obj, tz=UTC).naive()
assert cmp_dt == local_dt, f'{cmp_dt}\n{local_dt}'
2 changes: 1 addition & 1 deletion tests/jobs/test_job_countdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async def a_dummy():
calls.append(time.time())

j1 = CountdownJob(s, AsyncExecutor(a_dummy))
j1.expire(0.2)
j1.countdown(0.2)
s.add_job(j1)
# check that adding the trigger doesn't execute the job
await asyncio.sleep(0.25)
Expand Down
Loading

0 comments on commit b73a835

Please sign in to comment.