This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix receipts or account data not being sent down sync (#9193)
Introduced in #9104.

This wasn't picked up by the tests because everything is fine the first time you run Synapse (after upgrading); it is only when you restart that the wrong value is pulled from `stream_positions`.
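For illustration, a minimal toy sketch (hypothetical classes, not Synapse's real `MultiWriterIdGenerator` or `SlavedIdTracker`) of why a generator keyed to the wrong stream name only misbehaves after a restart: the in-memory position starts fresh on first boot, but on reload it picks up another stream's row from `stream_positions`.

stream_positions = {}  # toy stand-in for the `stream_positions` table


class IdTracker:
    """Toy ID tracker that persists its position under a stream name."""

    def __init__(self, stream_name: str) -> None:
        self.stream_name = stream_name
        # On startup, resume from whatever was persisted for `stream_name`.
        self.position = stream_positions.get(stream_name, 0)

    def advance(self) -> int:
        self.position += 1
        stream_positions[self.stream_name] = self.position
        return self.position


# First run after upgrading: the receipts tracker is wrongly keyed to
# "account_data", but its in-memory position starts fresh, so nothing breaks.
receipts = IdTracker("account_data")  # bug: should be "receipts"
account_data = IdTracker("account_data")
receipts.advance()                    # the receipts stream is really at 1
for _ in range(50):
    account_data.advance()            # account data races ahead to 50

# After a restart the receipts tracker resumes from account data's position,
# so receipts written in between are treated as already sent down sync.
receipts = IdTracker("account_data")
print(receipts.position)  # 50, not 1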
erikjohnston committed Jan 21, 2021
1 parent 7447f19 commit 2506074
Showing 5 changed files with 62 additions and 7 deletions.
1 change: 1 addition & 0 deletions changelog.d/9193.bugfix
@@ -0,0 +1 @@
+ Fix receipts or account data not being sent down sync. Introduced in v1.26.0rc1.
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/account_data.py
@@ -68,7 +68,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
- if hs.get_instance_name() in hs.config.worker.writers.events:
+ if hs.get_instance_name() in hs.config.worker.writers.account_data:
self._account_data_id_gen = StreamIdGenerator(
db_conn,
"room_account_data",
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/receipts.py
@@ -45,7 +45,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self._receipts_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
stream_name="account_data",
stream_name="receipts",
instance_name=self._instance_name,
tables=[("receipts_linearized", "instance_name", "stream_id")],
sequence_name="receipts_sequence",
@@ -61,7 +61,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
- if hs.get_instance_name() in hs.config.worker.writers.events:
+ if hs.get_instance_name() in hs.config.worker.writers.receipts:
self._receipts_id_gen = StreamIdGenerator(
db_conn, "receipts_linearized", "stream_id"
)
6 changes: 5 additions & 1 deletion synapse/storage/util/id_generators.py
@@ -261,7 +261,11 @@ def __init__(
# We check that the table and sequence haven't diverged.
for table, _, id_column in tables:
self._sequence_gen.check_consistency(
- db_conn, table=table, id_column=id_column, positive=positive
+ db_conn,
+ table=table,
+ id_column=id_column,
+ stream_name=stream_name,
+ positive=positive,
)

# This goes and fills out the above state from the database.
56 changes: 53 additions & 3 deletions synapse/storage/util/sequence.py
@@ -45,6 +45,21 @@
See docs/postgres.md for more information.
"""

+ _INCONSISTENT_STREAM_ERROR = """
+ Postgres sequence '%(seq)s' is inconsistent with associated stream position
+ of '%(stream_name)s' in the 'stream_positions' table.
+ This is likely a programming error and should be reported at
+ https://github.com/matrix-org/synapse.
+ A temporary workaround to fix this error is to shut down Synapse (including
+ any and all workers) and run the following SQL:
+ DELETE FROM stream_positions WHERE stream_name = '%(stream_name)s';
+ This will need to be done every time the server is restarted.
+ """


class SequenceGenerator(metaclass=abc.ABCMeta):
"""A class which generates a unique sequence of integers"""
@@ -60,14 +75,20 @@ def check_consistency(
db_conn: "LoggingDatabaseConnection",
table: str,
id_column: str,
+ stream_name: Optional[str] = None,
positive: bool = True,
):
"""Should be called during start up to test that the current value of
the sequence is greater than or equal to the maximum ID in the table.
- This is to handle various cases where the sequence value can get out
- of sync with the table, e.g. if Synapse gets rolled back to a previous
+ This is to handle various cases where the sequence value can get out of
+ sync with the table, e.g. if Synapse gets rolled back to a previous
version and the rolled forwards again.
+ If a stream name is given then this will check that any value in the
+ `stream_positions` table is less than or equal to the current sequence
+ value. If it isn't then it's likely that streams have been crossed
+ somewhere (e.g. two ID generators have the same stream name).
"""
...

@@ -93,8 +114,12 @@ def check_consistency(
db_conn: "LoggingDatabaseConnection",
table: str,
id_column: str,
+ stream_name: Optional[str] = None,
positive: bool = True,
):
"""See SequenceGenerator.check_consistency for docstring.
"""

txn = db_conn.cursor(txn_name="sequence.check_consistency")

# First we get the current max ID from the table.
@@ -118,6 +143,18 @@
"SELECT last_value, is_called FROM %(seq)s" % {"seq": self._sequence_name}
)
last_value, is_called = txn.fetchone()

+ # If we have an associated stream check the stream_positions table.
+ max_in_stream_positions = None
+ if stream_name:
+ txn.execute(
+ "SELECT MAX(stream_id) FROM stream_positions WHERE stream_name = ?",
+ (stream_name,),
+ )
+ row = txn.fetchone()
+ if row:
+ max_in_stream_positions = row[0]

txn.close()

# If `is_called` is False then `last_value` is actually the value that
@@ -138,6 +175,14 @@
% {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql}
)

+ # If we have values in the stream positions table then they have to be
+ # less than or equal to `last_value`
+ if max_in_stream_positions and max_in_stream_positions > last_value:
+ raise IncorrectDatabaseSetup(
+ _INCONSISTENT_STREAM_ERROR
+ % {"seq": self._sequence_name, "stream": stream_name}
+ )


GetFirstCallbackType = Callable[[Cursor], int]

@@ -175,7 +220,12 @@ def get_next_id_txn(self, txn: Cursor) -> int:
return self._current_max_id

def check_consistency(
- self, db_conn: Connection, table: str, id_column: str, positive: bool = True
+ self,
+ db_conn: Connection,
+ table: str,
+ id_column: str,
+ stream_name: Optional[str] = None,
+ positive: bool = True,
):
# There is nothing to do for in memory sequences
pass
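For reference, a rough standalone sketch of the extra check that the new `stream_name` argument enables. Only the purpose of `check_consistency`, the `stream_positions` table, and the `IncorrectDatabaseSetup` exception correspond to real Synapse pieces; the helper below and its signature are hypothetical, simplified to plain Python values rather than SQL.

from typing import Mapping, Optional


class IncorrectDatabaseSetup(RuntimeError):
    """Stand-in for Synapse's exception of the same name."""


def check_stream_position_consistency(
    sequence_last_value: int,
    stream_positions: Mapping[str, int],
    stream_name: Optional[str] = None,
) -> None:
    """Fail if a persisted stream position has run ahead of the sequence.

    A position greater than the sequence usually means two ID generators were
    wired to the same (or the wrong) stream name, as in the bug fixed here.
    """
    if stream_name is None:
        return  # no associated stream, nothing extra to check

    max_in_stream_positions = stream_positions.get(stream_name)
    if max_in_stream_positions is not None and max_in_stream_positions > sequence_last_value:
        raise IncorrectDatabaseSetup(
            "stream %r is at %d but its sequence is only at %d"
            % (stream_name, max_in_stream_positions, sequence_last_value)
        )


# The receipts sequence says 10, but a position of 50 was persisted for
# "receipts": the startup check now fails loudly instead of silently
# skipping data.
try:
    check_stream_position_consistency(10, {"receipts": 50}, stream_name="receipts")
except IncorrectDatabaseSetup as e:
    print("startup check failed:", e)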
