This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Running under sqlite, Synapse incorrectly populates the to-device messages current stream ID #16681

Closed
DMRobertson opened this issue Nov 23, 2023 · 0 comments · Fixed by #16682


Spotted by @kegsay, diagnosed by me.

Synapse writes to the device inbox and outbox tables here:

async with self._device_inbox_id_gen.get_next() as stream_id:
    now_ms = self._clock.time_msec()
    await self.db_pool.runInteraction(
        "add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id
    )
    for user_id in local_messages_by_user_then_device.keys():
        self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id)
    for destination in remote_messages_by_destination.keys():
        self._device_federation_outbox_stream_cache.entity_has_changed(
            destination, stream_id
        )

def add_messages_txn(
    txn: LoggingTransaction, now_ms: int, stream_id: int
) -> None:
    # Add the local messages directly to the local inbox.
    self._add_messages_to_local_device_inbox_txn(
        txn, stream_id, local_messages_by_user_then_device
    )
    # Add the remote messages to the federation outbox.
    # We'll send them to a remote server when we next send a
    # federation transaction to that destination.
    self.db_pool.simple_insert_many_txn(
        txn,
        table="device_federation_outbox",
        keys=(
            "destination",
            "stream_id",
            "queued_ts",
            "messages_json",
            "instance_name",
        ),
        values=[
            (
                destination,
                stream_id,
                now_ms,
                json_encoder.encode(edu),
                self._instance_name,
            )
            for destination, edu in remote_messages_by_destination.items()
        ],
    )
    for destination, edu in remote_messages_by_destination.items():
        if issue9533_logger.isEnabledFor(logging.DEBUG):
            issue9533_logger.debug(
                "Queued outgoing to-device messages with "
                "stream_id %i, EDU message_id %s, type %s for %s: %s",
                stream_id,
                edu["message_id"],
                edu["type"],
                destination,
                [
                    f"{user_id}/{device_id} (msgid "
                    f"{msg.get(EventContentFields.TO_DEVICE_MSGID)})"
                    for (user_id, messages_by_device) in edu["messages"].items()
                    for (device_id, msg) in messages_by_device.items()
                ],
            )
        for user_id, messages_by_device in edu["messages"].items():
            for device_id, msg in messages_by_device.items():
                with start_active_span("store_outgoing_to_device_message"):
                    set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["sender"])
                    set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["message_id"])
                    set_tag(SynapseTags.TO_DEVICE_TYPE, edu["type"])
                    set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
                    set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
                    set_tag(
                        SynapseTags.TO_DEVICE_MSGID,
                        msg.get(EventContentFields.TO_DEVICE_MSGID),
                    )

Suppose that:

  1. Synapse calls add_messages_txn with 0 local messages (for the inbox) and at least 1 remote message (for the outbox). The remote message is stored in device_federation_outbox as normal; nothing is written to device_inbox.
  2. The message fails to send, or otherwise does not get its chance to be sent out to the destination.
  3. Synapse shuts down.

At this point,

SELECT MAX(stream_id) FROM device_inbox

is strictly smaller than

SELECT MAX(stream_id) FROM device_federation_outbox

(the latter is the stream ID used in step 1.)
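The divergence can be reproduced in miniature with an in-memory sqlite database. This is only a sketch: the single-column schemas and the stream IDs 5 and 6 are assumptions made for illustration, not Synapse's real schema or data, and max_stream_ids is a hypothetical helper.

```python
import sqlite3


def max_stream_ids(conn: sqlite3.Connection):
    """Return (MAX(stream_id) of device_inbox, MAX(stream_id) of device_federation_outbox)."""
    inbox = conn.execute("SELECT MAX(stream_id) FROM device_inbox").fetchone()[0]
    outbox = conn.execute(
        "SELECT MAX(stream_id) FROM device_federation_outbox"
    ).fetchone()[0]
    return inbox, outbox


conn = sqlite3.connect(":memory:")
# Simplified stand-ins for the real tables (assumption: one column is enough here).
conn.execute("CREATE TABLE device_inbox (stream_id INTEGER)")
conn.execute("CREATE TABLE device_federation_outbox (stream_id INTEGER)")

# Earlier traffic: stream ID 5 carried a local message.
conn.execute("INSERT INTO device_inbox VALUES (5)")
# Step 1: stream ID 6 carries only a remote message, so only the outbox gets a row.
conn.execute("INSERT INTO device_federation_outbox VALUES (6)")

inbox_max, outbox_max = max_stream_ids(conn)
print(inbox_max, outbox_max)
```

After step 1 the outbox holds the newest stream ID while the inbox still reports the older one.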

Now Synapse restarts. On sqlite, it populates the to-device stream using

self._device_inbox_id_gen = StreamIdGenerator(
    db_conn, hs.get_replication_notifier(), "device_inbox", "stream_id"
)

which will end up calling

def _load_current_id(
    db_conn: LoggingDatabaseConnection, table: str, column: str, step: int = 1
) -> int:
    cur = db_conn.cursor(txn_name="_load_current_id")
    if step == 1:
        cur.execute("SELECT MAX(%s) FROM %s" % (column, table))
    else:
        cur.execute("SELECT MIN(%s) FROM %s" % (column, table))
    result = cur.fetchone()
    assert result is not None
    (val,) = result
    cur.close()
    current_id = int(val) if val else step
    res = (max if step > 0 else min)(current_id, step)
    logger.info("Initialising stream generator for %s(%s): %i", table, column, res)
    return res

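Because the generator is only told about device_inbox, this runs the first of the two queries above (the MAX over device_inbox), whose result is the smaller one.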

This means the stream_id has jumped backwards.
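One way to avoid the regression would be to seed the generator from every table that shares the stream, taking the largest value found. The sketch below illustrates that idea against plain sqlite3. It is not necessarily what #16682 does: load_current_id_across_tables is a hypothetical helper, and the schemas and stream IDs are assumptions for illustration.

```python
import sqlite3
from typing import Iterable, Tuple


def load_current_id_across_tables(
    conn: sqlite3.Connection, tables: Iterable[Tuple[str, str]]
) -> int:
    """Return the largest ID found across all (table, column) pairs, or 1 if all are empty."""
    current = 1
    for table, column in tables:
        # Table and column names are trusted constants here, as in _load_current_id.
        row = conn.execute("SELECT MAX(%s) FROM %s" % (column, table)).fetchone()
        # MAX over an empty table yields a single row containing NULL (None).
        if row is not None and row[0] is not None:
            current = max(current, int(row[0]))
    return current


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE device_inbox (stream_id INTEGER)")
conn.execute("CREATE TABLE device_federation_outbox (stream_id INTEGER)")
conn.execute("INSERT INTO device_inbox VALUES (5)")
conn.execute("INSERT INTO device_federation_outbox VALUES (6)")

# Consulting only device_inbox reproduces the backwards jump.
inbox_only = load_current_id_across_tables(conn, [("device_inbox", "stream_id")])
# Consulting both tables recovers the ID that was actually handed out last.
both = load_current_id_across_tables(
    conn,
    [("device_inbox", "stream_id"), ("device_federation_outbox", "stream_id")],
)
print(inbox_only, both)
```

With both tables consulted, the generator would resume from the last ID written to either table instead of an older one.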

I don't think this is necessarily the end of the world, since it looks like the tables don't expect the stream ID to be unique (i.e. they're fine with it being reused). But it is a surprise and makes debugging more confusing.
