Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Implements a task scheduler for resumable potentially long running tasks #15891

Merged
merged 38 commits into from Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
470f385
Implements a task scheduler for resumable potentially long running tasks
Jul 7, 2023
0961f52
Add filters to task retrieval + clean less often
Jul 24, 2023
862425f
Merge remote-tracking branch 'origin/develop' into mv/task-scheduler
MatMaul Jul 24, 2023
e6bf9ce
Move sql file
MatMaul Jul 24, 2023
abbbddf
Update schema version
MatMaul Jul 24, 2023
a4ed9fe
Add ts to get_tasks and narrow down the query in the loops
MatMaul Jul 26, 2023
3490a91
Renames
Jul 26, 2023
e03c12d
Comments
Jul 26, 2023
8e29f3d
Update synapse/util/task_scheduler.py
Jul 26, 2023
b2dff65
Update synapse/util/task_scheduler.py
Jul 26, 2023
ce981c5
Add underscore to private fields
Jul 26, 2023
9e25f5e
lint
Jul 26, 2023
8c713f9
Apply suggestions from code review
Jul 26, 2023
bc92b72
Comments
Jul 26, 2023
46cbde8
Fix error handling
Jul 26, 2023
1025432
More comments
Jul 26, 2023
fd2c3dd
Fix
Jul 27, 2023
9e9a5e8
Add doc
Jul 28, 2023
861992f
Merge remote-tracking branch 'origin/develop' into mv/task-scheduler
Jul 28, 2023
4c1c833
Move to new DB delta folder
Jul 28, 2023
6762c07
Limit concurrent running tasks to 20
Jul 28, 2023
dfa3983
Remove launch now feature, max 10
Jul 28, 2023
7730d74
Fix tests
Jul 28, 2023
5878384
Fix olddeps tests
Jul 29, 2023
2383d1e
Merge loops
Aug 3, 2023
6122291
Merge remote-tracking branch 'origin/develop' into mv/task-scheduler
Aug 7, 2023
16be76b
Address comments
Aug 10, 2023
ad25237
Address comments
Aug 10, 2023
7b96379
Add delete_task
Aug 11, 2023
3fd518c
Add index on status
Aug 11, 2023
1a410ee
Add big docstring
Aug 14, 2023
8ed3e37
Add running tasks metric and log running tasks for more than 24hrs wi…
Aug 14, 2023
60ea494
Merge remote-tracking branch 'origin/develop' into mv/task-scheduler
Aug 14, 2023
c089f67
Remove unused sql file
Aug 16, 2023
d158b67
Address reviews
Aug 16, 2023
223f87e
Address comment
Aug 16, 2023
e5a5344
Fix
Aug 16, 2023
56ab35e
Address comment
Aug 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15891.feature
@@ -0,0 +1 @@
Implements a task scheduler for resumable potentially long running tasks.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be a feature -- it is something internally used by Synapse; admins and end-users don't care.

We could probably just combine this with the other PR?

2 changes: 2 additions & 0 deletions synapse/app/generic_worker.py
Expand Up @@ -91,6 +91,7 @@
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.databases.main.tags import TagsWorkerStore
from synapse.storage.databases.main.task_scheduler import TaskSchedulerWorkerStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
Expand Down Expand Up @@ -144,6 +145,7 @@ class GenericWorkerStore(
TransactionWorkerStore,
LockStore,
SessionStore,
TaskSchedulerWorkerStore,
):
# Properties that multiple storage classes define. Tell mypy what the
# expected type is.
Expand Down
6 changes: 6 additions & 0 deletions synapse/server.py
Expand Up @@ -141,6 +141,7 @@
from synapse.util.macaroons import MacaroonGenerator
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.stringutils import random_string
from synapse.util.task_scheduler import TaskScheduler

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -359,6 +360,7 @@ def setup_background_tasks(self) -> None:
"""
for i in self.REQUIRED_ON_BACKGROUND_TASK_STARTUP:
getattr(self, "get_" + i + "_handler")()
self.get_task_scheduler()
clokep marked this conversation as resolved.
Show resolved Hide resolved

def get_reactor(self) -> ISynapseReactor:
"""
Expand Down Expand Up @@ -912,3 +914,7 @@ def get_request_ratelimiter(self) -> RequestRatelimiter:
def get_common_usage_metrics_manager(self) -> CommonUsageMetricsManager:
"""Usage metrics shared between phone home stats and the prometheus exporter."""
return CommonUsageMetricsManager(self)

@cache_in_self
def get_task_scheduler(self) -> TaskScheduler:
return TaskScheduler(self)
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/__init__.py
Expand Up @@ -70,6 +70,7 @@
from .stats import StatsStore
from .stream import StreamWorkerStore
from .tags import TagsStore
from .task_scheduler import TaskSchedulerWorkerStore
from .transactions import TransactionWorkerStore
from .ui_auth import UIAuthStore
from .user_directory import UserDirectoryStore
Expand Down Expand Up @@ -127,6 +128,7 @@ class DataStore(
CacheInvalidationWorkerStore,
LockStore,
SessionStore,
TaskSchedulerWorkerStore,
):
def __init__(
self,
Expand Down
173 changes: 173 additions & 0 deletions synapse/storage/databases/main/task_scheduler.py
@@ -0,0 +1,173 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus

if TYPE_CHECKING:
from synapse.server import HomeServer


class TaskSchedulerWorkerStore(SQLBaseStore):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)

