Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions triggers/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def __init__(

self._trigger_count = 0
self.task = None # placeholder for the repeat task created in self.__wrapper
self.__next_run_time = None # placeholder for the current target run time

def __call__(self, func: CoroFunction):
return self.__wrapper(func)
Expand All @@ -102,13 +103,13 @@ async def wrapped() -> None:
async def inner():
# maybe wait for next trigger cycle
if not self.on_startup:
next_run = self.next_run
self.__next_run_time = self.next_run
if self.logger:
self.logger.info(
f'`on_startup` is set to `False`. First run of {self.__class__.__name__} for '
f'{func.__name__}: {next_run.isoformat()}'
f'{func.__name__}: {self.__next_run_time.isoformat()}'
)
await self.sleep_until(next_run)
await self.sleep_until(self.__next_run_time)

# repeat indefinitely
while True:
Expand All @@ -118,6 +119,13 @@ async def inner():
if self.max_trigger_count is not None:
self._trigger_count += 1

# safeguard against early triggers - apparently, a desync is possible
if (
self.__next_run_time is not None
and (now := datetime.datetime.now().astimezone()) < self.__next_run_time
):
await asyncio.sleep(max((self.__next_run_time - now).total_seconds(), 0))

# call the decorated function
try:
if self.iter_args:
Expand Down Expand Up @@ -154,23 +162,24 @@ async def inner():

# sleep until next execution time
try:
next_run = self.next_run
self.__next_run_time = self.next_run
except StopRunning:
if self.logger:
self.logger.info(
f'{self.__class__.__name__} received StopRunning for {func.__name__}. Terminating'
)
break
if self.logger and datetime.datetime.now().astimezone() <= next_run:
if self.logger and datetime.datetime.now().astimezone() <= self.__next_run_time:
self.logger.info(
f'{self.__class__.__name__} finished for {func.__name__}. Next run: {next_run.isoformat()}'
f'{self.__class__.__name__} finished for {func.__name__}. '
f'Next run: {self.__next_run_time.isoformat()}'
)
elif self.logger: # i.e. next_run is in the past
self.logger.warning(
f'{self.__class__.__name__} missed the scheduled run time for {func.__name__}. Running now'
)

await self.sleep_until(next_run)
await self.sleep_until(self.__next_run_time)

# create a reference to the repeating task to prevent it from accidentally being garbage collected
self.task = self.loop.create_task(inner())
Expand Down