Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Replace UPDATE with UPSERT on device_max_stream_id table #6363

Merged
merged 5 commits into from Nov 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6363.bugfix
@@ -0,0 +1 @@
Fix `to_device` stream ID getting reset every time Synapse restarts, which had the potential to cause unable to decrypt errors.
17 changes: 15 additions & 2 deletions synapse/storage/data_stores/main/deviceinbox.py
Expand Up @@ -358,8 +358,21 @@ def add_messages_txn(txn, now_ms, stream_id):
def _add_messages_to_local_device_inbox_txn(
self, txn, stream_id, messages_by_user_then_device
):
sql = "UPDATE device_max_stream_id" " SET stream_id = ?" " WHERE stream_id < ?"
txn.execute(sql, (stream_id, stream_id))
# Compatible method of performing an upsert
sql = "SELECT stream_id FROM device_max_stream_id"

txn.execute(sql)
rows = txn.fetchone()
if rows:
db_stream_id = rows[0]
if db_stream_id < stream_id:
# Insert the new stream_id
sql = "UPDATE device_max_stream_id SET stream_id = ?"
else:
# No rows, perform an insert
sql = "INSERT INTO device_max_stream_id (stream_id) VALUES (?)"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could also do the update as before and then check the affected row count which would then just be a single query in the normal case, but if this is how we're doing upserts elsewhere then I guess just stick with that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little more complicated since we have to check that the stream_id we're inserting is greater than what's currently in the DB. If it's not, then the UPDATE will return 0 rows affected.

We don't want to then insert another row at that point.


txn.execute(sql, (stream_id,))

local_by_user_then_device = {}
for user_id, messages_by_device in messages_by_user_then_device.items():
Expand Down