Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
Router,
)
from .scheduling import (
CronTab,
PeriodicTask,
PeriodicTaskScheduler,
PeriodicTaskSchedulerPort,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,48 @@
from __future__ import (
annotations,
)

from abc import (
ABC,
)
from collections.abc import (
Iterable,
)
from typing import (
TYPE_CHECKING,
Final,
Union,
)

from crontab import (
CronTab,
)

from .abc import (
EnrouteDecorator,
)
from .kinds import (
EnrouteDecoratorKind,
)

if TYPE_CHECKING:
from crontab import CronTab as CronTabImpl

from ...scheduling import (
CronTab,
)


class PeriodicEnrouteDecorator(EnrouteDecorator, ABC):
"""Periodic Enroute class"""

def __init__(self, crontab: Union[str, CronTab]):
if isinstance(crontab, str):
def __init__(self, crontab: Union[str, CronTab, CronTabImpl]):
from ...scheduling import (
CronTab,
)

if not isinstance(crontab, CronTab):
crontab = CronTab(crontab)
self.crontab = crontab

def __iter__(self) -> Iterable:
yield from (self.crontab.matchers,)
yield from (self.crontab,)


class PeriodicEventEnrouteDecorator(PeriodicEnrouteDecorator):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from .crontab import (
CronTab,
)
from .ports import (
PeriodicTaskSchedulerPort,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from __future__ import (
annotations,
)

import asyncio
from datetime import (
datetime,
)
from itertools import (
count,
)
from math import (
inf,
)
from typing import (
Any,
AsyncIterator,
Optional,
Union,
)

from crontab import CronTab as CrontTabImpl

from minos.common import (
current_datetime,
)


class CronTab:
"""CronTab class."""

def __init__(self, pattern: Union[str, CrontTabImpl]):
if isinstance(pattern, str) and pattern == "@reboot":
pattern = None
elif not isinstance(pattern, CrontTabImpl):
pattern = CrontTabImpl(pattern)
self._impl = pattern

@property
def impl(self) -> Optional[CrontTabImpl]:
"""Get the crontab implementation.

:return: A ``crontab.CronTab`` or ``None``.
"""
return self._impl

def __hash__(self):
return hash((type(self), self._impl_matchers))

def __eq__(self, other: Any) -> bool:
return isinstance(other, type(self)) and self._impl_matchers == other._impl_matchers

@property
def _impl_matchers(self):
if self._impl is None:
return None
return self._impl.matchers

async def __aiter__(self) -> AsyncIterator[datetime]:
counter = count()
now = current_datetime()
while next(counter) < self.repetitions:
await self.sleep_until_next(now)
now = current_datetime()
yield now

await asyncio.sleep(inf)

async def sleep_until_next(self, *args, **kwargs) -> None:
"""Sleep until next matching.

:param args: Additional positional arguments.
:param kwargs: Additional named arguments.
:return: This method does not return anything.
"""
await asyncio.sleep(self.get_delay_until_next(*args, **kwargs))

def get_delay_until_next(self, now: Optional[datetime] = None) -> float:
"""Get the time to wait for next matching.

:param now: Current time.
:return:
"""
if self._impl is None:
return 0

if now is None:
now = current_datetime()
return self._impl.next(now)

@property
def repetitions(self) -> Union[int, float]:
"""Get the number of repetitions.

:return: A ``float`` value.
"""
if self._impl is None:
return 1
return inf
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
Union,
)

from crontab import (
CronTab,
)
from crontab import CronTab as CronTabImpl

from minos.common import (
Config,
Expand All @@ -37,6 +35,9 @@
from ..requests import (
ResponseException,
)
from .crontab import (
CronTab,
)
from .requests import (
ScheduledRequest,
)
Expand Down Expand Up @@ -93,8 +94,8 @@ class PeriodicTask:

_task: Optional[asyncio.Task]

def __init__(self, crontab: Union[str, CronTab], fn: Callable[[ScheduledRequest], Awaitable[None]]):
if isinstance(crontab, str):
def __init__(self, crontab: Union[str, CronTab, CronTabImpl], fn: Callable[[ScheduledRequest], Awaitable[None]]):
if not isinstance(crontab, CronTab):
crontab = CronTab(crontab)

self._crontab = crontab
Expand Down Expand Up @@ -161,12 +162,9 @@ async def run_forever(self) -> NoReturn:

:return: This method never returns.
"""
now = current_datetime()
await asyncio.sleep(self._crontab.next(now))

while True:
now = current_datetime()
await asyncio.gather(asyncio.sleep(self._crontab.next(now)), self.run_once(now))
async for now in self._crontab:
await self.run_once(now)

@property
def running(self) -> bool:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import unittest
from datetime import (
datetime,
time,
timedelta,
timezone,
)
from math import (
inf,
)
from unittest.mock import (
MagicMock,
call,
patch,
)

from crontab import CronTab as CrontabImpl

from minos.common import (
current_datetime,
)
from minos.networks import (
CronTab,
)


class TestCronTab(unittest.IsolatedAsyncioTestCase):
def test_constructor(self):
crontab = CronTab("@daily")
self.assertEqual(CrontabImpl("@daily").matchers, crontab.impl.matchers)

def test_constructor_reboot(self):
crontab = CronTab("@reboot")
self.assertEqual(None, crontab.impl)

def test_constructor_raises(self):
with self.assertRaises(ValueError):
CronTab("foo")

def test_repetitions(self):
crontab = CronTab("@daily")
self.assertEqual(inf, crontab.repetitions)

def test_repetitions_reboot(self):
crontab = CronTab("@reboot")
self.assertEqual(1, crontab.repetitions)

def test_get_delay_until_next(self):
crontab = CronTab("@daily")
now = current_datetime()

expected = (
datetime.combine(now.date() + timedelta(days=1), time.min, tzinfo=timezone.utc) - now
).total_seconds()
self.assertAlmostEqual(expected, crontab.get_delay_until_next(), places=1)

def test_get_delay_until_next_reboot(self):
crontab = CronTab("@reboot")
self.assertEqual(0, crontab.get_delay_until_next())

def test_hash(self):
crontab = CronTab("@daily")
self.assertIsInstance(hash(crontab), int)

def test_hash_reboot(self):
crontab = CronTab("@reboot")
self.assertIsInstance(hash(crontab), int)

def test_eq(self):
base = CronTab("@daily")
one = CronTab("@daily")
self.assertEqual(base, one)

two = CronTab("@hourly")
self.assertNotEqual(base, two)

three = CronTab("@reboot")
self.assertNotEqual(base, three)

async def test_sleep_until_next(self):
crontab = CronTab("@reboot")

mock = MagicMock(return_value=1234)
crontab.get_delay_until_next = mock

with patch("asyncio.sleep") as mock_sleep:
await crontab.sleep_until_next()

self.assertEqual([call(1234)], mock_sleep.call_args_list)

async def test_aiter(self):
crontab = CronTab("@reboot")

with patch("asyncio.sleep") as mock_sleep:
count = 0
async for now in crontab:
count += 1
self.assertAlmostEqual(current_datetime(), now, delta=timedelta(seconds=1))
self.assertEqual(1, count)

self.assertEqual([call(0), call(inf)], mock_sleep.call_args_list)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import math
import unittest
import warnings
from unittest.mock import (
Expand All @@ -8,15 +9,12 @@
patch,
)

from crontab import (
CronTab,
)

from minos.common import (
Config,
current_datetime,
)
from minos.networks import (
CronTab,
PeriodicTask,
PeriodicTaskScheduler,
ScheduledRequest,
Expand Down Expand Up @@ -81,7 +79,7 @@ def setUp(self) -> None:
self.periodic = PeriodicTask("@daily", self.fn_mock)

def test_crontab(self) -> None:
self.assertEqual(CronTab("@daily").matchers, self.periodic.crontab.matchers)
self.assertEqual(CronTab("@daily"), self.periodic.crontab)

def test_fn(self) -> None:
self.assertEqual(self.fn_mock, self.periodic.fn)
Expand Down Expand Up @@ -110,13 +108,25 @@ async def test_stop(self) -> None:

async def test_run_forever(self) -> None:
with patch("asyncio.sleep") as mock_sleep:
run_once_mock = AsyncMock(side_effect=ValueError)
run_once_mock = AsyncMock(side_effect=[int, ValueError])
self.periodic.run_once = run_once_mock

with self.assertRaises(ValueError):
await self.periodic.run_forever()

self.assertEqual(2, mock_sleep.call_count)
self.assertEqual(2, run_once_mock.call_count)

async def test_run_forever_once(self) -> None:
periodic = PeriodicTask("@reboot", self.fn_mock)
with patch("asyncio.sleep", AsyncMock(side_effect=[int, ValueError])) as mock_sleep:
run_once_mock = AsyncMock()
periodic.run_once = run_once_mock

with self.assertRaises(ValueError):
await periodic.run_forever()

self.assertEqual([call(0), call(math.inf)], mock_sleep.call_args_list)
self.assertEqual(1, run_once_mock.call_count)

async def test_run_once(self) -> None:
Expand Down