Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix paginating /relations with a live token #14866

Merged
merged 5 commits into from Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14866.bugfix
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.53.0 where `next_batch` tokens from `/sync` could not be used with the `/relations` endpoint.
38 changes: 17 additions & 21 deletions synapse/storage/databases/main/relations.py
Expand Up @@ -40,9 +40,13 @@
LoggingTransaction,
make_in_list_sql_clause,
)
from synapse.storage.databases.main.stream import generate_pagination_where_clause
from synapse.storage.databases.main.stream import (
generate_next_token,
generate_pagination_bounds,
generate_pagination_where_clause,
)
from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken
from synapse.types import JsonDict, StreamKeyType, StreamToken
from synapse.util.caches.descriptors import cached, cachedList

if TYPE_CHECKING:
Expand Down Expand Up @@ -207,24 +211,23 @@ async def get_relations_for_event(
where_clause.append("type = ?")
where_args.append(event_type)

order, from_bound, to_bound = generate_pagination_bounds(
direction,
from_token.room_key if from_token else None,
to_token.room_key if to_token else None,
)

pagination_clause = generate_pagination_where_clause(
direction=direction,
column_names=("topological_ordering", "stream_ordering"),
from_token=from_token.room_key.as_historical_tuple()
if from_token
else None,
to_token=to_token.room_key.as_historical_tuple() if to_token else None,
from_token=from_bound,
to_token=to_bound,
engine=self.database_engine,
)

if pagination_clause:
where_clause.append(pagination_clause)

if direction == "b":
order = "DESC"
else:
order = "ASC"

sql = """
SELECT event_id, relation_type, sender, topological_ordering, stream_ordering
FROM event_relations
Expand Down Expand Up @@ -266,16 +269,9 @@ def _get_recent_references_for_event_txn(
topo_orderings = topo_orderings[:limit]
stream_orderings = stream_orderings[:limit]

topo = topo_orderings[-1]
token = stream_orderings[-1]
if direction == "b":
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
token -= 1
next_key = RoomStreamToken(topo, token)
next_key = generate_next_token(
direction, topo_orderings[-1], stream_orderings[-1]
)

if from_token:
next_token = from_token.copy_and_replace(
Expand Down
154 changes: 105 additions & 49 deletions synapse/storage/databases/main/stream.py
Expand Up @@ -170,6 +170,104 @@ def generate_pagination_where_clause(
return " AND ".join(where_clause)


def generate_pagination_bounds(
direction: str,
from_token: Optional[RoomStreamToken],
to_token: Optional[RoomStreamToken],
) -> Tuple[
str, Optional[Tuple[Optional[int], int]], Optional[Tuple[Optional[int], int]]
]:
"""
Generate a start and end point for this page of events.

Args:
direction: Whether pagination is going forwards or backwards. One of "f" or "b".
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we try direction: Literal["f", "b"] to enforce this?

(I expect that would need propagating up the call stack. This may be more trouble than it's worth; ignore if so)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can make it an enum even, I'll take a look as a follow-up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM either way. But yeah, definitely one for a follow-up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #14927.

from_token: The token to start pagination at, or None to start at the first value.
to_token: The token to end pagination at, or None to not limit the end point.

Returns:
A three tuple of:

ASC or DESC for sorting of the query.

The starting position as a tuple of ints representing
(topological position, stream position) or None if no from_token was
provided. The topological position may be None for live tokens.

The end position in the same format as the starting position, or None
if no to_token was provided.
"""

# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
if direction == "b":
order = "DESC"
else:
order = "ASC"

# The bounds for the stream tokens are complicated by the fact
# that we need to handle the instance_map part of the tokens. We do this
# by fetching all events between the min stream token and the maximum
# stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and
# then filtering the results.
from_bound: Optional[Tuple[Optional[int], int]] = None
if from_token:
if from_token.topological is not None:
from_bound = from_token.as_historical_tuple()
elif direction == "b":
from_bound = (
None,
from_token.get_max_stream_pos(),
)
else:
from_bound = (
None,
from_token.stream,
)

to_bound: Optional[Tuple[Optional[int], int]] = None
if to_token:
if to_token.topological is not None:
to_bound = to_token.as_historical_tuple()
elif direction == "b":
to_bound = (
None,
to_token.stream,
)
else:
to_bound = (
None,
to_token.get_max_stream_pos(),
)

return order, from_bound, to_bound


def generate_next_token(
direction: str, last_topo_ordering: int, last_stream_ordering: int
) -> RoomStreamToken:
"""
Generate the next room stream token based on the currently returned data.

Args:
direction: Whether pagination is going forwards or backwards. One of "f" or "b".
last_topo_ordering: The last topological ordering being returned.
last_stream_ordering: The last stream ordering being returned.

Returns:
A new RoomStreamToken to return to the client.
"""
if direction == "b":
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
last_stream_ordering -= 1
return RoomStreamToken(last_topo_ordering, last_stream_ordering)


def _make_generic_sql_bound(
bound: str,
column_names: Tuple[str, str],
Expand Down Expand Up @@ -1272,47 +1370,11 @@ def _paginate_room_events_txn(
`to_token`), or `limit` is zero.
"""

# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
args = [False, room_id]
if direction == "b":
order = "DESC"
else:
order = "ASC"

# The bounds for the stream tokens are complicated by the fact
# that we need to handle the instance_map part of the tokens. We do this
# by fetching all events between the min stream token and the maximum
# stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and
# then filtering the results.
if from_token.topological is not None:
from_bound: Tuple[Optional[int], int] = from_token.as_historical_tuple()
elif direction == "b":
from_bound = (
None,
from_token.get_max_stream_pos(),
)
else:
from_bound = (
None,
from_token.stream,
)

to_bound: Optional[Tuple[Optional[int], int]] = None
if to_token:
if to_token.topological is not None:
to_bound = to_token.as_historical_tuple()
elif direction == "b":
to_bound = (
None,
to_token.stream,
)
else:
to_bound = (
None,
to_token.get_max_stream_pos(),
)
order, from_bound, to_bound = generate_pagination_bounds(
direction, from_token, to_token
)

bounds = generate_pagination_where_clause(
direction=direction,
Expand Down Expand Up @@ -1408,16 +1470,10 @@ def _paginate_room_events_txn(
][:limit]

if rows:
topo = rows[-1].topological_ordering
token = rows[-1].stream_ordering
if direction == "b":
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
token -= 1
next_token = RoomStreamToken(topo, token)
assert rows[-1].topological_ordering is not None
next_token = generate_next_token(
direction, rows[-1].topological_ordering, rows[-1].stream_ordering
)
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_token if to_token else from_token
Expand Down