This repository was archived by the owner on Apr 26, 2024. It is now read-only.
13 changes: 13 additions & 0 deletions changelog.d/15480.misc
@@ -0,0 +1,13 @@
Make all ID generators rely on Postgres sequences to improve support for running with multiple homeserver nodes.

1. Update the implementation of `IdGenerator` to defer to `SequenceGenerator` (a minimal sketch follows this changelog entry).
* The `SequenceGenerator` object is obtained from `build_sequence_generator`, which returns a
`PostgresSequenceGenerator` backed by a Postgres sequence when running against Postgres.
* `get_next` now involves querying the next value from the DB in an async fashion, hence we add a synchronous
`get_next_txn` which can be used when a `Cursor` object is already available.
2. Replace all instances of `StreamIdGenerator` with `MultiWriterIdGenerator`, which is backed by Postgres sequences,
   when running with Postgres. `StreamIdGenerator` is kept when running with SQLite, which doesn't support sequences.
3. Bump schema version and add delta files to create the Postgres sequences, and add `instance_name` columns to tables
used with `MultiWriterIdGenerator`.
   * Note that in the case of a downgrade, the sequences will get out of sync with the database, and the startup
   consistency checks will fail when upgrading again.
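
A minimal sketch of the reworked IdGenerator described in item 1 above. The constructor parameters mirror the call sites visible later in this diff (IdGenerator(db_conn, database, table, column, sequence_name)), but the body, the get_first bootstrap callback, and the exact build_sequence_generator call are assumptions based on the changelog, not the PR's actual implementation:

    # Sketch only; built from the changelog's description, not the real diff.
    from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
    from synapse.storage.types import Cursor
    from synapse.storage.util.sequence import build_sequence_generator


    class IdGenerator:
        def __init__(
            self,
            db_conn: LoggingDatabaseConnection,
            db: DatabasePool,
            table: str,
            column: str,
            sequence_name: str,
        ):
            self._db = db

            def get_first(txn: Cursor) -> int:
                # Used on SQLite (which has no sequences) to seed the
                # in-memory counter from the current maximum ID.
                txn.execute("SELECT COALESCE(max(%s), 0) FROM %s" % (column, table))
                row = txn.fetchone()
                assert row is not None
                return row[0]

            # Returns a PostgresSequenceGenerator on Postgres and a local
            # single-process generator on SQLite.
            self._sequence_gen = build_sequence_generator(
                db_conn, db.engine, get_first, sequence_name, table, column
            )

        def get_next_txn(self, txn: Cursor) -> int:
            # Synchronous variant for callers that already hold a Cursor.
            return self._sequence_gen.get_next_id_txn(txn)

        async def get_next(self) -> int:
            # Async variant: runs a short transaction to bump the sequence.
            return await self._db.runInteraction(
                "id_gen_get_next", self.get_next_txn
            )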
54 changes: 40 additions & 14 deletions synapse/storage/databases/main/devices.py
@@ -49,9 +49,11 @@
)
from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
StreamIdGenerator,
)
from synapse.types import JsonDict, StrCollection, get_verify_key_from_cross_signing_key
@@ -90,20 +92,44 @@ def __init__(

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._device_list_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"device_lists_stream",
"stream_id",
extra_tables=[
("user_signature_stream", "stream_id"),
("device_lists_outbound_pokes", "stream_id"),
("device_lists_changes_in_room", "stream_id"),
("device_lists_remote_pending", "stream_id"),
("device_lists_changes_converted_stream_position", "stream_id"),
],
is_writer=hs.config.worker.worker_app is None,
)
self._device_list_id_gen: AbstractStreamIdGenerator
if isinstance(database.engine, PostgresEngine):
@alexcornet (author) commented on Apr 25, 2023:
Following the pattern I noticed in other files where MultiWriterIdGenerator is used, although I think we could refactor this and create a helper function that returns the right AbstractStreamIdGenerator depending on the engine, similar to what build_sequence_generator does.
It would be less verbose and would abstract some complexity away from the caller by making MultiWriterIdGenerator and StreamIdGenerator implementation details.
If you agree, I could start by making the refactor on develop and then use the new pattern here.

Note that I think eventually StreamIdGenerator and MultiWriterIdGenerator should become private, to prevent someone from using StreamIdGenerator directly and breaking the multi-node setup.

self._device_list_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="device_lists",
instance_name=hs.get_instance_name(),
tables=[
("device_lists_stream", "instance_name", "stream_id"),
("user_signature_stream", "instance_name", "stream_id"),
("device_lists_outbound_pokes", "instance_name", "stream_id"),
("device_lists_changes_in_room", "instance_name", "stream_id"),
("device_lists_remote_pending", "instance_name", "stream_id"),
(
"device_lists_changes_converted_stream_position",
"instance_name",
"stream_id",
),
],
sequence_name="device_lists_sequence",
writers=["master"],
)
else:
self._device_list_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"device_lists_stream",
"stream_id",
extra_tables=[
("user_signature_stream", "stream_id"),
("device_lists_outbound_pokes", "stream_id"),
("device_lists_changes_in_room", "stream_id"),
("device_lists_remote_pending", "stream_id"),
("device_lists_changes_converted_stream_position", "stream_id"),
],
is_writer=hs.config.worker.worker_app is None,
)

# Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a
# StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker).
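The helper @alexcornet proposes in the comment above might look something like the following; the name build_stream_id_generator and its exact signature are hypothetical, invented here to mirror what build_sequence_generator does for SequenceGenerator:

    # Hypothetical helper sketched from the review comment; not part of this PR.
    from typing import Any, List, Tuple

    from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
    from synapse.storage.engines import PostgresEngine
    from synapse.storage.util.id_generators import (
        AbstractStreamIdGenerator,
        MultiWriterIdGenerator,
        StreamIdGenerator,
    )


    def build_stream_id_generator(
        db_conn: LoggingDatabaseConnection,
        db: DatabasePool,
        notifier: Any,  # the object returned by hs.get_replication_notifier()
        stream_name: str,
        instance_name: str,
        tables: List[Tuple[str, str, str]],  # (table, instance column, id column)
        sequence_name: str,
        writers: List[str],
        is_writer: bool,
    ) -> AbstractStreamIdGenerator:
        # Multi-writer, sequence-backed generator on Postgres; the original
        # single-writer generator on SQLite, which has no sequences.
        if isinstance(db.engine, PostgresEngine):
            return MultiWriterIdGenerator(
                db_conn=db_conn,
                db=db,
                notifier=notifier,
                stream_name=stream_name,
                instance_name=instance_name,
                tables=tables,
                sequence_name=sequence_name,
                writers=writers,
            )
        return StreamIdGenerator(
            db_conn,
            notifier,
            tables[0][0],
            tables[0][2],
            extra_tables=[(table, column) for table, _, column in tables[1:]],
            is_writer=is_writer,
        )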
33 changes: 25 additions & 8 deletions synapse/storage/databases/main/end_to_end_keys.py
@@ -49,7 +49,11 @@
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
StreamIdGenerator,
)
from synapse.types import JsonDict
from synapse.util import json_decoder, json_encoder
from synapse.util.caches.descriptors import cached, cachedList
@@ -1210,13 +1214,26 @@ def __init__(
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)

self._cross_signing_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"e2e_cross_signing_keys",
"stream_id",
)
self._cross_signing_id_gen: AbstractStreamIdGenerator

if isinstance(database.engine, PostgresEngine):
self._cross_signing_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="e2e_cross_signing_keys",
instance_name=hs.get_instance_name(),
tables=[("e2e_cross_signing_keys", "instance_name", "stream_id")],
sequence_name="e2e_cross_signing_keys_sequence",
writers=[], # can be empty as we only need an AbstractStreamIdGenerator
)
else:
self._cross_signing_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"e2e_cross_signing_keys",
"stream_id",
)

async def set_e2e_device_keys(
self, user_id: str, device_id: str, time_now: int, device_keys: JsonDict
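Both branches satisfy AbstractStreamIdGenerator, so call sites allocate stream IDs the same way on either engine. A sketch of a typical caller, written as a simplified method on the store class with an illustrative payload:

    async def set_cross_signing_key_sketch(self, user_id: str, keydata: str) -> None:
        # get_next() returns an async context manager: the stream ID is
        # allocated on entry and marked as persisted on exit.
        async with self._cross_signing_id_gen.get_next() as stream_id:
            await self.db_pool.simple_insert(
                table="e2e_cross_signing_keys",
                values={
                    "user_id": user_id,
                    "keytype": "master",  # illustrative values
                    "keydata": keydata,
                    "stream_id": stream_id,
                },
            )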
46 changes: 34 additions & 12 deletions synapse/storage/databases/main/push_rule.py
@@ -47,6 +47,7 @@
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
IdGenerator,
MultiWriterIdGenerator,
StreamIdGenerator,
)
from synapse.synapse_rust.push import FilteredPushRules, PushRule, PushRules
@@ -117,13 +118,26 @@ def __init__(

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._push_rules_stream_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"push_rules_stream",
"stream_id",
is_writer=hs.config.worker.worker_app is None,
)
self._push_rules_stream_id_gen: AbstractStreamIdGenerator
if isinstance(database.engine, PostgresEngine):
self._push_rules_stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="push_rules",
instance_name=hs.get_instance_name(),
tables=[("push_rules_stream", "instance_name", "stream_id")],
sequence_name="push_rules_sequence",
writers=["master"],
)
else:
self._push_rules_stream_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"push_rules_stream",
"stream_id",
is_writer=hs.config.worker.worker_app is None,
)

push_rules_prefill, push_rules_id = self.db_pool.get_cache_dict(
db_conn,
@@ -339,8 +353,16 @@
):
super().__init__(database, db_conn, hs)

self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
self._push_rule_id_gen = IdGenerator(
db_conn, database, "push_rules", "id", "push_rules_id_sequence"
)
self._push_rules_enable_id_gen = IdGenerator(
db_conn,
database,
"push_rules_enable",
"id",
"push_rules_enable_id_sequence",
)

async def add_push_rule(
self,
@@ -524,7 +546,7 @@ def _upsert_push_rule_txn(

if txn.rowcount == 0:
# We didn't update a row with the given rule_id so insert one
push_rule_id = self._push_rule_id_gen.get_next()
push_rule_id = self._push_rule_id_gen.get_next_txn(txn)

self.db_pool.simple_insert_txn(
txn,
@@ -572,7 +594,7 @@ def _upsert_push_rule_txn(
else:
raise RuntimeError("Unknown database engine")

new_enable_id = self._push_rules_enable_id_gen.get_next()
new_enable_id = self._push_rules_enable_id_gen.get_next_txn(txn)
txn.execute(sql, (new_enable_id, user_id, rule_id, 1))

async def delete_push_rule(self, user_id: str, rule_id: str) -> None:
@@ -659,7 +681,7 @@ def _set_push_rule_enabled_txn(
enabled: bool,
is_default_rule: bool,
) -> None:
new_id = self._push_rules_enable_id_gen.get_next()
new_id = self._push_rules_enable_id_gen.get_next_txn(txn)

if not is_default_rule:
# first check it exists; we need to lock for key share so that a
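The switch from get_next() to get_next_txn(txn) in the hunks above is needed because get_next is now async and cannot be awaited from inside a transaction callback; the transaction variant pulls the next value on the already-open cursor instead. On Postgres that plausibly boils down to a nextval() call, sketched here (the class body is an illustration; only the nextval semantics are certain):

    from synapse.storage.types import Cursor


    class PostgresSequenceGeneratorSketch:
        def __init__(self, sequence_name: str):
            self._sequence_name = sequence_name

        def get_next_id_txn(self, txn: Cursor) -> int:
            # nextval() atomically increments and returns the sequence value,
            # so concurrent writers on different nodes never hand out
            # duplicate IDs. "?" is Synapse's portable placeholder style,
            # converted to the engine's own style by the transaction wrapper.
            txn.execute("SELECT nextval(?)", (self._sequence_name,))
            row = txn.fetchone()
            assert row is not None
            return row[0]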
34 changes: 26 additions & 8 deletions synapse/storage/databases/main/pusher.py
@@ -34,8 +34,10 @@
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
StreamIdGenerator,
)
from synapse.types import JsonDict
@@ -59,14 +61,30 @@ def __init__(

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._pushers_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"pushers",
"id",
extra_tables=[("deleted_pushers", "stream_id")],
is_writer=hs.config.worker.worker_app is None,
)
self._pushers_id_gen: AbstractStreamIdGenerator
if isinstance(database.engine, PostgresEngine):
self._pushers_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="pushers",
instance_name=hs.get_instance_name(),
tables=[
("pushers", "instance_name", "id"),
("deleted_pushers", "instance_name", "stream_id"),
],
sequence_name="pushers_sequence",
writers=["master"],
)
else:
self._pushers_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"pushers",
"id",
extra_tables=[("deleted_pushers", "stream_id")],
is_writer=hs.config.worker.worker_app is None,
)

self.db_pool.updates.register_background_update_handler(
"remove_deactivated_pushers",
12 changes: 8 additions & 4 deletions synapse/storage/databases/main/registration.py
@@ -2148,8 +2148,12 @@ def __init__(
hs.config.server.request_token_inhibit_3pid_errors
)

self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
self._access_tokens_id_gen = IdGenerator(
db_conn, database, "access_tokens", "id", "access_tokens_id_sequence"
)
self._refresh_tokens_id_gen = IdGenerator(
db_conn, database, "refresh_tokens", "id", "refresh_tokens_id_sequence"
)

# If support for MSC3866 is enabled and configured to require approval for new
# account, we will create new users with an 'approved' flag set to false.
@@ -2188,7 +2192,7 @@ async def add_access_token_to_user(
Returns:
The token ID
"""
next_id = self._access_tokens_id_gen.get_next()
next_id = await self._access_tokens_id_gen.get_next()
now = self._clock.time_msec()

await self.db_pool.simple_insert(
@@ -2235,7 +2239,7 @@ async def add_refresh_token_to_user(
Returns:
The token ID
"""
next_id = self._refresh_tokens_id_gen.get_next()
next_id = await self._refresh_tokens_id_gen.get_next()

await self.db_pool.simple_insert(
"refresh_tokens",
8 changes: 4 additions & 4 deletions synapse/storage/databases/main/room.py
@@ -2106,10 +2106,10 @@ def __init__(
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)

self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")

self._instance_name = hs.get_instance_name()
self._event_reports_id_gen = IdGenerator(
db_conn, database, "event_reports", "id", "event_reports_id_sequence"
)

async def upsert_room_on_join(
self, room_id: str, room_version: RoomVersion, state_events: List[EventBase]
@@ -2357,7 +2357,7 @@ async def add_event_report(
Returns:
Id of the event report.
"""
next_id = self._event_reports_id_gen.get_next()
next_id = await self._event_reports_id_gen.get_next()
await self.db_pool.simple_insert(
table="event_reports",
values={
6 changes: 5 additions & 1 deletion synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

SCHEMA_VERSION = 75 # remember to update the list below when updating
SCHEMA_VERSION = 76 # remember to update the list below when updating
"""Represents the expectations made by the codebase about the database schema

This should be incremented whenever the codebase changes its requirements on the
@@ -97,6 +97,10 @@
`local_current_membership` & `room_memberships`) is now being populated for new
rows. When the background job to populate historical rows lands this will
become the compat schema version.

Changes in SCHEMA_VERSION = 76:
- Create postgres sequences and add instance_name columns to migrate all ID
generators to sequence backed implementations.
"""


12 changes: 12 additions & 0 deletions synapse/storage/schema/main/delta/75/01_add_instance_name.sql
@@ -0,0 +1,12 @@
ALTER TABLE device_lists_stream ADD COLUMN instance_name TEXT;

ALTER TABLE user_signature_stream ADD COLUMN instance_name TEXT;
ALTER TABLE device_lists_outbound_pokes ADD COLUMN instance_name TEXT;
ALTER TABLE device_lists_changes_in_room ADD COLUMN instance_name TEXT;
ALTER TABLE device_lists_remote_pending ADD COLUMN instance_name TEXT;
ALTER TABLE device_lists_changes_converted_stream_position ADD COLUMN instance_name TEXT;

ALTER TABLE e2e_cross_signing_keys ADD COLUMN instance_name TEXT;

ALTER TABLE pushers ADD COLUMN instance_name TEXT;
ALTER TABLE deleted_pushers ADD COLUMN instance_name TEXT;
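
This delta only adds the instance_name columns; the changelog also mentions deltas that create the Postgres sequences themselves, which do not appear in this diff. A hedged sketch of what one such Postgres-only delta could look like — the sequence name matches the one referenced in end_to_end_keys.py above, but the bootstrapping logic is an assumption:

    -- Hypothetical Postgres-only delta (e.g. a .sql.postgres file); not part
    -- of the diff shown above.
    CREATE SEQUENCE IF NOT EXISTS e2e_cross_signing_keys_sequence;

    -- Start after the highest existing stream_id so newly allocated IDs
    -- never collide with rows written before the migration.
    SELECT setval('e2e_cross_signing_keys_sequence', (
        SELECT COALESCE(MAX(stream_id), 1) FROM e2e_cross_signing_keys
    ));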