@staticmethod
def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask:
row["status"] = TaskStatus(row["status"])
if row["params"] is not None:
row["params"] = json.loads(row["params"])
if row["result"] is not None:
row["result"] = json.loads(row["result"])
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
return ScheduledTask(**row)

async def get_scheduled_tasks(
self, action: Optional[str] = None, resource_id: Optional[str] = None
) -> List[ScheduledTask]:
"""Get a list of scheduled tasks from the DB.

If the parameters are `None` all the tasks are returned.

Args:
action: Limit the returned tasks to this specific action name
resource_id: Limit the returned tasks to this specific resource id

Returns: a list of `ScheduledTask`
"""
keyvalues = {}
if action:
keyvalues["action"] = action
if resource_id:
keyvalues["resource_id"] = resource_id

rows = await self.db_pool.simple_select_list(
table="scheduled_tasks",
keyvalues=keyvalues,
retcols=(
"id",
"action",
"status",
"timestamp",
"resource_id",
"params",
"result",
"error",
),
desc="get_scheduled_tasks",
)

return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows]

async def upsert_scheduled_task(self, task: ScheduledTask) -> None:
"""Upsert a specified `ScheduledTask` in the DB.

Args:
task: the `ScheduledTask` to upsert
"""
await self.db_pool.simple_upsert(
"scheduled_tasks",
{"id": task.id},
{
"action": task.action,
"status": task.status,
"timestamp": task.timestamp,
"resource_id": task.resource_id,
"params": None if task.params is None else json.dumps(task.params),
"result": None if task.result is None else json.dumps(task.result),
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
"error": task.error,
},
desc="upsert_scheduled_task",
)

async def update_scheduled_task(
self,
id: str,
*,
timestamp: Optional[int] = None,
status: Optional[TaskStatus] = None,
result: Optional[JsonMapping] = None,
error: Optional[str] = None,
) -> None:
"""Update a scheduled task in the DB with some new value(s).

Args:
id: id of the `ScheduledTask` to update
timestamp: new timestamp of the task
status: new status of the task
result: new result of the task
error: new error of the task
"""
updatevalues: JsonDict = {}
if timestamp is not None:
updatevalues["timestamp"] = timestamp
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
if status is not None:
updatevalues["status"] = status
if result is not None:
updatevalues["result"] = json.dumps(result)
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
if error is not None:
updatevalues["error"] = error
await self.db_pool.simple_update(
"scheduled_tasks",
{"id": id},
updatevalues,
desc="update_scheduled_task",
)

async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]:
"""Get a specific `ScheduledTask` from its id.

Args:
id: the id of the task to retrieve

Returns: the task if available, `None` otherwise
"""
row = await self.db_pool.simple_select_one(
table="scheduled_tasks",
keyvalues={"id": id},
retcols=(
"id",
"action",
"status",
"timestamp",
"resource_id",
"params",
"result",
"error",
),
allow_none=True,
desc="get_scheduled_task",
)

return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None

async def delete_scheduled_task(self, id: str) -> None:
"""Delete a specific task from its id.

Args:
id: the id of the task to delete
"""
await self.db_pool.simple_delete(
"scheduled_tasks",
keyvalues={"id": id},
desc="delete_scheduled_task",
)
26 changes: 26 additions & 0 deletions synapse/storage/schema/main/delta/79/03_scheduled_tasks.sql
@@ -0,0 +1,26 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- cf ScheduledTask docstring for the meaning of the fields.
CREATE TABLE IF NOT EXISTS scheduled_tasks(
id text PRIMARY KEY,
action text NOT NULL,
status text NOT NULL,
timestamp bigint NOT NULL,
resource_id text,
params text,
result text,
error text
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
);
38 changes: 38 additions & 0 deletions synapse/types/__init__.py
Expand Up @@ -15,6 +15,7 @@
import abc
import re
import string
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Expand Down Expand Up @@ -979,3 +980,40 @@ class UserProfile(TypedDict):
class RetentionPolicy:
min_lifetime: Optional[int] = None
max_lifetime: Optional[int] = None


class TaskStatus(str, Enum):
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
"""Status of a scheduled task"""

# Task is scheduled but not active
SCHEDULED = "scheduled"
# Task is active and probably running, and if not
# will be run on next scheduler loop run
ACTIVE = "active"
# Task has completed successfully
COMPLETE = "complete"
# Task is over and either returned a failed status, or had an exception
FAILED = "failed"


@attr.s(auto_attribs=True, frozen=True, slots=True)
class ScheduledTask:
"""Description of a scheduled task"""

# id used to identify the task
id: str
# name of the action to be run by this task
action: str
# current status of this task
status: TaskStatus
# if the status is SCHEDULED then this represents when it should be launched,
# otherwise it represents the last time this task got a change of state
timestamp: int
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
# Optionally bind a task to some resource id for easy retrieval
resource_id: Optional[str]
# Optional parameters that will be passed to the function ran by the task
params: Optional[JsonMapping]
# Optional result that can be updated by the running task
result: Optional[JsonMapping]
# Optional error that should be assigned a value when the status is FAILED
error: Optional[str]