From 470f385419be2f6d3619e3bd67a1ba8f55a116cd Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Fri, 7 Jul 2023 17:41:05 +0200 Subject: [PATCH 01/34] Implements a task scheduler for resumable potentially long running tasks --- changelog.d/15891.feature | 1 + synapse/app/generic_worker.py | 2 + synapse/server.py | 6 + synapse/storage/databases/main/__init__.py | 2 + .../storage/databases/main/task_scheduler.py | 173 +++++++++++++ .../main/delta/79/03_scheduled_tasks.sql | 26 ++ synapse/types/__init__.py | 38 +++ synapse/util/task_scheduler.py | 244 ++++++++++++++++++ tests/util/test_task_scheduler.py | 132 ++++++++++ 9 files changed, 624 insertions(+) create mode 100644 changelog.d/15891.feature create mode 100644 synapse/storage/databases/main/task_scheduler.py create mode 100644 synapse/storage/schema/main/delta/79/03_scheduled_tasks.sql create mode 100644 synapse/util/task_scheduler.py create mode 100644 tests/util/test_task_scheduler.py diff --git a/changelog.d/15891.feature b/changelog.d/15891.feature new file mode 100644 index 000000000000..5024b5adc435 --- /dev/null +++ b/changelog.d/15891.feature @@ -0,0 +1 @@ +Implements a task scheduler for resumable potentially long running tasks. 
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index dc79efcc142f..d25e3548e075 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -91,6 +91,7 @@ from synapse.storage.databases.main.stats import StatsStore from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.storage.databases.main.tags import TagsWorkerStore +from synapse.storage.databases.main.task_scheduler import TaskSchedulerWorkerStore from synapse.storage.databases.main.transactions import TransactionWorkerStore from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore from synapse.storage.databases.main.user_directory import UserDirectoryStore @@ -144,6 +145,7 @@ class GenericWorkerStore( TransactionWorkerStore, LockStore, SessionStore, + TaskSchedulerWorkerStore, ): # Properties that multiple storage classes define. Tell mypy what the # expected type is. diff --git a/synapse/server.py b/synapse/server.py index b72b76a38b35..b1c7fedaca42 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -141,6 +141,7 @@ from synapse.util.macaroons import MacaroonGenerator from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.stringutils import random_string +from synapse.util.task_scheduler import TaskScheduler logger = logging.getLogger(__name__) @@ -359,6 +360,7 @@ def setup_background_tasks(self) -> None: """ for i in self.REQUIRED_ON_BACKGROUND_TASK_STARTUP: getattr(self, "get_" + i + "_handler")() + self.get_task_scheduler() def get_reactor(self) -> ISynapseReactor: """ @@ -912,3 +914,7 @@ def get_request_ratelimiter(self) -> RequestRatelimiter: def get_common_usage_metrics_manager(self) -> CommonUsageMetricsManager: """Usage metrics shared between phone home stats and the prometheus exporter.""" return CommonUsageMetricsManager(self) + + @cache_in_self + def get_task_scheduler(self) -> TaskScheduler: + return TaskScheduler(self) diff --git a/synapse/storage/databases/main/__init__.py 
b/synapse/storage/databases/main/__init__.py index b6028853c939..cb8fb665e478 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -70,6 +70,7 @@ from .stats import StatsStore from .stream import StreamWorkerStore from .tags import TagsStore +from .task_scheduler import TaskSchedulerWorkerStore from .transactions import TransactionWorkerStore from .ui_auth import UIAuthStore from .user_directory import UserDirectoryStore @@ -127,6 +128,7 @@ class DataStore( CacheInvalidationWorkerStore, LockStore, SessionStore, + TaskSchedulerWorkerStore, ): def __init__( self, diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py new file mode 100644 index 000000000000..37c2110bbd30 --- /dev/null +++ b/synapse/storage/databases/main/task_scheduler.py @@ -0,0 +1,173 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +from synapse.storage._base import SQLBaseStore +from synapse.storage.database import DatabasePool, LoggingDatabaseConnection +from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus + +if TYPE_CHECKING: + from synapse.server import HomeServer + + +class TaskSchedulerWorkerStore(SQLBaseStore): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + @staticmethod + def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask: + row["status"] = TaskStatus(row["status"]) + if row["params"] is not None: + row["params"] = json.loads(row["params"]) + if row["result"] is not None: + row["result"] = json.loads(row["result"]) + return ScheduledTask(**row) + + async def get_scheduled_tasks( + self, action: Optional[str] = None, resource_id: Optional[str] = None + ) -> List[ScheduledTask]: + """Get a list of scheduled tasks from the DB. + + If the parameters are `None` all the tasks are returned. + + Args: + action: Limit the returned tasks to this specific action name + resource_id: Limit the returned tasks to this specific resource id + + Returns: a list of `ScheduledTask` + """ + keyvalues = {} + if action: + keyvalues["action"] = action + if resource_id: + keyvalues["resource_id"] = resource_id + + rows = await self.db_pool.simple_select_list( + table="scheduled_tasks", + keyvalues=keyvalues, + retcols=( + "id", + "action", + "status", + "timestamp", + "resource_id", + "params", + "result", + "error", + ), + desc="get_scheduled_tasks", + ) + + return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows] + + async def upsert_scheduled_task(self, task: ScheduledTask) -> None: + """Upsert a specified `ScheduledTask` in the DB. 
+ + Args: + task: the `ScheduledTask` to upsert + """ + await self.db_pool.simple_upsert( + "scheduled_tasks", + {"id": task.id}, + { + "action": task.action, + "status": task.status, + "timestamp": task.timestamp, + "resource_id": task.resource_id, + "params": None if task.params is None else json.dumps(task.params), + "result": None if task.result is None else json.dumps(task.result), + "error": task.error, + }, + desc="upsert_scheduled_task", + ) + + async def update_scheduled_task( + self, + id: str, + *, + timestamp: Optional[int] = None, + status: Optional[TaskStatus] = None, + result: Optional[JsonMapping] = None, + error: Optional[str] = None, + ) -> None: + """Update a scheduled task in the DB with some new value(s). + + Args: + id: id of the `ScheduledTask` to update + timestamp: new timestamp of the task + status: new status of the task + result: new result of the task + error: new error of the task + """ + updatevalues: JsonDict = {} + if timestamp is not None: + updatevalues["timestamp"] = timestamp + if status is not None: + updatevalues["status"] = status + if result is not None: + updatevalues["result"] = json.dumps(result) + if error is not None: + updatevalues["error"] = error + await self.db_pool.simple_update( + "scheduled_tasks", + {"id": id}, + updatevalues, + desc="update_scheduled_task", + ) + + async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]: + """Get a specific `ScheduledTask` from its id. 
+ + Args: + id: the id of the task to retrieve + + Returns: the task if available, `None` otherwise + """ + row = await self.db_pool.simple_select_one( + table="scheduled_tasks", + keyvalues={"id": id}, + retcols=( + "id", + "action", + "status", + "timestamp", + "resource_id", + "params", + "result", + "error", + ), + allow_none=True, + desc="get_scheduled_task", + ) + + return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None + + async def delete_scheduled_task(self, id: str) -> None: + """Delete a specific task from its id. + + Args: + id: the id of the task to delete + """ + await self.db_pool.simple_delete( + "scheduled_tasks", + keyvalues={"id": id}, + desc="delete_scheduled_task", + ) diff --git a/synapse/storage/schema/main/delta/79/03_scheduled_tasks.sql b/synapse/storage/schema/main/delta/79/03_scheduled_tasks.sql new file mode 100644 index 000000000000..4ee43887b6c7 --- /dev/null +++ b/synapse/storage/schema/main/delta/79/03_scheduled_tasks.sql @@ -0,0 +1,26 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- cf ScheduledTask docstring for the meaning of the fields. 
+CREATE TABLE IF NOT EXISTS scheduled_tasks( + id text PRIMARY KEY, + action text NOT NULL, + status text NOT NULL, + timestamp bigint NOT NULL, + resource_id text, + params text, + result text, + error text +); diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 095be070e0c5..7effac8c1d0f 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -15,6 +15,7 @@ import abc import re import string +from enum import Enum from typing import ( TYPE_CHECKING, AbstractSet, @@ -979,3 +980,40 @@ class UserProfile(TypedDict): class RetentionPolicy: min_lifetime: Optional[int] = None max_lifetime: Optional[int] = None + + +class TaskStatus(str, Enum): + """Status of a scheduled task""" + + # Task is scheduled but not active + SCHEDULED = "scheduled" + # Task is active and probably running, and if not + # will be run on next scheduler loop run + ACTIVE = "active" + # Task has completed successfully + COMPLETE = "complete" + # Task is over and either returned a failed status, or had an exception + FAILED = "failed" + + +@attr.s(auto_attribs=True, frozen=True, slots=True) +class ScheduledTask: + """Description of a scheduled task""" + + # id used to identify the task + id: str + # name of the action to be run by this task + action: str + # current status of this task + status: TaskStatus + # if the status is SCHEDULED then this represents when it should be launched, + # otherwise it represents the last time this task got a change of state + timestamp: int + # Optionally bind a task to some resource id for easy retrieval + resource_id: Optional[str] + # Optional parameters that will be passed to the function ran by the task + params: Optional[JsonMapping] + # Optional result that can be updated by the running task + result: Optional[JsonMapping] + # Optional error that should be assigned a value when the status is FAILED + error: Optional[str] diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py new file mode 100644 
index 000000000000..2c3abaaedf99 --- /dev/null +++ b/synapse/util/task_scheduler.py @@ -0,0 +1,244 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set, Tuple + +from twisted.python.failure import Failure + +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import JsonMapping, ScheduledTask, TaskStatus +from synapse.util.stringutils import random_string + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class TaskScheduler: + # Precision of the scheduler, evaluation of tasks to run will only happen + # every `SCHEDULE_INTERVAL_MS` ms + SCHEDULE_INTERVAL_MS = 5 * 60 * 1000 # 5mn + # Time before a complete or failed task is deleted from the DB + KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week + + def __init__(self, hs: "HomeServer"): + self.store = hs.get_datastores().main + self.clock = hs.get_clock() + self.running_tasks: Set[str] = set() + self.actions: Dict[ + str, + Callable[ + [ScheduledTask, bool], + Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]], + ], + ] = {} + self.run_background_tasks = hs.config.worker.run_background_tasks + + if self.run_background_tasks: + self.clock.looping_call( + run_as_background_process, + TaskScheduler.SCHEDULE_INTERVAL_MS, + "scheduled_tasks_loop", + 
self._scheduled_tasks_loop, + ) + + def register_action( + self, + function: Callable[ + [ScheduledTask, bool], + Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]], + ], + action_name: str, + ) -> None: + """Register a function to be executed when an action is scheduled with + the specified action name. + + Actions need to be registered as early as possible so that a resumed action + can find its matching function. It's usually better to NOT do that right before + calling `schedule_task` but rather in an `__init__` method. + + Args: + function: The function to be executed for this action. The parameters + passed to the function when launched are the `ScheduledTask` being run, + and a `first_launch` boolean to signal if it's a resumed task or the first + launch of it. The function should return a tuple of new `status`, `result` + and `error` as specified in `ScheduledTask`. + action_name: The name of the action to be associated with the function + """ + self.actions[action_name] = function + + async def schedule_task( + self, + action: str, + *, + resource_id: Optional[str] = None, + timestamp: Optional[int] = None, + params: Optional[JsonMapping] = None, + ) -> str: + """Schedule a new potentially resumable task. A function matching the specified + `action` should have been previously registered with `register_action`. + + Args: + action: the name of a previously registered action + resource_id: a task can be associated with a resource id to facilitate + getting all tasks associated with a specific resource + timestamp: if `None`, the task will be launched immediately, otherwise it + will be launch after the `timestamp` value. 
Note that this scheduler + is not meant to be precise, and the scheduling could be delayed if + too many tasks are already running + params: a set of parameters that can be easily accessed from inside the + executed function + + Returns: the id of the scheduled task + """ + if action not in self.actions: + raise Exception( + f"No function associated with the action {action} of the scheduled task" + ) + + launch_now = False + if timestamp is None or timestamp < self.clock.time_msec(): + timestamp = self.clock.time_msec() + launch_now = True + + task = ScheduledTask( + random_string(16), + action, + TaskStatus.SCHEDULED, + timestamp, + resource_id, + params, + None, + None, + ) + await self.store.upsert_scheduled_task(task) + + if launch_now and self.run_background_tasks: + await self._launch_task(task, True) + + return task.id + + async def update_task( + self, + id: str, + *, + status: Optional[TaskStatus] = None, + result: Optional[JsonMapping] = None, + error: Optional[str] = None, + ) -> None: + """Update some task associated values. + + This is used internally, and also exposed publically so it can be used inside task functions. + This allows to store in DB the progress of a task so it can be resumed properly after a restart of synapse. + + Args: + id: the id of the task to update + status: the new `TaskStatus` of the task + result: the new result of the task + error: the new error of the task + """ + await self.store.update_scheduled_task( + id, + timestamp=self.clock.time_msec(), + status=status, + result=result, + error=error, + ) + + async def get_task(self, id: str) -> Optional[ScheduledTask]: + """Get a specific task description by id. 
+ + Args: + id: the id of the task to retrieve + + Returns: the task description or `None` if it doesn't exist + or it has already been cleaned + """ + return await self.store.get_scheduled_task(id) + + async def get_tasks( + self, action: str, resource_id: Optional[str] + ) -> List[ScheduledTask]: + """Get a list of tasks associated with an action name, and + optionally with a resource id. + + Args: + action: the action name of the tasks to retrieve + resource_id: if `None`, returns all associated tasks for + the specified action name, regardless of the resource id + + Returns: a list of `ScheduledTask` + """ + return await self.store.get_scheduled_tasks(action, resource_id) + + async def _scheduled_tasks_loop(self) -> None: + """Main loop taking care of launching the scheduled tasks when needed.""" + for task in await self.store.get_scheduled_tasks(): + if task.id not in self.running_tasks: + if ( + task.status == TaskStatus.SCHEDULED + and task.timestamp < self.clock.time_msec() + ): + await self._launch_task(task, True) + elif task.status == TaskStatus.ACTIVE: + await self._launch_task(task, False) + elif ( + task.status == TaskStatus.COMPLETE + or task.status == TaskStatus.FAILED + ) and self.clock.time_msec() > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS: + await self.store.delete_scheduled_task(task.id) + + async def _launch_task(self, task: ScheduledTask, first_launch: bool) -> None: + """Launch a scheduled task now. 
+ + Args: + task: the task to launch + first_launch: `True` if it's the first time is launched, `False` otherwise + """ + if task.action not in self.actions: + raise Exception( + f"No function associated with the action {task.action} of the scheduled task" + ) + + function = self.actions[task.action] + + async def wrapper() -> None: + try: + (status, result, error) = await function(task, first_launch) + except Exception: + f = Failure() + logger.error( + f"scheduled task {task.id} failed", + exc_info=(f.type, f.value, f.getTracebackObject()), + ) + status = TaskStatus.FAILED + result = None + error = f.getErrorMessage() + + await self.update_task( + task.id, + status=status, + result=result, + error=error, + ) + self.running_tasks.remove(task.id) + + await self.update_task(task.id, status=TaskStatus.ACTIVE) + self.running_tasks.add(task.id) + description = task.action + if task.resource_id: + description += f"-{task.resource_id}" + run_as_background_process(description, wrapper) diff --git a/tests/util/test_task_scheduler.py b/tests/util/test_task_scheduler.py new file mode 100644 index 000000000000..9fecc6b5303f --- /dev/null +++ b/tests/util/test_task_scheduler.py @@ -0,0 +1,132 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from typing import Optional, Tuple + +from twisted.internet.task import deferLater +from twisted.test.proto_helpers import MemoryReactor + +from synapse.server import HomeServer +from synapse.types import JsonMapping, ScheduledTask, TaskStatus +from synapse.util import Clock +from synapse.util.task_scheduler import TaskScheduler + +from tests import unittest + + +class TestTaskScheduler(unittest.HomeserverTestCase): + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.task_scheduler = hs.get_task_scheduler() + self.task_scheduler.register_action(self._test_task, "_test_task") + self.task_scheduler.register_action(self._raising_task, "_raising_task") + self.task_scheduler.register_action(self._resumable_task, "_resumable_task") + + async def _test_task( + self, task: ScheduledTask, first_launch: bool + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + # This test task will copy the parameters to the result + result = None + if task.params: + result = task.params + return (TaskStatus.COMPLETE, result, None) + + def test_schedule_task(self) -> None: + """Schedule a task in the future with some parameters to be copied as a result and check it executed correctly. 
+ Also check that it get removed after `KEEP_TASKS_FOR_MS`.""" + timestamp = self.clock.time_msec() + 2 * 60 * 1000 + task_id = self.get_success( + self.task_scheduler.schedule_task( + "_test_task", + timestamp=timestamp, + params={"val": 1}, + ) + ) + + task = self.get_success(self.task_scheduler.get_task(task_id)) + assert task is not None + self.assertEqual(task.status, TaskStatus.SCHEDULED) + self.assertIsNone(task.result) + + # The timestamp being 2mn after now the task should been executed + # after the first scheduling loop is run + self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + 1) + + task = self.get_success(self.task_scheduler.get_task(task_id)) + assert task is not None + self.assertEqual(task.status, TaskStatus.COMPLETE) + assert task.result is not None + # The passed parameter should have been copied to the result + self.assertTrue(task.result.get("val") == 1) + + # Let's wait for the complete task to be deleted and hence unavailable + self.reactor.advance((TaskScheduler.KEEP_TASKS_FOR_MS / 1000) + 1) + + task = self.get_success(self.task_scheduler.get_task(task_id)) + self.assertIsNone(task) + + def test_schedule_task_now(self) -> None: + """Schedule a task now and check it runs fine to completion.""" + task_id = self.get_success( + self.task_scheduler.schedule_task("_test_task", params={"val": 1}) + ) + + task = self.get_success(self.task_scheduler.get_task(task_id)) + assert task is not None + self.assertEqual(task.status, TaskStatus.COMPLETE) + assert task.result is not None + self.assertTrue(task.result.get("val") == 1) + + async def _raising_task( + self, task: ScheduledTask, first_launch: bool + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + raise Exception("raising") + + def test_schedule_raising_task_now(self) -> None: + """Schedule a task raising an exception and check it runs to failure and report exception content.""" + task_id = self.get_success(self.task_scheduler.schedule_task("_raising_task")) + + 
task = self.get_success(self.task_scheduler.get_task(task_id)) + assert task is not None + self.assertEqual(task.status, TaskStatus.FAILED) + self.assertEqual(task.error, "raising") + + async def _resumable_task( + self, task: ScheduledTask, first_launch: bool + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + if task.result and "in_progress" in task.result: + return TaskStatus.COMPLETE, {"success": True}, None + else: + await self.task_scheduler.update_task(task.id, result={"in_progress": True}) + # Await forever to simulate an aborted task because of a restart + await deferLater(self.reactor, 2**16, None) + # This should never been called + return TaskStatus.ACTIVE, None, None + + def test_schedule_resumable_task_now(self) -> None: + """Schedule a resumable task and check that it gets properly resumed and complete after simulating a synapse restart.""" + task_id = self.get_success(self.task_scheduler.schedule_task("_resumable_task")) + + task = self.get_success(self.task_scheduler.get_task(task_id)) + assert task is not None + self.assertEqual(task.status, TaskStatus.ACTIVE) + + # Simulate a synapse restart by emptying the list of running tasks + self.task_scheduler.running_tasks = set() + self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + 1) + + task = self.get_success(self.task_scheduler.get_task(task_id)) + assert task is not None + self.assertEqual(task.status, TaskStatus.COMPLETE) + assert task.result is not None + self.assertTrue(task.result.get("success")) From 0961f52c57184083d823b2b98f2b2dd1c23c8eee Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Mon, 24 Jul 2023 21:50:02 +0200 Subject: [PATCH 02/34] Add filters to task retrieval + clean less often --- .../storage/databases/main/task_scheduler.py | 76 ++++++++++++------- synapse/util/task_scheduler.py | 46 ++++++++--- 2 files changed, 83 insertions(+), 39 deletions(-) diff --git a/synapse/storage/databases/main/task_scheduler.py 
b/synapse/storage/databases/main/task_scheduler.py index 37c2110bbd30..93b7ac8fc252 100644 --- a/synapse/storage/databases/main/task_scheduler.py +++ b/synapse/storage/databases/main/task_scheduler.py @@ -16,7 +16,12 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional from synapse.storage._base import SQLBaseStore -from synapse.storage.database import DatabasePool, LoggingDatabaseConnection +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, + make_in_list_sql_clause, +) from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus if TYPE_CHECKING: @@ -42,40 +47,56 @@ def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask: return ScheduledTask(**row) async def get_scheduled_tasks( - self, action: Optional[str] = None, resource_id: Optional[str] = None + self, + actions: Optional[List[str]] = None, + resource_ids: Optional[List[str]] = None, + statuses: Optional[List[TaskStatus]] = None, ) -> List[ScheduledTask]: """Get a list of scheduled tasks from the DB. - If the parameters are `None` all the tasks are returned. + If an arg is `None` all tasks matching the other args will be selected. + If an arg is an empty list, the value needs to be NULL in DB to be selected. 
Args: - action: Limit the returned tasks to this specific action name - resource_id: Limit the returned tasks to this specific resource id + actions: Limit the returned tasks to those specific action names + resource_ids: Limit the returned tasks to thoe specific resource ids + statuses: Limit the returned tasks to thoe specific statuses Returns: a list of `ScheduledTask` """ - keyvalues = {} - if action: - keyvalues["action"] = action - if resource_id: - keyvalues["resource_id"] = resource_id - rows = await self.db_pool.simple_select_list( - table="scheduled_tasks", - keyvalues=keyvalues, - retcols=( - "id", - "action", - "status", - "timestamp", - "resource_id", - "params", - "result", - "error", - ), - desc="get_scheduled_tasks", + def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: + clauses = [] + args = [] + if actions is not None: + clause, temp_args = make_in_list_sql_clause( + txn.database_engine, "action", actions + ) + clauses.append(clause) + args.extend(temp_args) + if resource_ids is not None: + clause, temp_args = make_in_list_sql_clause( + txn.database_engine, "resource_id", resource_ids + ) + clauses.append(clause) + args.extend(temp_args) + if statuses is not None: + clause, temp_args = make_in_list_sql_clause( + txn.database_engine, "status", statuses + ) + clauses.append(clause) + args.extend(temp_args) + + sql = "SELECT * FROM scheduled_tasks" + if clauses: + sql = sql + " WHERE " + " AND ".join(clauses) + + txn.execute(sql, args) + return self.db_pool.cursor_to_dict(txn) + + rows = await self.db_pool.runInteraction( + "get_scheduled_tasks", get_scheduled_tasks_txn ) - return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows] async def upsert_scheduled_task(self, task: ScheduledTask) -> None: @@ -107,7 +128,7 @@ async def update_scheduled_task( status: Optional[TaskStatus] = None, result: Optional[JsonMapping] = None, error: Optional[str] = None, - ) -> None: + ) -> bool: """Update a scheduled task in 
the DB with some new value(s). Args: @@ -126,12 +147,13 @@ async def update_scheduled_task( updatevalues["result"] = json.dumps(result) if error is not None: updatevalues["error"] = error - await self.db_pool.simple_update( + nb_rows = await self.db_pool.simple_update( "scheduled_tasks", {"id": id}, updatevalues, desc="update_scheduled_task", ) + return nb_rows > 0 async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]: """Get a specific `ScheduledTask` from its id. diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 2c3abaaedf99..034a986ff255 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -31,6 +31,7 @@ class TaskScheduler: # Precision of the scheduler, evaluation of tasks to run will only happen # every `SCHEDULE_INTERVAL_MS` ms SCHEDULE_INTERVAL_MS = 5 * 60 * 1000 # 5mn + CLEAN_INTERVAL_MS = 60 * 60 * 1000 # 1hr # Time before a complete or failed task is deleted from the DB KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week @@ -51,8 +52,14 @@ def __init__(self, hs: "HomeServer"): self.clock.looping_call( run_as_background_process, TaskScheduler.SCHEDULE_INTERVAL_MS, - "scheduled_tasks_loop", - self._scheduled_tasks_loop, + "run_scheduled_tasks", + self._run_scheduled_tasks, + ) + self.clock.looping_call( + run_as_background_process, + TaskScheduler.CLEAN_INTERVAL_MS, + "clean_scheduled_tasks", + self._clean_scheduled_tasks, ) def register_action( @@ -135,10 +142,11 @@ async def update_task( self, id: str, *, + timestamp: Optional[int] = None, status: Optional[TaskStatus] = None, result: Optional[JsonMapping] = None, error: Optional[str] = None, - ) -> None: + ) -> bool: """Update some task associated values. This is used internally, and also exposed publically so it can be used inside task functions. 
@@ -150,9 +158,11 @@ async def update_task( result: the new result of the task error: the new error of the task """ - await self.store.update_scheduled_task( + if timestamp is None: + timestamp = self.clock.time_msec() + return await self.store.update_scheduled_task( id, - timestamp=self.clock.time_msec(), + timestamp=timestamp, status=status, result=result, error=error, @@ -170,10 +180,13 @@ async def get_task(self, id: str) -> Optional[ScheduledTask]: return await self.store.get_scheduled_task(id) async def get_tasks( - self, action: str, resource_id: Optional[str] + self, + actions: Optional[List[str]] = None, + resource_ids: Optional[List[str]] = None, + statuses: Optional[List[TaskStatus]] = None, ) -> List[ScheduledTask]: - """Get a list of tasks associated with an action name, and - optionally with a resource id. + """Get a list of tasks associated with some action name(s) and/or + with some resource id(s). Args: action: the action name of the tasks to retrieve @@ -182,11 +195,13 @@ async def get_tasks( Returns: a list of `ScheduledTask` """ - return await self.store.get_scheduled_tasks(action, resource_id) + return await self.store.get_scheduled_tasks(actions, resource_ids, statuses) - async def _scheduled_tasks_loop(self) -> None: + async def _run_scheduled_tasks(self) -> None: """Main loop taking care of launching the scheduled tasks when needed.""" - for task in await self.store.get_scheduled_tasks(): + for task in await self.store.get_scheduled_tasks( + statuses=[TaskStatus.SCHEDULED, TaskStatus.ACTIVE] + ): if task.id not in self.running_tasks: if ( task.status == TaskStatus.SCHEDULED @@ -195,7 +210,14 @@ async def _scheduled_tasks_loop(self) -> None: await self._launch_task(task, True) elif task.status == TaskStatus.ACTIVE: await self._launch_task(task, False) - elif ( + + async def _clean_scheduled_tasks(self) -> None: + """Clean loop taking care of removing old complete or failed jobs to avoid clutter the DB.""" + for task in await 
self.store.get_scheduled_tasks( + statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE] + ): + if task.id not in self.running_tasks: + if ( task.status == TaskStatus.COMPLETE or task.status == TaskStatus.FAILED ) and self.clock.time_msec() > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS: From e6bf9cee32e370faaff029f7a1456d667ae82f6b Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Mon, 24 Jul 2023 23:11:39 +0200 Subject: [PATCH 03/34] Move sql file --- .../delta/79/{03_scheduled_tasks.sql => 01_scheduled_tasks.sql} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename synapse/storage/schema/main/delta/79/{03_scheduled_tasks.sql => 01_scheduled_tasks.sql} (100%) diff --git a/synapse/storage/schema/main/delta/79/03_scheduled_tasks.sql b/synapse/storage/schema/main/delta/79/01_scheduled_tasks.sql similarity index 100% rename from synapse/storage/schema/main/delta/79/03_scheduled_tasks.sql rename to synapse/storage/schema/main/delta/79/01_scheduled_tasks.sql From abbbddfbef3af82855446438410216130abfe320 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Mon, 24 Jul 2023 23:13:53 +0200 Subject: [PATCH 04/34] Update schema version --- synapse/storage/schema/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index fc190a8b13cf..58260732fa50 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-SCHEMA_VERSION = 78 # remember to update the list below when updating +SCHEMA_VERSION = 79 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the From a4ed9fea0a454b9882ecff226787c0f9ce8ded46 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 26 Jul 2023 11:31:35 +0200 Subject: [PATCH 05/34] Add ts to get_tasks and narrow down the query in the loops --- .../storage/databases/main/task_scheduler.py | 5 ++++ synapse/util/task_scheduler.py | 28 +++++++++++-------- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py index 93b7ac8fc252..75be04e996a0 100644 --- a/synapse/storage/databases/main/task_scheduler.py +++ b/synapse/storage/databases/main/task_scheduler.py @@ -48,9 +48,11 @@ def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask: async def get_scheduled_tasks( self, + *, actions: Optional[List[str]] = None, resource_ids: Optional[List[str]] = None, statuses: Optional[List[TaskStatus]] = None, + maximum_timestamp: Optional[int] = None, ) -> List[ScheduledTask]: """Get a list of scheduled tasks from the DB. 
@@ -86,6 +88,9 @@ def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: ) clauses.append(clause) args.extend(temp_args) + if maximum_timestamp is not None: + clauses.append("timestamp <= ?") + args.append(maximum_timestamp) sql = "SELECT * FROM scheduled_tasks" if clauses: diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 034a986ff255..1d783093bee2 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -181,9 +181,11 @@ async def get_task(self, id: str) -> Optional[ScheduledTask]: async def get_tasks( self, + *, actions: Optional[List[str]] = None, resource_ids: Optional[List[str]] = None, statuses: Optional[List[TaskStatus]] = None, + maximum_timestamp: Optional[int] = None, ) -> List[ScheduledTask]: """Get a list of tasks associated with some action name(s) and/or with some resource id(s). @@ -195,21 +197,23 @@ async def get_tasks( Returns: a list of `ScheduledTask` """ - return await self.store.get_scheduled_tasks(actions, resource_ids, statuses) + return await self.store.get_scheduled_tasks( + actions=actions, + resource_ids=resource_ids, + statuses=statuses, + maximum_timestamp=maximum_timestamp, + ) async def _run_scheduled_tasks(self) -> None: """Main loop taking care of launching the scheduled tasks when needed.""" + for task in await self.store.get_scheduled_tasks(statuses=[TaskStatus.ACTIVE]): + if task.id not in self.running_tasks: + await self._launch_task(task, first_launch=False) for task in await self.store.get_scheduled_tasks( - statuses=[TaskStatus.SCHEDULED, TaskStatus.ACTIVE] + statuses=[TaskStatus.SCHEDULED], maximum_timestamp=self.clock.time_msec() ): if task.id not in self.running_tasks: - if ( - task.status == TaskStatus.SCHEDULED - and task.timestamp < self.clock.time_msec() - ): - await self._launch_task(task, True) - elif task.status == TaskStatus.ACTIVE: - await self._launch_task(task, False) + await self._launch_task(task, first_launch=True) async def 
_clean_scheduled_tasks(self) -> None: """Clean loop taking care of removing old complete or failed jobs to avoid clutter the DB.""" @@ -218,9 +222,9 @@ async def _clean_scheduled_tasks(self) -> None: ): if task.id not in self.running_tasks: if ( - task.status == TaskStatus.COMPLETE - or task.status == TaskStatus.FAILED - ) and self.clock.time_msec() > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS: + self.clock.time_msec() + > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS + ): await self.store.delete_scheduled_task(task.id) async def _launch_task(self, task: ScheduledTask, first_launch: bool) -> None: From 3490a919c600739534c2959b8b2de1a2101037e1 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 26 Jul 2023 11:36:40 +0200 Subject: [PATCH 06/34] Renames --- synapse/storage/databases/main/task_scheduler.py | 6 +++--- synapse/util/task_scheduler.py | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py index 75be04e996a0..ac3e4f2ce6cb 100644 --- a/synapse/storage/databases/main/task_scheduler.py +++ b/synapse/storage/databases/main/task_scheduler.py @@ -52,7 +52,7 @@ async def get_scheduled_tasks( actions: Optional[List[str]] = None, resource_ids: Optional[List[str]] = None, statuses: Optional[List[TaskStatus]] = None, - maximum_timestamp: Optional[int] = None, + max_timestamp: Optional[int] = None, ) -> List[ScheduledTask]: """Get a list of scheduled tasks from the DB. 
@@ -88,9 +88,9 @@ def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: ) clauses.append(clause) args.extend(temp_args) - if maximum_timestamp is not None: + if max_timestamp is not None: clauses.append("timestamp <= ?") - args.append(maximum_timestamp) + args.append(max_timestamp) sql = "SELECT * FROM scheduled_tasks" if clauses: diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 1d783093bee2..73d53b1440eb 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -185,7 +185,7 @@ async def get_tasks( actions: Optional[List[str]] = None, resource_ids: Optional[List[str]] = None, statuses: Optional[List[TaskStatus]] = None, - maximum_timestamp: Optional[int] = None, + max_timestamp: Optional[int] = None, ) -> List[ScheduledTask]: """Get a list of tasks associated with some action name(s) and/or with some resource id(s). @@ -201,16 +201,16 @@ async def get_tasks( actions=actions, resource_ids=resource_ids, statuses=statuses, - maximum_timestamp=maximum_timestamp, + max_timestamp=max_timestamp, ) async def _run_scheduled_tasks(self) -> None: """Main loop taking care of launching the scheduled tasks when needed.""" - for task in await self.store.get_scheduled_tasks(statuses=[TaskStatus.ACTIVE]): + for task in await self.get_tasks(statuses=[TaskStatus.ACTIVE]): if task.id not in self.running_tasks: await self._launch_task(task, first_launch=False) - for task in await self.store.get_scheduled_tasks( - statuses=[TaskStatus.SCHEDULED], maximum_timestamp=self.clock.time_msec() + for task in await self.get_tasks( + statuses=[TaskStatus.SCHEDULED], max_timestamp=self.clock.time_msec() ): if task.id not in self.running_tasks: await self._launch_task(task, first_launch=True) From e03c12d23e787985ab8a4bde40f6ce4e776518cf Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 26 Jul 2023 11:45:35 +0200 Subject: [PATCH 07/34] Comments --- synapse/storage/databases/main/task_scheduler.py | 15 
++++++++++----- .../schema/main/delta/79/01_scheduled_tasks.sql | 16 ++++++++-------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py index ac3e4f2ce6cb..1b3b72fdc11f 100644 --- a/synapse/storage/databases/main/task_scheduler.py +++ b/synapse/storage/databases/main/task_scheduler.py @@ -15,7 +15,7 @@ import json from typing import TYPE_CHECKING, Any, Dict, List, Optional -from synapse.storage._base import SQLBaseStore +from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, @@ -23,6 +23,7 @@ make_in_list_sql_clause, ) from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus +from synapse.util import json_encoder if TYPE_CHECKING: from synapse.server import HomeServer @@ -41,9 +42,9 @@ def __init__( def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask: row["status"] = TaskStatus(row["status"]) if row["params"] is not None: - row["params"] = json.loads(row["params"]) + row["params"] = db_to_json(row["params"]) if row["result"] is not None: - row["result"] = json.loads(row["result"]) + row["result"] = db_to_json(row["result"]) return ScheduledTask(**row) async def get_scheduled_tasks( @@ -118,8 +119,12 @@ async def upsert_scheduled_task(self, task: ScheduledTask) -> None: "status": task.status, "timestamp": task.timestamp, "resource_id": task.resource_id, - "params": None if task.params is None else json.dumps(task.params), - "result": None if task.result is None else json.dumps(task.result), + "params": None + if task.params is None + else json_encoder.encode(task.params), + "result": None + if task.result is None + else json_encoder.encode(task.result), "error": task.error, }, desc="upsert_scheduled_task", diff --git a/synapse/storage/schema/main/delta/79/01_scheduled_tasks.sql b/synapse/storage/schema/main/delta/79/01_scheduled_tasks.sql 
index 4ee43887b6c7..8a588d35fa81 100644 --- a/synapse/storage/schema/main/delta/79/01_scheduled_tasks.sql +++ b/synapse/storage/schema/main/delta/79/01_scheduled_tasks.sql @@ -15,12 +15,12 @@ -- cf ScheduledTask docstring for the meaning of the fields. CREATE TABLE IF NOT EXISTS scheduled_tasks( - id text PRIMARY KEY, - action text NOT NULL, - status text NOT NULL, - timestamp bigint NOT NULL, - resource_id text, - params text, - result text, - error text + id TEXT PRIMARY KEY, + action TEXT NOT NULL, + status TEXT NOT NULL, + timestamp BIGINT NOT NULL, + resource_id TEXT, + params TEXT, + result TEXT, + error TEXT ); From 8e29f3d67b06a454110b3be5e85c28cdf1651370 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 26 Jul 2023 11:46:45 +0200 Subject: [PATCH 08/34] Update synapse/util/task_scheduler.py Co-authored-by: Patrick Cloke --- synapse/util/task_scheduler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 73d53b1440eb..69bcec92cc17 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -79,10 +79,10 @@ def register_action( Args: function: The function to be executed for this action. The parameters - passed to the function when launched are the `ScheduledTask` being run, - and a `first_launch` boolean to signal if it's a resumed task or the first - launch of it. The function should return a tuple of new `status`, `result` - and `error` as specified in `ScheduledTask`. + passed to the function when launched are the `ScheduledTask` being run, + and a `first_launch` boolean to signal if it's a resumed task or the first + launch of it. The function should return a tuple of new `status`, `result` + and `error` as specified in `ScheduledTask`. 
action_name: The name of the action to be associated with the function """ self.actions[action_name] = function From b2dff6535e89d4780ca44c836f3345946be7fd63 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 26 Jul 2023 11:47:13 +0200 Subject: [PATCH 09/34] Update synapse/util/task_scheduler.py Co-authored-by: Patrick Cloke --- synapse/util/task_scheduler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 69bcec92cc17..233aff07c679 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -195,7 +195,8 @@ async def get_tasks( resource_id: if `None`, returns all associated tasks for the specified action name, regardless of the resource id - Returns: a list of `ScheduledTask` + Returns + A list of `ScheduledTask` """ return await self.store.get_scheduled_tasks( actions=actions, From ce981c5fa1b460a7ec1a7e872eab316a0e1b9aa7 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 26 Jul 2023 11:51:06 +0200 Subject: [PATCH 10/34] Add underscore to private fields --- synapse/util/task_scheduler.py | 61 ++++++++++++++++--------------- tests/util/test_task_scheduler.py | 2 +- 2 files changed, 32 insertions(+), 31 deletions(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 233aff07c679..ed54667c113c 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -36,26 +36,26 @@ class TaskScheduler: KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week def __init__(self, hs: "HomeServer"): - self.store = hs.get_datastores().main - self.clock = hs.get_clock() - self.running_tasks: Set[str] = set() - self.actions: Dict[ + self._store = hs.get_datastores().main + self._clock = hs.get_clock() + self._running_tasks: Set[str] = set() + self._actions: Dict[ str, Callable[ [ScheduledTask, bool], Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]], ], ] = {} - self.run_background_tasks = 
hs.config.worker.run_background_tasks + self._run_background_tasks = hs.config.worker.run_background_tasks - if self.run_background_tasks: - self.clock.looping_call( + if self._run_background_tasks: + self._clock.looping_call( run_as_background_process, TaskScheduler.SCHEDULE_INTERVAL_MS, "run_scheduled_tasks", self._run_scheduled_tasks, ) - self.clock.looping_call( + self._clock.looping_call( run_as_background_process, TaskScheduler.CLEAN_INTERVAL_MS, "clean_scheduled_tasks", @@ -85,7 +85,7 @@ def register_action( and `error` as specified in `ScheduledTask`. action_name: The name of the action to be associated with the function """ - self.actions[action_name] = function + self._actions[action_name] = function async def schedule_task( self, @@ -109,16 +109,17 @@ async def schedule_task( params: a set of parameters that can be easily accessed from inside the executed function - Returns: the id of the scheduled task + Returns: + The id of the scheduled task """ - if action not in self.actions: + if action not in self._actions: raise Exception( f"No function associated with the action {action} of the scheduled task" ) launch_now = False - if timestamp is None or timestamp < self.clock.time_msec(): - timestamp = self.clock.time_msec() + if timestamp is None or timestamp < self._clock.time_msec(): + timestamp = self._clock.time_msec() launch_now = True task = ScheduledTask( @@ -131,9 +132,9 @@ async def schedule_task( None, None, ) - await self.store.upsert_scheduled_task(task) + await self._store.upsert_scheduled_task(task) - if launch_now and self.run_background_tasks: + if launch_now and self._run_background_tasks: await self._launch_task(task, True) return task.id @@ -159,8 +160,8 @@ async def update_task( error: the new error of the task """ if timestamp is None: - timestamp = self.clock.time_msec() - return await self.store.update_scheduled_task( + timestamp = self._clock.time_msec() + return await self._store.update_scheduled_task( id, timestamp=timestamp, 
status=status, @@ -177,7 +178,7 @@ async def get_task(self, id: str) -> Optional[ScheduledTask]: Returns: the task description or `None` if it doesn't exist or it has already been cleaned """ - return await self.store.get_scheduled_task(id) + return await self._store.get_scheduled_task(id) async def get_tasks( self, @@ -198,7 +199,7 @@ async def get_tasks( Returns A list of `ScheduledTask` """ - return await self.store.get_scheduled_tasks( + return await self._store.get_scheduled_tasks( actions=actions, resource_ids=resource_ids, statuses=statuses, @@ -208,25 +209,25 @@ async def get_tasks( async def _run_scheduled_tasks(self) -> None: """Main loop taking care of launching the scheduled tasks when needed.""" for task in await self.get_tasks(statuses=[TaskStatus.ACTIVE]): - if task.id not in self.running_tasks: + if task.id not in self._running_tasks: await self._launch_task(task, first_launch=False) for task in await self.get_tasks( - statuses=[TaskStatus.SCHEDULED], max_timestamp=self.clock.time_msec() + statuses=[TaskStatus.SCHEDULED], max_timestamp=self._clock.time_msec() ): - if task.id not in self.running_tasks: + if task.id not in self._running_tasks: await self._launch_task(task, first_launch=True) async def _clean_scheduled_tasks(self) -> None: """Clean loop taking care of removing old complete or failed jobs to avoid clutter the DB.""" - for task in await self.store.get_scheduled_tasks( + for task in await self._store.get_scheduled_tasks( statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE] ): - if task.id not in self.running_tasks: + if task.id not in self._running_tasks: if ( - self.clock.time_msec() + self._clock.time_msec() > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS ): - await self.store.delete_scheduled_task(task.id) + await self._store.delete_scheduled_task(task.id) async def _launch_task(self, task: ScheduledTask, first_launch: bool) -> None: """Launch a scheduled task now. 
@@ -235,12 +236,12 @@ async def _launch_task(self, task: ScheduledTask, first_launch: bool) -> None: task: the task to launch first_launch: `True` if it's the first time is launched, `False` otherwise """ - if task.action not in self.actions: + if task.action not in self._actions: raise Exception( f"No function associated with the action {task.action} of the scheduled task" ) - function = self.actions[task.action] + function = self._actions[task.action] async def wrapper() -> None: try: @@ -261,10 +262,10 @@ async def wrapper() -> None: result=result, error=error, ) - self.running_tasks.remove(task.id) + self._running_tasks.remove(task.id) await self.update_task(task.id, status=TaskStatus.ACTIVE) - self.running_tasks.add(task.id) + self._running_tasks.add(task.id) description = task.action if task.resource_id: description += f"-{task.resource_id}" diff --git a/tests/util/test_task_scheduler.py b/tests/util/test_task_scheduler.py index 9fecc6b5303f..f87c0437f018 100644 --- a/tests/util/test_task_scheduler.py +++ b/tests/util/test_task_scheduler.py @@ -122,7 +122,7 @@ def test_schedule_resumable_task_now(self) -> None: self.assertEqual(task.status, TaskStatus.ACTIVE) # Simulate a synapse restart by emptying the list of running tasks - self.task_scheduler.running_tasks = set() + self.task_scheduler._running_tasks = set() self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + 1) task = self.get_success(self.task_scheduler.get_task(task_id)) From 9e25f5e667ebda16649f94e6849e45b475ebb738 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 26 Jul 2023 12:17:01 +0200 Subject: [PATCH 11/34] lint --- synapse/util/task_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index ed54667c113c..dd2016a92f02 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -109,7 +109,7 @@ async def schedule_task( params: a set of parameters that can be 
easily accessed from inside the executed function - Returns: + Returns: The id of the scheduled task """ if action not in self._actions: From 8c713f95faf640a24a4101ea1d95f64790784ac6 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 26 Jul 2023 12:19:17 +0200 Subject: [PATCH 12/34] Apply suggestions from code review Co-authored-by: Patrick Cloke --- synapse/util/task_scheduler.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index dd2016a92f02..fba3f0f8b86e 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -175,8 +175,9 @@ async def get_task(self, id: str) -> Optional[ScheduledTask]: Args: id: the id of the task to retrieve - Returns: the task description or `None` if it doesn't exist - or it has already been cleaned + Returns: + The task description or `None` if it doesn't exis or it has + already been cleaned """ return await self._store.get_scheduled_task(id) @@ -266,7 +267,5 @@ async def wrapper() -> None: await self.update_task(task.id, status=TaskStatus.ACTIVE) self._running_tasks.add(task.id) - description = task.action - if task.resource_id: - description += f"-{task.resource_id}" + description = f"{task.id}-{task.action}" run_as_background_process(description, wrapper) From bc92b7274825f9730666f4cdb0c80a786cee61fd Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 26 Jul 2023 12:20:40 +0200 Subject: [PATCH 13/34] Comments --- synapse/util/task_scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index fba3f0f8b86e..2582d714d9d9 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -176,8 +176,8 @@ async def get_task(self, id: str) -> Optional[ScheduledTask]: id: the id of the task to retrieve Returns: - The task description or `None` if it doesn't exis or it has - already been cleaned + The task 
information or `None` if it doesn't exist or it has + already been removed because it's too old. """ return await self._store.get_scheduled_task(id) From 46cbde8edb2bcbd6dfb92fc3bbe6467797cd96e0 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 26 Jul 2023 12:34:22 +0200 Subject: [PATCH 14/34] Fix error handling --- synapse/util/task_scheduler.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 2582d714d9d9..e4965543ddce 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -114,7 +114,7 @@ async def schedule_task( """ if action not in self._actions: raise Exception( - f"No function associated with the action {action} of the scheduled task" + f"No function associated with action {action} of the scheduled task" ) launch_now = False @@ -238,9 +238,10 @@ async def _launch_task(self, task: ScheduledTask, first_launch: bool) -> None: first_launch: `True` if it's the first time is launched, `False` otherwise """ if task.action not in self._actions: - raise Exception( - f"No function associated with the action {task.action} of the scheduled task" + logger.warn( + f"Can't launch task {task.id} since no function associated with action {action}" ) + return function = self._actions[task.action] From 1025432ca48acbb0532e6eddfb415ffd2db790d7 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 26 Jul 2023 12:42:33 +0200 Subject: [PATCH 15/34] More comments --- synapse/types/__init__.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index cfde81f41ba9..3dc9d1b11f37 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -988,14 +988,15 @@ class TaskStatus(str, Enum): class ScheduledTask: """Description of a scheduled task""" - # id used to identify the task + # Id used to identify the task id: str - # name of the action to be run by this task + 
# Name of the action to be run by this task action: str - # current status of this task + # Current status of this task status: TaskStatus - # if the status is SCHEDULED then this represents when it should be launched, - # otherwise it represents the last time this task got a change of state + # If the status is SCHEDULED then this represents when it should be launched, + # otherwise it represents the last time this task got a change of state. + # In milliseconds since epoch in system time timezone, usually UTC. timestamp: int # Optionally bind a task to some resource id for easy retrieval resource_id: Optional[str] From fd2c3dda347a12d423b482ca19d036461e9d1bac Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Thu, 27 Jul 2023 16:16:15 +0200 Subject: [PATCH 16/34] Fix --- synapse/util/task_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index e4965543ddce..204cf1f26377 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -239,7 +239,7 @@ async def _launch_task(self, task: ScheduledTask, first_launch: bool) -> None: """ if task.action not in self._actions: logger.warn( - f"Can't launch task {task.id} since no function associated with action {action}" + f"Can't launch task {task.id} since no function associated with action {task.action}" ) return From 9e9a5e896f173e93e13aed93ae852494ed88fb70 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Fri, 28 Jul 2023 17:08:10 +0200 Subject: [PATCH 17/34] Add doc --- synapse/storage/databases/main/task_scheduler.py | 6 ++++-- synapse/util/task_scheduler.py | 15 ++++++++++----- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py index 1b3b72fdc11f..cf959fda0ee2 100644 --- a/synapse/storage/databases/main/task_scheduler.py +++ b/synapse/storage/databases/main/task_scheduler.py @@ -62,8 +62,10 
@@ async def get_scheduled_tasks( Args: actions: Limit the returned tasks to those specific action names - resource_ids: Limit the returned tasks to thoe specific resource ids - statuses: Limit the returned tasks to thoe specific statuses + resource_ids: Limit the returned tasks to the specific resource ids + statuses: Limit the returned tasks to the specific statuses + max_timestamp: Limit the returned tasks to the ones that have + a timestamp inferior to the specified one Returns: a list of `ScheduledTask` """ diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 204cf1f26377..ebf0a5423f05 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -189,13 +189,18 @@ async def get_tasks( statuses: Optional[List[TaskStatus]] = None, max_timestamp: Optional[int] = None, ) -> List[ScheduledTask]: - """Get a list of tasks associated with some action name(s) and/or - with some resource id(s). + """Get a list of tasks. Returns all the tasks if no args is provided. + + If an arg is `None` all tasks matching the other args will be selected. + If an arg is an empty list, the corresponding value of the task needs + to be `None` to be selected. 
Args: - action: the action name of the tasks to retrieve - resource_id: if `None`, returns all associated tasks for - the specified action name, regardless of the resource id + actions: Limit the returned tasks to those specific action names + resource_ids: Limit the returned tasks to the specific resource ids + statuses: Limit the returned tasks to the specific statuses + max_timestamp: Limit the returned tasks to the ones that have + a timestamp inferior to the specified one Returns A list of `ScheduledTask` From 4c1c833f443d92e937d23de235c682daa58f8141 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Fri, 28 Jul 2023 17:11:17 +0200 Subject: [PATCH 18/34] Move to new DB delta folder --- synapse/storage/schema/__init__.py | 5 ++++- .../schema/main/delta/{79 => 80}/01_scheduled_tasks.sql | 0 2 files changed, 4 insertions(+), 1 deletion(-) rename synapse/storage/schema/main/delta/{79 => 80}/01_scheduled_tasks.sql (100%) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index d3ec648f6d0f..76c9e36965f5 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 79 # remember to update the list below when updating +SCHEMA_VERSION = 80 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -110,6 +110,9 @@ Changes in SCHEMA_VERSION = 79 - Add tables to handle in DB read-write locks. - Add some mitigations for a painful race between foreground and background updates, cf #15677. + +Changes in SCHEMA_VERSION = 80 + - Add tables for the task scheduler. 
""" diff --git a/synapse/storage/schema/main/delta/79/01_scheduled_tasks.sql b/synapse/storage/schema/main/delta/80/01_scheduled_tasks.sql similarity index 100% rename from synapse/storage/schema/main/delta/79/01_scheduled_tasks.sql rename to synapse/storage/schema/main/delta/80/01_scheduled_tasks.sql From 6762c07e87e72e876bd02192891db2b077e128b1 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Sat, 29 Jul 2023 00:13:45 +0200 Subject: [PATCH 19/34] Limit concurrent running tasks to 20 --- .../storage/databases/main/task_scheduler.py | 4 +- synapse/util/task_scheduler.py | 18 ++++-- tests/util/test_task_scheduler.py | 62 +++++++++++++++++++ 3 files changed, 79 insertions(+), 5 deletions(-) diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py index cf959fda0ee2..8101d01a6ea6 100644 --- a/synapse/storage/databases/main/task_scheduler.py +++ b/synapse/storage/databases/main/task_scheduler.py @@ -67,7 +67,7 @@ async def get_scheduled_tasks( max_timestamp: Limit the returned tasks to the ones that have a timestamp inferior to the specified one - Returns: a list of `ScheduledTask` + Returns: a list of `ScheduledTask`, ordered by increasing timestamps """ def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: @@ -99,6 +99,8 @@ def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: if clauses: sql = sql + " WHERE " + " AND ".join(clauses) + sql = sql + "ORDER BY timestamp" + txn.execute(sql, args) return self.db_pool.cursor_to_dict(txn) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index ebf0a5423f05..1c6f2f25d223 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -34,6 +34,8 @@ class TaskScheduler: CLEAN_INTERVAL_MS = 60 * 60 * 1000 # 1hr # Time before a complete or failed task is deleted from the DB KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week + # Maximum number of tasks that can run at the 
same time + MAX_CONCURRENT_RUNNING_TASKS = 2 def __init__(self, hs: "HomeServer"): self._store = hs.get_datastores().main @@ -203,7 +205,7 @@ async def get_tasks( a timestamp inferior to the specified one Returns - A list of `ScheduledTask` + A list of `ScheduledTask`, ordered by increasing timestamps """ return await self._store.get_scheduled_tasks( actions=actions, @@ -215,12 +217,20 @@ async def get_tasks( async def _run_scheduled_tasks(self) -> None: """Main loop taking care of launching the scheduled tasks when needed.""" for task in await self.get_tasks(statuses=[TaskStatus.ACTIVE]): - if task.id not in self._running_tasks: + if ( + task.id not in self._running_tasks + and len(self._running_tasks) + < TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS + ): await self._launch_task(task, first_launch=False) for task in await self.get_tasks( statuses=[TaskStatus.SCHEDULED], max_timestamp=self._clock.time_msec() ): - if task.id not in self._running_tasks: + if ( + task.id not in self._running_tasks + and len(self._running_tasks) + < TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS + ): await self._launch_task(task, first_launch=True) async def _clean_scheduled_tasks(self) -> None: @@ -271,7 +281,7 @@ async def wrapper() -> None: ) self._running_tasks.remove(task.id) - await self.update_task(task.id, status=TaskStatus.ACTIVE) self._running_tasks.add(task.id) + await self.update_task(task.id, status=TaskStatus.ACTIVE) description = f"{task.id}-{task.action}" run_as_background_process(description, wrapper) diff --git a/tests/util/test_task_scheduler.py b/tests/util/test_task_scheduler.py index f87c0437f018..a528cf3329ac 100644 --- a/tests/util/test_task_scheduler.py +++ b/tests/util/test_task_scheduler.py @@ -29,6 +29,7 @@ class TestTaskScheduler(unittest.HomeserverTestCase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.task_scheduler = hs.get_task_scheduler() self.task_scheduler.register_action(self._test_task, "_test_task") + 
self.task_scheduler.register_action(self._sleeping_task, "_sleeping_task") self.task_scheduler.register_action(self._raising_task, "_raising_task") self.task_scheduler.register_action(self._resumable_task, "_resumable_task") @@ -75,6 +76,67 @@ def test_schedule_task(self) -> None: task = self.get_success(self.task_scheduler.get_task(task_id)) self.assertIsNone(task) + async def _sleeping_task( + self, task: ScheduledTask, first_launch: bool + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + # Sleep for a second + await deferLater(self.reactor, 1, None) + return TaskStatus.COMPLETE, None, None + + def test_schedule_lot_of_tasks(self) -> None: + """Schedule more than `TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS` tasks and check the behavior.""" + timestamp = self.clock.time_msec() + 2 * 60 * 1000 + task_ids = [] + for i in range(TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS + 1): + task_ids.append( + self.get_success( + self.task_scheduler.schedule_task( + "_sleeping_task", + timestamp=timestamp, + params={"val": i}, + ) + ) + ) + + # The timestamp being 2mn after now the task should been executed + # after the first scheduling loop is run + self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000)) + + # This is to give the time to the sleeping tasks to finish + self.reactor.advance(1) + + # Check that only MAX_CONCURRENT_RUNNING_TASKS tasks has run and that one + # is still scheduled. 
+ tasks = [ + self.get_success(self.task_scheduler.get_task(task_id)) + for task_id in task_ids + ] + + self.assertEquals( + len( + [t for t in tasks if t is not None and t.status == TaskStatus.COMPLETE] + ), + TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS, + ) + + scheduled_tasks = [ + t for t in tasks if t is not None and t.status == TaskStatus.SCHEDULED + ] + self.assertEquals(len(scheduled_tasks), 1) + + self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000)) + self.reactor.advance(1) + + # Check that the last task has been properly executed after the next scheduler loop run + prev_scheduled_task = self.get_success( + self.task_scheduler.get_task(scheduled_tasks[0].id) + ) + assert prev_scheduled_task is not None + self.assertEquals( + prev_scheduled_task.status, + TaskStatus.COMPLETE, + ) + def test_schedule_task_now(self) -> None: """Schedule a task now and check it runs fine to completion.""" task_id = self.get_success( From dfa39838284c7e75a5ef4b853469cf41860938d8 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Sat, 29 Jul 2023 00:52:28 +0200 Subject: [PATCH 20/34] Remove launch now feature, max 10 --- synapse/util/task_scheduler.py | 15 +++++---------- tests/util/test_task_scheduler.py | 2 +- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 1c6f2f25d223..67b8b32df57e 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -35,7 +35,7 @@ class TaskScheduler: # Time before a complete or failed task is deleted from the DB KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week # Maximum number of tasks that can run at the same time - MAX_CONCURRENT_RUNNING_TASKS = 2 + MAX_CONCURRENT_RUNNING_TASKS = 10 def __init__(self, hs: "HomeServer"): self._store = hs.get_datastores().main @@ -104,10 +104,10 @@ async def schedule_task( action: the name of a previously registered action resource_id: a task can be associated with a resource id to 
facilitate getting all tasks associated with a specific resource - timestamp: if `None`, the task will be launched immediately, otherwise it - will be launch after the `timestamp` value. Note that this scheduler - is not meant to be precise, and the scheduling could be delayed if - too many tasks are already running + timestamp: if `None`, the task will be launched as soon as possible, otherwise it + will be launch as soon as possible after the `timestamp` value. + Note that this scheduler is not meant to be precise, and the scheduling + could be delayed if too many tasks are already running params: a set of parameters that can be easily accessed from inside the executed function @@ -119,10 +119,8 @@ async def schedule_task( f"No function associated with action {action} of the scheduled task" ) - launch_now = False if timestamp is None or timestamp < self._clock.time_msec(): timestamp = self._clock.time_msec() - launch_now = True task = ScheduledTask( random_string(16), @@ -136,9 +134,6 @@ async def schedule_task( ) await self._store.upsert_scheduled_task(task) - if launch_now and self._run_background_tasks: - await self._launch_task(task, True) - return task.id async def update_task( diff --git a/tests/util/test_task_scheduler.py b/tests/util/test_task_scheduler.py index a528cf3329ac..49b7820238f3 100644 --- a/tests/util/test_task_scheduler.py +++ b/tests/util/test_task_scheduler.py @@ -185,7 +185,7 @@ def test_schedule_resumable_task_now(self) -> None: # Simulate a synapse restart by emptying the list of running tasks self.task_scheduler._running_tasks = set() - self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + 1) + self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000)) task = self.get_success(self.task_scheduler.get_task(task_id)) assert task is not None From 7730d74d28f92b069d57082d3e905338205165aa Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Sat, 29 Jul 2023 00:57:17 +0200 Subject: [PATCH 21/34] Fix tests --- 
tests/util/test_task_scheduler.py | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/tests/util/test_task_scheduler.py b/tests/util/test_task_scheduler.py index 49b7820238f3..e8526d74a552 100644 --- a/tests/util/test_task_scheduler.py +++ b/tests/util/test_task_scheduler.py @@ -137,27 +137,17 @@ def test_schedule_lot_of_tasks(self) -> None: TaskStatus.COMPLETE, ) - def test_schedule_task_now(self) -> None: - """Schedule a task now and check it runs fine to completion.""" - task_id = self.get_success( - self.task_scheduler.schedule_task("_test_task", params={"val": 1}) - ) - - task = self.get_success(self.task_scheduler.get_task(task_id)) - assert task is not None - self.assertEqual(task.status, TaskStatus.COMPLETE) - assert task.result is not None - self.assertTrue(task.result.get("val") == 1) - async def _raising_task( self, task: ScheduledTask, first_launch: bool ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: raise Exception("raising") - def test_schedule_raising_task_now(self) -> None: + def test_schedule_raising_task(self) -> None: """Schedule a task raising an exception and check it runs to failure and report exception content.""" task_id = self.get_success(self.task_scheduler.schedule_task("_raising_task")) + self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000)) + task = self.get_success(self.task_scheduler.get_task(task_id)) assert task is not None self.assertEqual(task.status, TaskStatus.FAILED) @@ -175,10 +165,12 @@ async def _resumable_task( # This should never been called return TaskStatus.ACTIVE, None, None - def test_schedule_resumable_task_now(self) -> None: + def test_schedule_resumable_task(self) -> None: """Schedule a resumable task and check that it gets properly resumed and complete after simulating a synapse restart.""" task_id = self.get_success(self.task_scheduler.schedule_task("_resumable_task")) + self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000)) + task = 
self.get_success(self.task_scheduler.get_task(task_id)) assert task is not None self.assertEqual(task.status, TaskStatus.ACTIVE) From 5878384c83d444a02597e0b56aa89dd28526d84c Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Sat, 29 Jul 2023 22:27:10 +0200 Subject: [PATCH 22/34] Fix olddeps tests --- tests/util/test_task_scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/util/test_task_scheduler.py b/tests/util/test_task_scheduler.py index e8526d74a552..9ab1e3eed9b3 100644 --- a/tests/util/test_task_scheduler.py +++ b/tests/util/test_task_scheduler.py @@ -80,7 +80,7 @@ async def _sleeping_task( self, task: ScheduledTask, first_launch: bool ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: # Sleep for a second - await deferLater(self.reactor, 1, None) + await deferLater(self.reactor, 1, lambda: None) return TaskStatus.COMPLETE, None, None def test_schedule_lot_of_tasks(self) -> None: @@ -161,7 +161,7 @@ async def _resumable_task( else: await self.task_scheduler.update_task(task.id, result={"in_progress": True}) # Await forever to simulate an aborted task because of a restart - await deferLater(self.reactor, 2**16, None) + await deferLater(self.reactor, 2**16, lambda: None) # This should never been called return TaskStatus.ACTIVE, None, None From 2383d1ee2504268ac37778ef3836797d2d0d087c Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Thu, 3 Aug 2023 15:36:31 +0200 Subject: [PATCH 23/34] Merge loops --- synapse/util/task_scheduler.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 67b8b32df57e..b3045c235e41 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -31,7 +31,6 @@ class TaskScheduler: # Precision of the scheduler, evaluation of tasks to run will only happen # every `SCHEDULE_INTERVAL_MS` ms SCHEDULE_INTERVAL_MS = 5 * 60 * 1000 # 5mn - CLEAN_INTERVAL_MS = 60 * 
60 * 1000 # 1hr # Time before a complete or failed task is deleted from the DB KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week # Maximum number of tasks that can run at the same time @@ -41,6 +40,7 @@ def __init__(self, hs: "HomeServer"): self._store = hs.get_datastores().main self._clock = hs.get_clock() self._running_tasks: Set[str] = set() + # A map between action names and their registered function self._actions: Dict[ str, Callable[ @@ -54,14 +54,8 @@ def __init__(self, hs: "HomeServer"): self._clock.looping_call( run_as_background_process, TaskScheduler.SCHEDULE_INTERVAL_MS, - "run_scheduled_tasks", - self._run_scheduled_tasks, - ) - self._clock.looping_call( - run_as_background_process, - TaskScheduler.CLEAN_INTERVAL_MS, - "clean_scheduled_tasks", - self._clean_scheduled_tasks, + "handle_scheduled_tasks", + self._handle_scheduled_tasks, ) def register_action( @@ -209,8 +203,13 @@ async def get_tasks( max_timestamp=max_timestamp, ) - async def _run_scheduled_tasks(self) -> None: - """Main loop taking care of launching the scheduled tasks when needed.""" + async def _handle_scheduled_tasks(self) -> None: + """Main loop taking care of launching tasks and cleaning up old ones.""" + await self._launch_scheduled_tasks() + await self._clean_scheduled_tasks() + + async def _launch_scheduled_tasks(self) -> None: + """Retrieve and launch scheduled tasks that should be running at that time.""" for task in await self.get_tasks(statuses=[TaskStatus.ACTIVE]): if ( task.id not in self._running_tasks @@ -229,7 +228,7 @@ async def _run_scheduled_tasks(self) -> None: await self._launch_task(task, first_launch=True) async def _clean_scheduled_tasks(self) -> None: - """Clean loop taking care of removing old complete or failed jobs to avoid clutter the DB.""" + """Clean old complete or failed jobs to avoid clutter the DB.""" for task in await self._store.get_scheduled_tasks( statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE] ): From 16be76bd6aac1ffcdd0010c11fe8e3edc927df92 
Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Thu, 10 Aug 2023 13:47:25 +0200 Subject: [PATCH 24/34] Address comments --- .../storage/databases/main/task_scheduler.py | 18 ++++----- synapse/util/task_scheduler.py | 37 +++++++++++++------ 2 files changed, 33 insertions(+), 22 deletions(-) diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py index 8101d01a6ea6..41c8e8087360 100644 --- a/synapse/storage/databases/main/task_scheduler.py +++ b/synapse/storage/databases/main/task_scheduler.py @@ -109,16 +109,16 @@ def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: ) return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows] - async def upsert_scheduled_task(self, task: ScheduledTask) -> None: - """Upsert a specified `ScheduledTask` in the DB. + async def insert_scheduled_task(self, task: ScheduledTask) -> None: + """Insert a specified `ScheduledTask` in the DB. Args: - task: the `ScheduledTask` to upsert + task: the `ScheduledTask` to insert """ - await self.db_pool.simple_upsert( + await self.db_pool.simple_insert( "scheduled_tasks", - {"id": task.id}, { + "id": task.id, "action": task.action, "status": task.status, "timestamp": task.timestamp, @@ -131,14 +131,14 @@ async def upsert_scheduled_task(self, task: ScheduledTask) -> None: else json_encoder.encode(task.result), "error": task.error, }, - desc="upsert_scheduled_task", + desc="insert_scheduled_task", ) async def update_scheduled_task( self, id: str, + timestamp: int, *, - timestamp: Optional[int] = None, status: Optional[TaskStatus] = None, result: Optional[JsonMapping] = None, error: Optional[str] = None, @@ -152,9 +152,7 @@ async def update_scheduled_task( result: new result of the task error: new error of the task """ - updatevalues: JsonDict = {} - if timestamp is not None: - updatevalues["timestamp"] = timestamp + updatevalues: JsonDict = {"timestamp": timestamp} if status is not None: 
updatevalues["status"] = status if result is not None: diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index b3045c235e41..eb4785ee655e 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -126,7 +126,7 @@ async def schedule_task( None, None, ) - await self._store.upsert_scheduled_task(task) + await self._store.insert_scheduled_task(task) return task.id @@ -139,22 +139,33 @@ async def update_task( result: Optional[JsonMapping] = None, error: Optional[str] = None, ) -> bool: - """Update some task associated values. + """Update some task associated values. This is exposed publically so it can + be used inside task functions, mainly to update the result and be able to + resume a task at a specific step after a restart of synapse. - This is used internally, and also exposed publically so it can be used inside task functions. - This allows to store in DB the progress of a task so it can be resumed properly after a restart of synapse. + It can also be used to stage a task, by setting the `status` to `SCHEDULED` with + a new timestamp. + + The `status` can only be set to `ACTIVE` or `SCHEDULED`, `COMPLETE` and `FAILED` + are terminal status and can only be set by returning it in the function. 
Args: id: the id of the task to update + timestamp: useful to schedule a new stage of the task at a later date status: the new `TaskStatus` of the task result: the new result of the task error: the new error of the task """ + if status == TaskStatus.COMPLETE or status == TaskStatus.FAILED: + raise Exception( + "update_task can't be called with a FAILED or COMPLETE status" + ) + if timestamp is None: timestamp = self._clock.time_msec() return await self._store.update_scheduled_task( id, - timestamp=timestamp, + timestamp, status=status, result=result, error=error, @@ -232,12 +243,13 @@ async def _clean_scheduled_tasks(self) -> None: for task in await self._store.get_scheduled_tasks( statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE] ): - if task.id not in self._running_tasks: - if ( - self._clock.time_msec() - > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS - ): - await self._store.delete_scheduled_task(task.id) + # FAILED and COMPLETE tasks should never be running + assert task.id not in self._running_tasks + if ( + self._clock.time_msec() + > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS + ): + await self._store.delete_scheduled_task(task.id) async def _launch_task(self, task: ScheduledTask, first_launch: bool) -> None: """Launch a scheduled task now. 
@@ -267,8 +279,9 @@ async def wrapper() -> None: result = None error = f.getErrorMessage() - await self.update_task( + await self._store.update_scheduled_task( task.id, + self._clock.time_msec(), status=status, result=result, error=error, From ad2523760d4c3c40886f486680d8fe0f11d08e52 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Thu, 10 Aug 2023 13:51:58 +0200 Subject: [PATCH 25/34] Address comments --- synapse/storage/databases/main/task_scheduler.py | 2 ++ synapse/util/task_scheduler.py | 6 +----- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py index 41c8e8087360..d53f9702ff6d 100644 --- a/synapse/storage/databases/main/task_scheduler.py +++ b/synapse/storage/databases/main/task_scheduler.py @@ -151,6 +151,8 @@ async def update_scheduled_task( status: new status of the task result: new result of the task error: new error of the task + + Returns: `False` if no matching row was found, `True` otherwise """ updatevalues: JsonDict = {"timestamp": timestamp} if status is not None: diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index eb4785ee655e..c6c13b68b17a 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -258,11 +258,7 @@ async def _launch_task(self, task: ScheduledTask, first_launch: bool) -> None: task: the task to launch first_launch: `True` if it's the first time is launched, `False` otherwise """ - if task.action not in self._actions: - logger.warn( - f"Can't launch task {task.id} since no function associated with action {task.action}" - ) - return + assert task.action in self._actions function = self._actions[task.action] From 7b963798eb3c49cd8500cd4c2c32e5e436889bbe Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Fri, 11 Aug 2023 12:16:03 +0200 Subject: [PATCH 26/34] Add delete_task --- synapse/util/task_scheduler.py | 25 ++++++++++++++++++++++--- 1 file changed, 22 
insertions(+), 3 deletions(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index c6c13b68b17a..7ad00a19fefa 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -214,6 +214,25 @@ async def get_tasks( max_timestamp=max_timestamp, ) + async def delete_task(self, id: str) -> None: + """Delete a task. Running tasks can't be deleted. + + Args: + id: id of the task to delete + """ + + if self.task_is_running(id): + raise Exception(f"Task {id} is currently running and can't be deleted") + await self._store.delete_scheduled_task(id) + + def task_is_running(self, id: str) -> bool: + """Check if a task is currently running. + + Args: + id: id of the task to check + """ + return id in self._running_tasks + async def _handle_scheduled_tasks(self) -> None: """Main loop taking care of launching tasks and cleaning up old ones.""" await self._launch_scheduled_tasks() @@ -223,7 +242,7 @@ async def _launch_scheduled_tasks(self) -> None: """Retrieve and launch scheduled tasks that should be running at that time.""" for task in await self.get_tasks(statuses=[TaskStatus.ACTIVE]): if ( - task.id not in self._running_tasks + not self.task_is_running(task.id) and len(self._running_tasks) < TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS ): @@ -232,7 +251,7 @@ async def _launch_scheduled_tasks(self) -> None: statuses=[TaskStatus.SCHEDULED], max_timestamp=self._clock.time_msec() ): if ( - task.id not in self._running_tasks + not self.task_is_running(task.id) and len(self._running_tasks) < TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS ): @@ -244,7 +263,7 @@ async def _clean_scheduled_tasks(self) -> None: statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE] ): # FAILED and COMPLETE tasks should never be running - assert task.id not in self._running_tasks + assert not self.task_is_running(task.id) if ( self._clock.time_msec() > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS From 3fd518cabad7a2db67c08564c8efbe0aa271aac8 Mon Sep 17 00:00:00 
2001 From: Mathieu Velten Date: Fri, 11 Aug 2023 12:29:13 +0200 Subject: [PATCH 27/34] Add index on status --- synapse/storage/schema/main/delta/80/01_scheduled_tasks.sql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/storage/schema/main/delta/80/01_scheduled_tasks.sql b/synapse/storage/schema/main/delta/80/01_scheduled_tasks.sql index 8a588d35fa81..286d109ed7e2 100644 --- a/synapse/storage/schema/main/delta/80/01_scheduled_tasks.sql +++ b/synapse/storage/schema/main/delta/80/01_scheduled_tasks.sql @@ -24,3 +24,5 @@ CREATE TABLE IF NOT EXISTS scheduled_tasks( result TEXT, error TEXT ); + +CREATE INDEX IF NOT EXISTS scheduled_tasks_status ON scheduled_tasks(status); From 1a410ee149736b6b7a8a33d4c3909ef9bc714c79 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Mon, 14 Aug 2023 12:17:49 +0200 Subject: [PATCH 28/34] Add big docstring --- synapse/util/task_scheduler.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 7ad00a19fefa..58b92003da43 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -28,6 +28,37 @@ class TaskScheduler: + """ + This is a simple task sheduler aimed at resumable tasks: usually we use `run_in_background` + to launch a background task, or Twisted `deferLater` if we want to do so later on. + + The problem with that is that the tasks will just stop and never be resumed if synapse + is stopped for whatever reason. + + How this works: + - A function mapped to a named action should first be registered with `register_action`. + This function will be called when trying to resuming tasks after a synapse shutdown, + so this registration should happen when synapse is initialised, NOT right before scheduling + a task. + - A task can then be launched using this named action with `schedule_task`. A `params` dict + can be passed, and it will be available to the registered function when launched. 
This task + can be launch either now-ish, or later on by giving a `timestamp` parameter. + + The function may call `update_task` at any time to update the `result` of the task, + and this can be used to resume the task at a specific point and/or to convey a result to + the code launching the task. + You can also specify the `result` (and/or an `error`) when returning from the function. + + The reconciliation loop runs every 5 mns, so this is not a precise scheduler. When wanting + to launch now, the launch will still not happen before the next loop run. + + Tasks will be run on the worker specified with `run_background_tasks_on` config, + or the main one by default. + There is a limit of 10 concurrent tasks, so tasks may be delayed if the pool is already + full. In this regard, please take great care that scheduled tasks can actually finished. + For now there is no mechanism to stop a running task if it is stuck. + """ + # Precision of the scheduler, evaluation of tasks to run will only happen # every `SCHEDULE_INTERVAL_MS` ms SCHEDULE_INTERVAL_MS = 5 * 60 * 1000 # 5mn From 8ed3e372b154a3d8e78939c40efeaef7e7b73999 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Mon, 14 Aug 2023 12:42:08 +0200 Subject: [PATCH 29/34] Add running tasks metric and log running tasks for more than 24hrs with no updates --- synapse/util/task_scheduler.py | 34 +++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 58b92003da43..15a74707c4bb 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -15,6 +15,8 @@ import logging from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set, Tuple +from prometheus_client import Gauge + from twisted.python.failure import Failure from synapse.metrics.background_process_metrics import run_as_background_process @@ -27,6 +29,12 @@ logger = logging.getLogger(__name__) +running_tasks_gauge = 
Gauge( + "synapse_scheduler_running_tasks", + "The number of concurrent running tasks handled by the TaskScheduler", +) + + class TaskScheduler: """ This is a simple task sheduler aimed at resumable tasks: usually we use `run_in_background` @@ -61,11 +69,13 @@ class TaskScheduler: # Precision of the scheduler, evaluation of tasks to run will only happen # every `SCHEDULE_INTERVAL_MS` ms - SCHEDULE_INTERVAL_MS = 5 * 60 * 1000 # 5mn + SCHEDULE_INTERVAL_MS = 5 * 60 * 1000 # 5mns # Time before a complete or failed task is deleted from the DB KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week # Maximum number of tasks that can run at the same time MAX_CONCURRENT_RUNNING_TASKS = 10 + # Time from the last task update after which we will log a warning + LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs def __init__(self, hs: "HomeServer"): self._store = hs.get_datastores().main @@ -272,12 +282,20 @@ async def _handle_scheduled_tasks(self) -> None: async def _launch_scheduled_tasks(self) -> None: """Retrieve and launch scheduled tasks that should be running at that time.""" for task in await self.get_tasks(statuses=[TaskStatus.ACTIVE]): - if ( - not self.task_is_running(task.id) - and len(self._running_tasks) - < TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS - ): - await self._launch_task(task, first_launch=False) + if not self.task_is_running(task.id): + if ( + len(self._running_tasks) + < TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS + ): + await self._launch_task(task, first_launch=False) + else: + if ( + self._clock.time_msec() + > task.timestamp + TaskScheduler.LAST_UPDATE_BEFORE_WARNING_MS + ): + logger.warn( + f"Task {task.id} (action {task.action}) has seen no update for more than 24h and may be stuck" + ) for task in await self.get_tasks( statuses=[TaskStatus.SCHEDULED], max_timestamp=self._clock.time_msec() ): @@ -288,6 +306,8 @@ async def _launch_scheduled_tasks(self) -> None: ): await self._launch_task(task, first_launch=True) + 
running_tasks_gauge.set(len(self._running_tasks)) + async def _clean_scheduled_tasks(self) -> None: """Clean old complete or failed jobs to avoid clutter the DB.""" for task in await self._store.get_scheduled_tasks( From c089f679c35f7a07777375e3c3b3ffbac68e41ee Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 16 Aug 2023 16:18:21 +0200 Subject: [PATCH 30/34] Remove unused sql file --- .../main/delta/80/01_scheduled_tasks.sql | 28 ------------------- 1 file changed, 28 deletions(-) delete mode 100644 synapse/storage/schema/main/delta/80/01_scheduled_tasks.sql diff --git a/synapse/storage/schema/main/delta/80/01_scheduled_tasks.sql b/synapse/storage/schema/main/delta/80/01_scheduled_tasks.sql deleted file mode 100644 index 286d109ed7e2..000000000000 --- a/synapse/storage/schema/main/delta/80/01_scheduled_tasks.sql +++ /dev/null @@ -1,28 +0,0 @@ -/* Copyright 2023 The Matrix.org Foundation C.I.C - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - --- cf ScheduledTask docstring for the meaning of the fields. 
-CREATE TABLE IF NOT EXISTS scheduled_tasks( - id TEXT PRIMARY KEY, - action TEXT NOT NULL, - status TEXT NOT NULL, - timestamp BIGINT NOT NULL, - resource_id TEXT, - params TEXT, - result TEXT, - error TEXT -); - -CREATE INDEX IF NOT EXISTS scheduled_tasks_status ON scheduled_tasks(status); From d158b673fc8417da4bb8172966518d818bf4cf54 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 16 Aug 2023 17:01:43 +0200 Subject: [PATCH 31/34] Address reviews --- .../storage/databases/main/task_scheduler.py | 27 +++++++------------ synapse/util/task_scheduler.py | 12 ++++----- tests/util/test_task_scheduler.py | 10 +++---- 3 files changed, 21 insertions(+), 28 deletions(-) diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py index d53f9702ff6d..283690aa2a98 100644 --- a/synapse/storage/databases/main/task_scheduler.py +++ b/synapse/storage/databases/main/task_scheduler.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json from typing import TYPE_CHECKING, Any, Dict, List, Optional from synapse.storage._base import SQLBaseStore, db_to_json @@ -51,18 +50,15 @@ async def get_scheduled_tasks( self, *, actions: Optional[List[str]] = None, - resource_ids: Optional[List[str]] = None, + resource_id: Optional[str] = None, statuses: Optional[List[TaskStatus]] = None, max_timestamp: Optional[int] = None, ) -> List[ScheduledTask]: """Get a list of scheduled tasks from the DB. - If an arg is `None` all tasks matching the other args will be selected. - If an arg is an empty list, the value needs to be NULL in DB to be selected. 
- Args: actions: Limit the returned tasks to those specific action names - resource_ids: Limit the returned tasks to the specific resource ids + resource_id: Limit the returned tasks to the specific resource id, if specified statuses: Limit the returned tasks to the specific statuses max_timestamp: Limit the returned tasks to the ones that have a timestamp inferior to the specified one @@ -71,21 +67,18 @@ async def get_scheduled_tasks( """ def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: - clauses = [] - args = [] - if actions is not None: + clauses: List[str] = [] + args: List[Any] = [] + if resource_id: + clauses.append("resource_id = ?") + args.append(resource_id) + if actions: clause, temp_args = make_in_list_sql_clause( txn.database_engine, "action", actions ) clauses.append(clause) args.extend(temp_args) - if resource_ids is not None: - clause, temp_args = make_in_list_sql_clause( - txn.database_engine, "resource_id", resource_ids - ) - clauses.append(clause) - args.extend(temp_args) - if statuses is not None: + if statuses: clause, temp_args = make_in_list_sql_clause( txn.database_engine, "status", statuses ) @@ -158,7 +151,7 @@ async def update_scheduled_task( if status is not None: updatevalues["status"] = status if result is not None: - updatevalues["result"] = json.dumps(result) + updatevalues["result"] = json_encoder.encode(result) if error is not None: updatevalues["error"] = error nb_rows = await self.db_pool.simple_update( diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 15a74707c4bb..e44966f397ae 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -69,7 +69,7 @@ class TaskScheduler: # Precision of the scheduler, evaluation of tasks to run will only happen # every `SCHEDULE_INTERVAL_MS` ms - SCHEDULE_INTERVAL_MS = 5 * 60 * 1000 # 5mns + SCHEDULE_INTERVAL_MS = 1 * 60 * 1000 # 1mn # Time before a complete or failed task is deleted from the DB 
KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week # Maximum number of tasks that can run at the same time @@ -164,8 +164,8 @@ async def schedule_task( timestamp, resource_id, params, - None, - None, + result=None, + error=None, ) await self._store.insert_scheduled_task(task) @@ -228,7 +228,7 @@ async def get_tasks( self, *, actions: Optional[List[str]] = None, - resource_ids: Optional[List[str]] = None, + resource_id: Optional[str] = None, statuses: Optional[List[TaskStatus]] = None, max_timestamp: Optional[int] = None, ) -> List[ScheduledTask]: @@ -240,7 +240,7 @@ async def get_tasks( Args: actions: Limit the returned tasks to those specific action names - resource_ids: Limit the returned tasks to the specific resource ids + resource_id: Limit the returned tasks to the specific resource id, if specified statuses: Limit the returned tasks to the specific statuses max_timestamp: Limit the returned tasks to the ones that have a timestamp inferior to the specified one @@ -250,7 +250,7 @@ async def get_tasks( """ return await self._store.get_scheduled_tasks( actions=actions, - resource_ids=resource_ids, + resource_id=resource_id, statuses=statuses, max_timestamp=max_timestamp, ) diff --git a/tests/util/test_task_scheduler.py b/tests/util/test_task_scheduler.py index 9ab1e3eed9b3..3a97559bf04b 100644 --- a/tests/util/test_task_scheduler.py +++ b/tests/util/test_task_scheduler.py @@ -45,7 +45,7 @@ async def _test_task( def test_schedule_task(self) -> None: """Schedule a task in the future with some parameters to be copied as a result and check it executed correctly. 
Also check that it get removed after `KEEP_TASKS_FOR_MS`.""" - timestamp = self.clock.time_msec() + 2 * 60 * 1000 + timestamp = self.clock.time_msec() + 30 * 1000 task_id = self.get_success( self.task_scheduler.schedule_task( "_test_task", @@ -59,9 +59,9 @@ def test_schedule_task(self) -> None: self.assertEqual(task.status, TaskStatus.SCHEDULED) self.assertIsNone(task.result) - # The timestamp being 2mn after now the task should been executed + # The timestamp being 30s after now the task should have been executed # after the first scheduling loop is run - self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + 1) + self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) task = self.get_success(self.task_scheduler.get_task(task_id)) assert task is not None @@ -85,7 +85,7 @@ async def _sleeping_task( def test_schedule_lot_of_tasks(self) -> None: """Schedule more than `TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS` tasks and check the behavior.""" - timestamp = self.clock.time_msec() + 2 * 60 * 1000 + timestamp = self.clock.time_msec() + 30 * 1000 task_ids = [] for i in range(TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS + 1): task_ids.append( @@ -98,7 +98,7 @@ def test_schedule_lot_of_tasks(self) -> None: ) ) - # The timestamp being 2mn after now the task should been executed + # The timestamp being 30s after now the task should have been executed # after the first scheduling loop is run self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000)) From 223f87e3eb1b0dbe6194b1674b1bb9c7f36e38a8 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 16 Aug 2023 17:10:35 +0200 Subject: [PATCH 32/34] Address comment --- synapse/util/task_scheduler.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index e44966f397ae..973efb9fcadc 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -258,10 +258,12 @@ async def get_tasks( async def 
delete_task(self, id: str) -> None: """Delete a task. Running tasks can't be deleted. + Can only be called from the worker handling the task scheduling. + Args: id: id of the task to delete """ - + assert self._run_background_tasks if self.task_is_running(id): raise Exception(f"Task {id} is currently running and can't be deleted") await self._store.delete_scheduled_task(id) From e5a534483f66516b55e271e1c7bdf3d81beb60a9 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 16 Aug 2023 17:11:59 +0200 Subject: [PATCH 33/34] Fix --- synapse/util/task_scheduler.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 973efb9fcadc..773a8327f633 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -263,7 +263,6 @@ async def delete_task(self, id: str) -> None: Args: id: id of the task to delete """ - assert self._run_background_tasks if self.task_is_running(id): raise Exception(f"Task {id} is currently running and can't be deleted") await self._store.delete_scheduled_task(id) @@ -271,9 +270,12 @@ async def delete_task(self, id: str) -> None: def task_is_running(self, id: str) -> bool: """Check if a task is currently running. + Can only be called from the worker handling the task scheduling. 
+ Args: id: id of the task to check """ + assert self._run_background_tasks return id in self._running_tasks async def _handle_scheduled_tasks(self) -> None: From 56ab35e2be0c9a3cc18a39f5ed446ed5ce0992ce Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Mon, 21 Aug 2023 12:05:14 +0200 Subject: [PATCH 34/34] Address comment --- synapse/storage/databases/main/task_scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py index 283690aa2a98..1fb3180c3c96 100644 --- a/synapse/storage/databases/main/task_scheduler.py +++ b/synapse/storage/databases/main/task_scheduler.py @@ -72,13 +72,13 @@ def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: if resource_id: clauses.append("resource_id = ?") args.append(resource_id) - if actions: + if actions is not None: clause, temp_args = make_in_list_sql_clause( txn.database_engine, "action", actions ) clauses.append(clause) args.extend(temp_args) - if statuses: + if statuses is not None: clause, temp_args = make_in_list_sql_clause( txn.database_engine, "status", statuses )