Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

User directory background update speedup #15435

Merged
merged 2 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15435.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up the user directory background update.
89 changes: 44 additions & 45 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,44 +102,34 @@ async def _populate_user_directory_createtables(
) -> int:
# Get all the rooms that we want to process.
def _make_staging_area(txn: LoggingTransaction) -> None:
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
)
txn.execute(sql)

sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_position(position TEXT NOT NULL)"
)
txn.execute(sql)

# Get rooms we want to process from the database
sql = """
SELECT room_id, count(*) FROM current_state_events
sql = f"""
CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_rooms AS
SELECT room_id, count(*) AS events
FROM current_state_events
GROUP BY room_id
"""
txn.execute(sql)
rooms = list(txn.fetchall())
self.db_pool.simple_insert_many_txn(
txn, TEMP_TABLE + "_rooms", keys=("room_id", "events"), values=rooms
txn.execute(
f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_rooms_rm ON {TEMP_TABLE}_rooms (room_id)"
Copy link
Contributor

@DMRobertson DMRobertson Apr 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't fully sure why we needed an index at all. I guess the point is:

  • we have tables that contain a big queue of stuff to do
  • we can select a batch of things to do from that table however we like
  • but we need to remove those specific things from that table afterwards? (And that needs a seq scan without an index)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

And for the rooms table we process rooms in order of the events column, descending.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there was an oversight here. We did not add these indices to preexisting TEMP_TABLEs. So this change will make future user dir rebuild speedy, but won't help anyone who is doing a rebuild before they upgrade.

Discussion in synapse-dev: https://matrix.to/#/!vcyiEtMVHIhWXcJAfl:sw1v.org/$oXx8Qg7YkhIqTZkIBwgBEW8qRtk-_AzA4M-85PSR3zU?via=matrix.org&via=element.io&via=beeper.com

)
del rooms

sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_users(user_id TEXT NOT NULL)"
txn.execute(
f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_rooms_evs ON {TEMP_TABLE}_rooms (events)"
)
txn.execute(sql)

txn.execute("SELECT name FROM users")
users = list(txn.fetchall())
sql = f"""
CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_position (
position TEXT NOT NULL
)
"""
txn.execute(sql)

self.db_pool.simple_insert_many_txn(
txn, TEMP_TABLE + "_users", keys=("user_id",), values=users
sql = f"""
CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_users AS
SELECT name AS user_id FROM users
"""
txn.execute(sql)
txn.execute(
f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_users_idx ON {TEMP_TABLE}_users (user_id)"
)

new_pos = await self.get_max_stream_id_in_current_state_deltas()
Expand Down Expand Up @@ -222,12 +212,13 @@ def _get_next_batch(
if not rooms_to_work_on:
return None

# Get how many are left to process, so we can give status on how
# far we are in processing
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
result = txn.fetchone()
assert result is not None
progress["remaining"] = result[0]
if "remaining" not in progress:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what this is trying to do. Does this mean we write a progress count at the end of the first iteration and never update the counter?

Copy link
Member Author

@erikjohnston erikjohnston Apr 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry. We fetch the count once, and then afterwards we decrement the count every time we delete a row from the table.

progress["remaining"] -= 1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, perfect.

# Get how many are left to process, so we can give status on how
# far we are in processing
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
result = txn.fetchone()
assert result is not None
progress["remaining"] = result[0]

return rooms_to_work_on

Expand Down Expand Up @@ -332,7 +323,14 @@ def _get_next_batch(

if processed_event_count > batch_size:
# Don't process any more rooms, we've hit our batch size.
return processed_event_count
break

await self.db_pool.runInteraction(
"populate_user_directory",
self.db_pool.updates._background_update_progress_txn,
"populate_user_directory_process_rooms",
progress,
)

return processed_event_count

Expand All @@ -356,13 +354,14 @@ def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:

users_to_work_on = [x[0] for x in user_result]

# Get how many are left to process, so we can give status on how
# far we are in processing
sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
txn.execute(sql)
count_result = txn.fetchone()
assert count_result is not None
progress["remaining"] = count_result[0]
if "remaining" not in progress:
# Get how many are left to process, so we can give status on how
# far we are in processing
sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
txn.execute(sql)
count_result = txn.fetchone()
assert count_result is not None
progress["remaining"] = count_result[0]

return users_to_work_on

Expand Down