Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Refactor getting replication updates from database v2. (#7740)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Jul 7, 2020
1 parent d378c3d commit 67d7756
Show file tree
Hide file tree
Showing 11 changed files with 336 additions and 195 deletions.
1 change: 1 addition & 0 deletions changelog.d/7740.misc
@@ -0,0 +1 @@
Refactor getting replication updates from database.
3 changes: 3 additions & 0 deletions synapse/handlers/typing.py
Expand Up @@ -294,6 +294,9 @@ async def get_all_typing_updates(
rows.sort()

limited = False
# We, unusually, use a strict limit here as we have all the rows in
# memory rather than pulling them out of the database with a `LIMIT ?`
# clause.
if len(rows) > limit:
rows = rows[:limit]
current_id = rows[-1][0]
Expand Down
56 changes: 10 additions & 46 deletions synapse/replication/tcp/streams/_base.py
Expand Up @@ -198,26 +198,6 @@ def current_token_without_instance(
return lambda instance_name: current_token()


def db_query_to_update_function(
    query_function: Callable[[Token, Token, int], Awaitable[List[tuple]]]
) -> UpdateFunction:
    """Adapt a plain database query function into a Stream `update_function`.

    The wrapped function is expected to return raw rows whose first element
    is the stream ID; the remaining elements become the row data.
    """

    async def update_function(instance_name, from_token, upto_token, limit):
        rows = await query_function(from_token, upto_token, limit)
        updates = [(row[0], row[1:]) for row in rows]

        if len(updates) >= limit:
            # We may have hit the limit: resume from the last row returned.
            return updates, updates[-1][0], True

        return updates, upto_token, False

    return update_function


def make_http_update_function(hs, stream_name: str) -> UpdateFunction:
"""Makes a suitable function for use as an `update_function` that queries
the master process for updates.
Expand Down Expand Up @@ -393,7 +373,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_pushers_stream_token),
db_query_to_update_function(store.get_all_updated_pushers_rows),
store.get_all_updated_pushers_rows,
)


Expand Down Expand Up @@ -421,26 +401,12 @@ class CachesStreamRow:
ROW_TYPE = CachesStreamRow

def __init__(self, hs):
self.store = hs.get_datastore()
store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
self.store.get_cache_stream_token,
self._update_function,
)

async def _update_function(
self, instance_name: str, from_token: int, upto_token: int, limit: int
):
rows = await self.store.get_all_updated_caches(
instance_name, from_token, upto_token, limit
store.get_cache_stream_token,
store.get_all_updated_caches,
)
updates = [(row[0], row[1:]) for row in rows]
limited = False
if len(updates) >= limit:
upto_token = updates[-1][0]
limited = True

return updates, upto_token, limited


class PublicRoomsStream(Stream):
Expand All @@ -465,7 +431,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_current_public_room_stream_id),
db_query_to_update_function(store.get_all_new_public_rooms),
store.get_all_new_public_rooms,
)


Expand All @@ -486,7 +452,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_device_stream_token),
db_query_to_update_function(store.get_all_device_list_changes_for_remotes),
store.get_all_device_list_changes_for_remotes,
)


Expand All @@ -504,7 +470,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_to_device_stream_token),
db_query_to_update_function(store.get_all_new_device_messages),
store.get_all_new_device_messages,
)


Expand All @@ -524,7 +490,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_max_account_data_stream_id),
db_query_to_update_function(store.get_all_updated_tags),
store.get_all_updated_tags,
)


Expand Down Expand Up @@ -612,7 +578,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_group_stream_token),
db_query_to_update_function(store.get_all_groups_changes),
store.get_all_groups_changes,
)


Expand All @@ -630,7 +596,5 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_device_stream_token),
db_query_to_update_function(
store.get_all_user_signature_changes_for_remotes
),
store.get_all_user_signature_changes_for_remotes,
)
36 changes: 30 additions & 6 deletions synapse/storage/data_stores/main/cache.py
Expand Up @@ -16,7 +16,7 @@

import itertools
import logging
from typing import Any, Iterable, Optional, Tuple
from typing import Any, Iterable, List, Optional, Tuple

from synapse.api.constants import EventTypes
from synapse.replication.tcp.streams import BackfillStream, CachesStream
Expand Down Expand Up @@ -46,13 +46,30 @@ def __init__(self, database: Database, db_conn, hs):

async def get_all_updated_caches(
self, instance_name: str, last_id: int, current_id: int, limit: int
):
"""Fetches cache invalidation rows between the two given IDs written
by the given instance. Returns at most `limit` rows.
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
"""Get updates for caches replication stream.
Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.
Returns:
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exists
between the requested tokens due to the limit.
The token returned can be used in a subsequent call to this
function to get further updates.
The updates are a list of 2-tuples of stream ID and the row data
"""

if last_id == current_id:
return []
return [], current_id, False

def get_all_updated_caches_txn(txn):
# We purposefully don't bound by the current token, as we want to
Expand All @@ -66,7 +83,14 @@ def get_all_updated_caches_txn(txn):
LIMIT ?
"""
txn.execute(sql, (last_id, instance_name, limit))
return txn.fetchall()
updates = [(row[0], row[1:]) for row in txn]
limited = False
upto_token = current_id
if len(updates) >= limit:
upto_token = updates[-1][0]
limited = True

return updates, upto_token, limited

return await self.db.runInteraction(
"get_all_updated_caches", get_all_updated_caches_txn
Expand Down
54 changes: 38 additions & 16 deletions synapse/storage/data_stores/main/deviceinbox.py
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

import logging
from typing import List, Tuple

from canonicaljson import json

Expand Down Expand Up @@ -207,47 +208,68 @@ def delete_messages_for_remote_destination_txn(txn):
"delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn
)

async def get_all_new_device_messages(
    self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
    """Get updates for the to-device replication stream.

    Args:
        instance_name: The writer we want to fetch updates from. Unused
            here since there is only ever one writer.
        last_id: The token to fetch updates from. Exclusive.
        current_id: The token to fetch updates up to. Inclusive.
        limit: The requested limit for the number of rows to return. The
            function may return more or fewer rows.

    Returns:
        A tuple consisting of: the updates, a token to use to fetch
        subsequent updates, and whether we returned fewer rows than exist
        between the requested tokens due to the limit.

        The token returned can be used in a subsequent call to this
        function to get further updates.

        The updates are a list of 2-tuples of stream ID and the row data.
    """

    if last_id == current_id:
        # Nothing can have happened between the two tokens.
        return [], current_id, False

    def get_all_new_device_messages_txn(txn):
        # We limit by bounding the stream ID range (rather than `LIMIT ?`)
        # as we might have multiple rows per stream_id, and we want to make
        # sure we always get all entries for any stream_id we return.
        upper_pos = min(current_id, last_id + limit)
        sql = (
            "SELECT max(stream_id), user_id"
            " FROM device_inbox"
            " WHERE ? < stream_id AND stream_id <= ?"
            " GROUP BY user_id"
        )
        txn.execute(sql, (last_id, upper_pos))
        updates = [(row[0], row[1:]) for row in txn]

        sql = (
            "SELECT max(stream_id), destination"
            " FROM device_federation_outbox"
            " WHERE ? < stream_id AND stream_id <= ?"
            " GROUP BY destination"
        )
        txn.execute(sql, (last_id, upper_pos))
        updates.extend((row[0], row[1:]) for row in txn)

        # Order by ascending stream ordering
        updates.sort()

        # We scanned every row up to `upper_pos`, so that is always a safe
        # token to resume from. If the range was capped below `current_id`
        # there may be further rows to fetch, even when fewer than `limit`
        # rows came back (stream IDs can be sparse) -- so `limited` must be
        # derived from the range, not from len(updates), otherwise rows in
        # (upper_pos, current_id] would be silently skipped.
        limited = upper_pos < current_id

        return updates, upper_pos, limited

    return await self.db.runInteraction(
        "get_all_new_device_messages", get_all_new_device_messages_txn
    )

Expand Down
70 changes: 48 additions & 22 deletions synapse/storage/data_stores/main/devices.py
Expand Up @@ -582,32 +582,58 @@ def get_users_whose_signatures_changed(self, user_id, from_key):
return set()

async def get_all_device_list_changes_for_remotes(
    self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
    """Get updates for the device lists replication stream.

    Args:
        instance_name: The writer we want to fetch updates from. Unused
            here since there is only ever one writer.
        last_id: The token to fetch updates from. Exclusive.
        current_id: The token to fetch updates up to. Inclusive.
        limit: The requested limit for the number of rows to return. The
            function may return more or fewer rows.

    Returns:
        A tuple consisting of: the updates, a token to use to fetch
        subsequent updates, and whether we returned fewer rows than exist
        between the requested tokens due to the limit.

        The token returned can be used in a subsequent call to this
        function to get further updates.

        The updates are a list of 2-tuples of stream ID and the row data.
    """

    if last_id == current_id:
        # Nothing can have happened between the two tokens.
        return [], current_id, False

    def _get_all_device_list_changes_for_remotes(txn):
        # This query Does The Right Thing where it'll correctly apply the
        # bounds to the inner queries.
        #
        # The ORDER BY is essential: with a bare LIMIT the database may
        # return an arbitrary subset of the matching rows, which would make
        # `updates[-1][0]` (and hence the resumption token) meaningless and
        # could skip rows on the next fetch.
        sql = """
            SELECT stream_id, entity FROM (
                SELECT stream_id, user_id AS entity FROM device_lists_stream
                UNION ALL
                SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes
            ) AS e
            WHERE ? < stream_id AND stream_id <= ?
            ORDER BY stream_id ASC
            LIMIT ?
        """

        txn.execute(sql, (last_id, current_id, limit))
        updates = [(row[0], row[1:]) for row in txn]

        limited = False
        upto_token = current_id
        if len(updates) >= limit:
            # NOTE(review): if several rows can share a stream ID, the rows
            # for the last returned ID may be split across the LIMIT
            # boundary -- confirm stream IDs are unique across both tables.
            upto_token = updates[-1][0]
            limited = True

        return updates, upto_token, limited

    return await self.db.runInteraction(
        "get_all_device_list_changes_for_remotes",
        _get_all_device_list_changes_for_remotes,
    )

@cached(max_entries=10000)
Expand Down

0 comments on commit 67d7756

Please sign in to comment.