Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Generate historic pagination token for /messages when no from token p…
Browse files Browse the repository at this point in the history
…rovided

Part of #12281

Context: #12319 (comment)
  • Loading branch information
MadLittleMods committed Apr 5, 2022
1 parent 9633eb2 commit dd2112e
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 11 deletions.
2 changes: 1 addition & 1 deletion scripts-dev/complement.sh
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,4 @@ fi

# Run the tests!
echo "Images built; running complement"
go test -v -tags synapse_blacklist,msc2716,msc3030 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
go test -v -tags synapse_blacklist,msc2716,msc3030 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/
51 changes: 49 additions & 2 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester
from synapse.types import JsonDict, Requester, RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
Expand Down Expand Up @@ -441,7 +441,54 @@ async def get_messages(
if pagin_config.from_token:
from_token = pagin_config.from_token
else:
from_token = self.hs.get_event_sources().get_current_token_for_pagination()
from_token = (
await self.hs.get_event_sources().get_current_token_for_pagination(
room_id
)
)
assert from_token.room_key.topological
# from_live_token = (
# self.hs.get_event_sources().get_current_token_for_pagination()
# )
# # Convert the live token (sXXX) into a historic token (tXXX-XXX)
# # which is more suitable for /messages.
# current_stream_ordering = from_live_token.room_key.stream
# current_topographical_ordering = (
# await self.store.get_current_topological_token(
# room_id, current_stream_ordering
# )
# )
# from_token = from_live_token.copy_and_replace(
# "room_key",
# RoomStreamToken(
# current_topographical_ordering, current_stream_ordering
# ),
# )
# logger.info(
# "get_messages(room_id=%s)\n\tfrom_token=%s\n\tcurrent_stream_ordering=%s\n\tcurrent_topographical_ordering=%s\n\tfrom_live_token=%s",
# room_id,
# await from_token.to_string(self.store),
# current_stream_ordering,
# current_topographical_ordering,
# await from_live_token.to_string(self.store),
# )
logger.info(
"get_messages(room_id=%s)\n\tfrom_token=%s",
room_id,
await from_token.to_string(self.store),
)
logger.info(
"asdf_get_debug_events_in_room_ordered_by_depth %s",
await self.store.asdf_get_debug_events_in_room_ordered_by_depth(
room_id
),
)
logger.info(
"asdf_get_debug_events_in_room_ordered_by_stream_ordering %s",
await self.store.asdf_get_debug_events_in_room_ordered_by_stream_ordering(
room_id
),
)

if pagin_config.limit is None:
# This shouldn't happen as we've set a default limit before this
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1444,7 +1444,7 @@ async def get_new_events(
def get_current_key(self) -> RoomStreamToken:
return self.store.get_room_max_token()

def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
def get_current_key_for_room(self, room_id: str) -> Awaitable[RoomStreamToken]:
return self.store.get_room_events_max_id(room_id)


Expand Down
1 change: 1 addition & 0 deletions synapse/python_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
"matrix-common~=1.1.0",
# We need packaging.requirements.Requirement, added in 16.1.
"packaging>=16.1",
"tabulate>=0.8.9",
]

CONDITIONAL_REQUIREMENTS = {
Expand Down
47 changes: 43 additions & 4 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from frozendict import frozendict

from twisted.internet import defer
from tabulate import tabulate

from synapse.api.filtering import Filter
from synapse.events import EventBase
Expand Down Expand Up @@ -748,21 +749,21 @@ def _f(txn):
"get_room_event_before_stream_ordering", _f
)

async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str:
async def get_room_events_max_id(self, room_id: Optional[str] = None) -> RoomStreamToken:
"""Returns the current token for rooms stream.
By default, it returns the current global stream token. Specifying a
`room_id` causes it to return the current room specific topological
token.
"""
token = self.get_room_max_stream_ordering()
stream_ordering = self.get_room_max_stream_ordering()
if room_id is None:
return "s%d" % (token,)
return RoomStreamToken(None, stream_ordering)
else:
topo = await self.db_pool.runInteraction(
"_get_max_topological_txn", self._get_max_topological_txn, room_id
)
return "t%d-%d" % (topo, token)
return RoomStreamToken(topo, stream_ordering)

def get_stream_id_for_event_txn(
self,
Expand Down Expand Up @@ -808,6 +809,44 @@ async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToke
)
return RoomStreamToken(row["topological_ordering"], row["stream_ordering"])

async def asdf_get_debug_events_in_room_ordered_by_depth(self, room_id: str) -> Any:
"""Gets the topological token in a room after or at the given stream
ordering.
Args:
room_id
"""
sql = (
"SELECT depth, stream_ordering, type, state_key, event_id FROM events"
" WHERE events.room_id = ?"
" ORDER BY depth DESC, stream_ordering DESC;"
)
rows = await self.db_pool.execute(
"asdf_get_debug_events_in_room_ordered_by_depth", None, sql, room_id
)

headers = ["depth", "stream_ordering", "type", "state_key", "event_id"]
return tabulate(rows, headers=headers)

async def asdf_get_debug_events_in_room_ordered_by_stream_ordering(self, room_id: str) -> Any:
"""Gets the topological token in a room after or at the given stream
ordering.
Args:
room_id
"""
sql = (
"SELECT depth, stream_ordering, type, state_key, event_id FROM events"
" WHERE events.room_id = ?"
" ORDER BY stream_ordering DESC, depth DESC;"
)
rows = await self.db_pool.execute(
"asdf_get_debug_events_in_room_ordered_by_depth", None, sql, room_id
)

headers = ["depth", "stream_ordering", "type", "state_key", "event_id"]
return tabulate(rows, headers=headers)

async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
"""Gets the topological token in a room after or at the given stream
ordering.
Expand Down
6 changes: 3 additions & 3 deletions synapse/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, Iterator, Tuple
from typing import TYPE_CHECKING, Iterator, Optional, Tuple

import attr

Expand Down Expand Up @@ -69,7 +69,7 @@ def get_current_token(self) -> StreamToken:
)
return token

def get_current_token_for_pagination(self) -> StreamToken:
async def get_current_token_for_pagination(self, room_id: str) -> StreamToken:
"""Get the current token for a given room to be used to paginate
events.
Expand All @@ -80,7 +80,7 @@ def get_current_token_for_pagination(self) -> StreamToken:
The current token for pagination.
"""
token = StreamToken(
room_key=self.sources.room.get_current_key(),
room_key=await self.sources.room.get_current_key_for_room(room_id),
presence_key=0,
typing_key=0,
receipt_key=0,
Expand Down

0 comments on commit dd2112e

Please sign in to comment.