Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Abstract tricky off-by-one logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Jan 18, 2023
1 parent 73dcc6d commit 87aa0fa
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 21 deletions.
16 changes: 5 additions & 11 deletions synapse/storage/databases/main/relations.py
Expand Up @@ -41,11 +41,12 @@
make_in_list_sql_clause,
)
from synapse.storage.databases.main.stream import (
generate_next_token,
generate_pagination_bounds,
generate_pagination_where_clause,
)
from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken
from synapse.types import JsonDict, StreamKeyType, StreamToken
from synapse.util.caches.descriptors import cached, cachedList

if TYPE_CHECKING:
Expand Down Expand Up @@ -268,16 +269,9 @@ def _get_recent_references_for_event_txn(
topo_orderings = topo_orderings[:limit]
stream_orderings = stream_orderings[:limit]

topo = topo_orderings[-1]
token = stream_orderings[-1]
if direction == "b":
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
token -= 1
next_key = RoomStreamToken(topo, token)
next_key = generate_next_token(
direction, topo_orderings[-1], stream_orderings[-1]
)

if from_token:
next_token = from_token.copy_and_replace(
Expand Down
38 changes: 28 additions & 10 deletions synapse/storage/databases/main/stream.py
Expand Up @@ -244,6 +244,30 @@ def generate_pagination_bounds(
return order, from_bound, to_bound


def generate_next_token(
direction: str, last_topo_ordering: int, last_stream_ordering: int
) -> RoomStreamToken:
"""
Generate the next room stream token based on the currently returned data.
Args:
direction: Whether pagination is going forwards or backwards. One of "f" or "b".
last_topo_ordering: The last topological ordering being returned.
last_stream_ordering: The last stream ordering being returned.
Returns:
A new RoomStreamToken to return to the client.
"""
if direction == "b":
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
last_stream_ordering -= 1
return RoomStreamToken(last_topo_ordering, last_stream_ordering)


def _make_generic_sql_bound(
bound: str,
column_names: Tuple[str, str],
Expand Down Expand Up @@ -1446,16 +1470,10 @@ def _paginate_room_events_txn(
][:limit]

if rows:
topo = rows[-1].topological_ordering
token = rows[-1].stream_ordering
if direction == "b":
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
token -= 1
next_token = RoomStreamToken(topo, token)
assert rows[-1].topological_ordering is not None
next_token = generate_next_token(
direction, rows[-1].topological_ordering, rows[-1].stream_ordering
)
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_token if to_token else from_token
Expand Down

0 comments on commit 87aa0fa

Please sign in to comment.