Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix receipts or account data not being sent down sync #9193

Merged
merged 7 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9193.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix receipts or account data not being sent down sync. Introduced in v1.26.0rc1.
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
if hs.get_instance_name() in hs.config.worker.writers.events:
if hs.get_instance_name() in hs.config.worker.writers.account_data:
self._account_data_id_gen = StreamIdGenerator(
db_conn,
"room_account_data",
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self._receipts_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
stream_name="account_data",
stream_name="receipts",
instance_name=self._instance_name,
tables=[("receipts_linearized", "instance_name", "stream_id")],
sequence_name="receipts_sequence",
Expand All @@ -61,7 +61,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
if hs.get_instance_name() in hs.config.worker.writers.events:
if hs.get_instance_name() in hs.config.worker.writers.receipts:
self._receipts_id_gen = StreamIdGenerator(
db_conn, "receipts_linearized", "stream_id"
)
Expand Down
6 changes: 5 additions & 1 deletion synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,11 @@ def __init__(
# We check that the table and sequence haven't diverged.
for table, _, id_column in tables:
self._sequence_gen.check_consistency(
db_conn, table=table, id_column=id_column, positive=positive
db_conn,
table=table,
id_column=id_column,
stream_name=stream_name,
positive=positive,
)

# This goes and fills out the above state from the database.
Expand Down
56 changes: 53 additions & 3 deletions synapse/storage/util/sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,21 @@
See docs/postgres.md for more information.
"""

_INCONSISTENT_STREAM_ERROR = """
Postgres sequence '%(seq)s' is inconsistent with associated stream position
of '%(stream_name)s' in the 'stream_positions' table.

This is likely a programming error and should be reported at
https://github.com/matrix-org/synapse.

A temporary workaround to fix this error is to shut down Synapse (including
any and all workers) and run the following SQL:

DELETE FROM stream_positions WHERE stream_name = '%(stream_name)s';

This will need to be done every time the server is restarted.
"""


class SequenceGenerator(metaclass=abc.ABCMeta):
"""A class which generates a unique sequence of integers"""
Expand All @@ -60,14 +75,20 @@ def check_consistency(
db_conn: "LoggingDatabaseConnection",
table: str,
id_column: str,
stream_name: Optional[str] = None,
positive: bool = True,
):
"""Should be called during start up to test that the current value of
the sequence is greater than or equal to the maximum ID in the table.

This is to handle various cases where the sequence value can get out
of sync with the table, e.g. if Synapse gets rolled back to a previous
This is to handle various cases where the sequence value can get out of
sync with the table, e.g. if Synapse gets rolled back to a previous
version and the rolled forwards again.

If a stream name is given then will check that any value in the
`stream_positions` table is less than or equal to the current sequence
value. If it isn't then it's likely that streams have been crossed
somewhere (e.g. two ID generators have the same stream name).
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""
...

Expand All @@ -93,8 +114,12 @@ def check_consistency(
db_conn: "LoggingDatabaseConnection",
table: str,
id_column: str,
stream_name: Optional[str] = None,
positive: bool = True,
):
"""See SequenceGenerator.check_consistency for docstring.
"""

txn = db_conn.cursor(txn_name="sequence.check_consistency")

# First we get the current max ID from the table.
Expand All @@ -118,6 +143,18 @@ def check_consistency(
"SELECT last_value, is_called FROM %(seq)s" % {"seq": self._sequence_name}
)
last_value, is_called = txn.fetchone()

# If we have an associated stream check the stream_positions table.
max_in_stream_positions = None
if stream_name:
txn.execute(
"SELECT MAX(stream_id) FROM stream_positions WHERE stream_name",
(stream_name,),
)
rows = txn.fetchall()
if rows and row[0][0] is not None:
max_in_stream_positions = row[0][0]
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
clokep marked this conversation as resolved.
Show resolved Hide resolved

txn.close()

# If `is_called` is False then `last_value` is actually the value that
Expand All @@ -138,6 +175,14 @@ def check_consistency(
% {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql}
)

# If we have values in the stream positions table then they have to be
# less than or equal to `last_value`
if max_in_stream_positions and max_in_stream_positions > last_value:
raise IncorrectDatabaseSetup(
_INCONSISTENT_STREAM_ERROR
% {"seq": self._sequence_name, "stream": stream_name}
)


GetFirstCallbackType = Callable[[Cursor], int]

Expand Down Expand Up @@ -175,7 +220,12 @@ def get_next_id_txn(self, txn: Cursor) -> int:
return self._current_max_id

def check_consistency(
self, db_conn: Connection, table: str, id_column: str, positive: bool = True
self,
db_conn: Connection,
table: str,
id_column: str,
stream_name: Optional[str] = None,
positive: bool = True,
):
# There is nothing to do for in memory sequences
pass
Expand Down