Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Migrate stream_ordering to a bigint #10264

Merged
merged 3 commits into from Jun 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10264.bugfix
@@ -0,0 +1 @@
Fix a long-standing bug where Synapse would return errors after 2<sup>31</sup> events were handled by the server.
136 changes: 121 additions & 15 deletions synapse/storage/databases/main/events_bg_updates.py
Expand Up @@ -29,6 +29,25 @@
logger = logging.getLogger(__name__)


_REPLACE_STREAM_ORDRING_SQL_COMMANDS = (
# there should be no leftover rows without a stream_ordering2, but just in case...
"UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL",
# finally, we can drop the rule and switch the columns
"DROP RULE populate_stream_ordering2 ON events",
"ALTER TABLE events DROP COLUMN stream_ordering",
"ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering",
)


class _BackgroundUpdates:
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"


@attr.s(slots=True, frozen=True)
class _CalculateChainCover:
"""Return value for _calculate_chain_cover_txn."""
Expand All @@ -48,19 +67,15 @@ class _CalculateChainCover:


class EventsBackgroundUpdatesStore(SQLBaseStore):

EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"

def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

self.db_pool.updates.register_background_update_handler(
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
_BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME,
self._background_reindex_origin_server_ts,
)
self.db_pool.updates.register_background_update_handler(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
_BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
self._background_reindex_fields_sender,
)

Expand All @@ -85,7 +100,8 @@ def __init__(self, database: DatabasePool, db_conn, hs):
)

self.db_pool.updates.register_background_update_handler(
self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update
_BackgroundUpdates.DELETE_SOFT_FAILED_EXTREMITIES,
self._cleanup_extremities_bg_update,
)

self.db_pool.updates.register_background_update_handler(
Expand Down Expand Up @@ -139,6 +155,24 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self._purged_chain_cover_index,
)

# bg updates for replacing stream_ordering with a BIGINT
# (these only run on postgres.)
self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.POPULATE_STREAM_ORDERING2,
self._background_populate_stream_ordering2,
)
self.db_pool.updates.register_background_index_update(
_BackgroundUpdates.INDEX_STREAM_ORDERING2,
index_name="events_stream_ordering",
table="events",
columns=["stream_ordering2"],
unique=True,
)
self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN,
self._background_replace_stream_ordering_column,
)

async def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
Expand Down Expand Up @@ -190,18 +224,18 @@ def reindex_txn(txn):
}

self.db_pool.updates._background_update_progress_txn(
txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
txn, _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
)

return len(rows)

result = await self.db_pool.runInteraction(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
_BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
)

if not result:
await self.db_pool.updates._end_background_update(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME
_BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME
)

return result
Expand Down Expand Up @@ -264,18 +298,18 @@ def reindex_search_txn(txn):
}

self.db_pool.updates._background_update_progress_txn(
txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
txn, _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, progress
)

return len(rows_to_update)

result = await self.db_pool.runInteraction(
self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
_BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
)

if not result:
await self.db_pool.updates._end_background_update(
self.EVENT_ORIGIN_SERVER_TS_NAME
_BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME
)

return result
Expand Down Expand Up @@ -454,7 +488,7 @@ def _cleanup_extremities_bg_update_txn(txn):

if not num_handled:
await self.db_pool.updates._end_background_update(
self.DELETE_SOFT_FAILED_EXTREMITIES
_BackgroundUpdates.DELETE_SOFT_FAILED_EXTREMITIES
)

def _drop_table_txn(txn):
Expand Down Expand Up @@ -1009,3 +1043,75 @@ def purged_chain_cover_txn(txn) -> int:
await self.db_pool.updates._end_background_update("purged_chain_cover")

return result

async def _background_populate_stream_ordering2(
self, progress: JsonDict, batch_size: int
) -> int:
"""Populate events.stream_ordering2, then replace stream_ordering

This is to deal with the fact that stream_ordering was initially created as a
32-bit integer field.
"""
batch_size = max(batch_size, 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this an issue here? I'd expect setting batch_size to zero/negative anywhere is going to cause issues?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true. It's just defensive here.


def process(txn: Cursor) -> int:
# if this is the first pass, find the minimum stream ordering
last_stream = progress.get("last_stream")
if last_stream is None:
txn.execute(
"""
SELECT stream_ordering FROM events ORDER BY stream_ordering LIMIT 1
"""
)
rows = txn.fetchall()
if not rows:
return 0
last_stream = rows[0][0] - 1

txn.execute(
"""
UPDATE events SET stream_ordering2=stream_ordering
WHERE stream_ordering > ? AND stream_ordering <= ?
""",
(last_stream, last_stream + batch_size),
)
row_count = txn.rowcount

self.db_pool.updates._background_update_progress_txn(
txn,
_BackgroundUpdates.POPULATE_STREAM_ORDERING2,
{"last_stream": last_stream + batch_size},
)
return row_count

result = await self.db_pool.runInteraction(
"_background_populate_stream_ordering2", process
)

if result != 0:
return result

await self.db_pool.updates._end_background_update(
_BackgroundUpdates.POPULATE_STREAM_ORDERING2
)
return 0

async def _background_replace_stream_ordering_column(
self, progress: JsonDict, batch_size: int
) -> int:
"""Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place."""

def process(txn: Cursor) -> None:
for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS:
logger.info("completing stream_ordering migration: %s", sql)
txn.execute(sql)

await self.db_pool.runInteraction(
"_background_replace_stream_ordering_column", process
)

await self.db_pool.updates._end_background_update(
_BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN
)

return 0
2 changes: 1 addition & 1 deletion synapse/storage/schema/__init__.py
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

SCHEMA_VERSION = 59
SCHEMA_VERSION = 60
"""Represents the expectations made by the codebase about the database schema

This should be incremented whenever the codebase changes its requirements on the
Expand Down
@@ -0,0 +1,40 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- This migration handles the process of changing the type of `stream_ordering` to
-- a BIGINT.
--
-- Note that this is only a problem on postgres as sqlite only has one "integer" type
-- which can cope with values up to 2^63.

-- First add a new column to contain the bigger stream_ordering
ALTER TABLE events ADD COLUMN stream_ordering2 BIGINT;

-- Create a rule which will populate it for new rows.
CREATE OR REPLACE RULE "populate_stream_ordering2" AS
ON INSERT TO events
DO UPDATE events SET stream_ordering2=NEW.stream_ordering WHERE stream_ordering=NEW.stream_ordering;

-- Start a bg process to populate it for old events
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(6001, 'populate_stream_ordering2', '{}');

-- ... and another to build an index on it
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2');

-- ... and another to do the switcheroo
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2');