This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
/
task_scheduler.py
111 lines (93 loc) · 3.61 KB
/
task_scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set
import attr
from synapse.api.errors import SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonMapping, ScheduledTask, TaskState
from synapse.util.stringutils import random_string
if TYPE_CHECKING:
from synapse.server import HomeServer
class TaskSchedulerHandler:
SCHEDULING_INTERVAL_MS = 10 * 60 * 1000 # 10mn
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.clock = hs.get_clock()
self._is_master = hs.config.worker.worker_app is None
self.running_tasks: Set[str] = set()
self.actions: Dict[
str, Callable[[ScheduledTask], Awaitable[Optional[ScheduledTask]]]
] = {}
if self._is_master:
self.clock.looping_call(
run_as_background_process,
TaskSchedulerHandler.SCHEDULING_INTERVAL_MS,
"scheduled_tasks_loop",
self._scheduled_tasks_loop,
)
def bind_action(
self,
fct: Callable[[ScheduledTask], Awaitable[Optional[ScheduledTask]]],
action_name: str,
) -> None:
self.actions[action_name] = fct
async def schedule_task(
self,
action: str,
*,
resource_id: Optional[str] = None,
timestamp: Optional[int] = None,
params: Optional[JsonMapping] = None,
) -> str:
if action not in self.actions:
# TODO
raise SynapseError(400, "Test")
task_id = random_string(16)
state = TaskState.SCHEDULED
if timestamp is None or timestamp < self.clock.time_msec():
state = TaskState.RUNNING
timestamp = self.clock.time_msec()
task = ScheduledTask(
task_id,
action,
state,
resource_id,
timestamp,
params,
None,
)
await self.store.upsert_scheduled_task(task)
return task_id
async def update_task_state(
self,
task: ScheduledTask,
# error: Optional[str],
) -> None:
await self.store.upsert_scheduled_task(task)
async def get_task(self, id: str) -> Optional[ScheduledTask]:
return await self.store.get_scheduled_task(id)
async def get_tasks(
self, action: str, resource_id: Optional[str]
) -> List[ScheduledTask]:
return await self.store.get_scheduled_tasks(action, resource_id)
async def _scheduled_tasks_loop(self) -> None:
for task in await self.store.get_scheduled_tasks():
if task.id not in self.running_tasks:
state = task.state
if (
state == TaskState.SCHEDULED
and task.timestamp is not None
and task.timestamp < self.clock.time_msec()
):
state = TaskState.RUNNING
if state == TaskState.RUNNING:
await self.store.upsert_scheduled_task(task)
self._run_task(task)
def _run_task(self, task: ScheduledTask) -> None:
if task.action in self.actions:
fct = self.actions[task.action]
async def wrapper() -> None:
updated_task = await fct(task)
if updated_task is None:
updated_task = attr.evolve(task, state=TaskState.COMPLETE)
await self.update_task_state(updated_task)
run_as_background_process(task.action, wrapper)
self.running_tasks.add(task.id)