This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make token serializing/deserializing async #8427

Merged
merged 5 commits on Sep 30, 2020
Changes from 4 commits
1 change: 1 addition & 0 deletions changelog.d/8427.misc
@@ -0,0 +1 @@
Make stream token serializing/deserializing async.
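For orientation: this change turns StreamToken.from_string / StreamToken.to_string (and RoomStreamToken.parse) into coroutines that take the datastore, so every caller needs a reference to the store and must await the conversion. A minimal sketch of the resulting call pattern, assuming the usual homeserver wiring; the handler class and its method here are illustrative only:

    from synapse.types import StreamToken

    class ExampleHandler:
        def __init__(self, hs):
            # Callers now keep a datastore reference solely for token (de)serialization.
            self.store = hs.get_datastore()

        async def roundtrip(self, token_string: str) -> str:
            # Parsing is now a coroutine and takes the store.
            token = await StreamToken.from_string(self.store, token_string)
            # ... work with token.room_key, token.copy_and_replace(...), etc. ...
            # Serializing back to a string is likewise awaited.
            return await token.to_string(self.store)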
4 changes: 2 additions & 2 deletions synapse/handlers/events.py
@@ -133,8 +133,8 @@ async def get_stream(

chunk = {
"chunk": chunks,
"start": tokens[0].to_string(),
"end": tokens[1].to_string(),
"start": await tokens[0].to_string(self.store),
"end": await tokens[1].to_string(self.store),
}

return chunk
14 changes: 7 additions & 7 deletions synapse/handlers/initial_sync.py
@@ -203,8 +203,8 @@ async def handle_room(event: RoomsForUser):
messages, time_now=time_now, as_client_event=as_client_event
)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
}

d["state"] = await self._event_serializer.serialize_events(
@@ -249,7 +249,7 @@ async def handle_room(event: RoomsForUser):
],
"account_data": account_data_events,
"receipts": receipt,
"end": now_token.to_string(),
"end": await now_token.to_string(self.store),
}

return ret
@@ -348,8 +348,8 @@ async def _room_initial_sync_parted(
"chunk": (
await self._event_serializer.serialize_events(messages, time_now)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
},
"state": (
await self._event_serializer.serialize_events(
@@ -447,8 +447,8 @@ async def get_receipts():
"chunk": (
await self._event_serializer.serialize_events(messages, time_now)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
},
"state": state,
"presence": presence,
8 changes: 4 additions & 4 deletions synapse/handlers/pagination.py
@@ -413,8 +413,8 @@ async def get_messages(
if not events:
return {
"chunk": [],
"start": from_token.to_string(),
"end": next_token.to_string(),
"start": await from_token.to_string(self.store),
"end": await next_token.to_string(self.store),
}

state = None
@@ -442,8 +442,8 @@ async def get_messages(
events, time_now, as_client_event=as_client_event
)
),
"start": from_token.to_string(),
"end": next_token.to_string(),
"start": await from_token.to_string(self.store),
"end": await next_token.to_string(self.store),
}

if state:
8 changes: 5 additions & 3 deletions synapse/handlers/room.py
@@ -1077,11 +1077,13 @@ def filter_evts(events):
# the token, which we replace.
token = StreamToken.START

results["start"] = token.copy_and_replace(
results["start"] = await token.copy_and_replace(
"room_key", results["start"]
).to_string()
).to_string(self.store)

results["end"] = token.copy_and_replace("room_key", results["end"]).to_string()
results["end"] = await token.copy_and_replace(
"room_key", results["end"]
).to_string(self.store)

return results

8 changes: 4 additions & 4 deletions synapse/handlers/search.py
@@ -362,13 +362,13 @@ async def search(self, user, content, batch=None):
self.storage, user.to_string(), res["events_after"]
)

res["start"] = now_token.copy_and_replace(
res["start"] = await now_token.copy_and_replace(
"room_key", res["start"]
).to_string()
).to_string(self.store)

res["end"] = now_token.copy_and_replace(
res["end"] = await now_token.copy_and_replace(
"room_key", res["end"]
).to_string()
).to_string(self.store)

if include_profile:
senders = {
2 changes: 1 addition & 1 deletion synapse/rest/admin/__init__.py
@@ -110,7 +110,7 @@ async def on_POST(self, request, room_id, event_id):
raise SynapseError(400, "Event is for wrong room.")

room_token = await self.store.get_topological_token_for_event(event_id)
token = str(room_token)
token = await room_token.to_string(self.store)

logger.info("[purge] purging up to token %s (event_id %s)", token, event_id)
elif "purge_up_to_ts" in body:
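The switch away from str(room_token) here follows from the synapse/types.py change further down, where RoomStreamToken.__str__ is replaced by an async to_string(store) method.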
3 changes: 2 additions & 1 deletion synapse/rest/client/v1/events.py
@@ -33,6 +33,7 @@ def __init__(self, hs):
super().__init__()
self.event_stream_handler = hs.get_event_stream_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastore()

async def on_GET(self, request):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
@@ -44,7 +45,7 @@ async def on_GET(self, request):
if b"room_id" in request.args:
room_id = request.args[b"room_id"][0].decode("ascii")

pagin_config = PaginationConfig.from_request(request)
pagin_config = await PaginationConfig.from_request(self.store, request)
timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS
if b"timeout" in request.args:
try:
3 changes: 2 additions & 1 deletion synapse/rest/client/v1/initial_sync.py
@@ -27,11 +27,12 @@ def __init__(self, hs):
super().__init__()
self.initial_sync_handler = hs.get_initial_sync_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastore()

async def on_GET(self, request):
requester = await self.auth.get_user_by_req(request)
as_client_event = b"raw" not in request.args
pagination_config = PaginationConfig.from_request(request)
pagination_config = await PaginationConfig.from_request(self.store, request)
include_archived = parse_boolean(request, "archived", default=False)
content = await self.initial_sync_handler.snapshot_all_rooms(
user_id=requester.user.to_string(),
11 changes: 8 additions & 3 deletions synapse/rest/client/v1/room.py
@@ -451,6 +451,7 @@ def __init__(self, hs):
super().__init__()
self.message_handler = hs.get_message_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastore()

async def on_GET(self, request, room_id):
# TODO support Pagination stream API (limit/tokens)
@@ -465,7 +466,7 @@ async def on_GET(self, request, room_id):
if at_token_string is None:
at_token = None
else:
at_token = StreamToken.from_string(at_token_string)
at_token = await StreamToken.from_string(self.store, at_token_string)

# let you filter down on particular memberships.
# XXX: this may not be the best shape for this API - we could pass in a filter
@@ -521,10 +522,13 @@ def __init__(self, hs):
super().__init__()
self.pagination_handler = hs.get_pagination_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastore()

async def on_GET(self, request, room_id):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
pagination_config = PaginationConfig.from_request(request, default_limit=10)
pagination_config = await PaginationConfig.from_request(
self.store, request, default_limit=10
)
as_client_event = b"raw" not in request.args
filter_str = parse_string(request, b"filter", encoding="utf-8")
if filter_str:
@@ -580,10 +584,11 @@ def __init__(self, hs):
super().__init__()
self.initial_sync_handler = hs.get_initial_sync_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastore()

async def on_GET(self, request, room_id):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
pagination_config = PaginationConfig.from_request(request)
pagination_config = await PaginationConfig.from_request(self.store, request)
content = await self.initial_sync_handler.room_initial_sync(
room_id=room_id, requester=requester, pagin_config=pagination_config
)
3 changes: 2 additions & 1 deletion synapse/rest/client/v2_alpha/keys.py
@@ -180,6 +180,7 @@ def __init__(self, hs):
super().__init__()
self.auth = hs.get_auth()
self.device_handler = hs.get_device_handler()
self.store = hs.get_datastore()

async def on_GET(self, request):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
@@ -191,7 +192,7 @@ async def on_GET(self, request):
# changes after the "to" as well as before.
set_tag("to", parse_string(request, "to"))

from_token = StreamToken.from_string(from_token_string)
from_token = await StreamToken.from_string(self.store, from_token_string)

user_id = requester.user.to_string()

7 changes: 4 additions & 3 deletions synapse/rest/client/v2_alpha/sync.py
@@ -77,6 +77,7 @@ def __init__(self, hs):
super().__init__()
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastore()
self.sync_handler = hs.get_sync_handler()
self.clock = hs.get_clock()
self.filtering = hs.get_filtering()
@@ -152,7 +153,7 @@ async def on_GET(self, request):
)

if since is not None:
since_token = StreamToken.from_string(since)
since_token = await StreamToken.from_string(self.store, since)
else:
since_token = None

@@ -236,7 +237,7 @@ async def encode_response(self, time_now, sync_result, access_token_id, filter):
"leave": sync_result.groups.leave,
},
"device_one_time_keys_count": sync_result.device_one_time_keys_count,
"next_batch": sync_result.next_batch.to_string(),
"next_batch": await sync_result.next_batch.to_string(self.store),
}

@staticmethod
@@ -413,7 +414,7 @@ def serialize(events):
result = {
"timeline": {
"events": serialized_timeline,
"prev_batch": room.timeline.prev_batch.to_string(),
"prev_batch": await room.timeline.prev_batch.to_string(self.store),
"limited": room.timeline.limited,
},
"state": {"events": serialized_state},
8 changes: 4 additions & 4 deletions synapse/storage/databases/main/purge_events.py
@@ -42,17 +42,17 @@ async def purge_history(
The set of state groups that are referenced by deleted events.
"""

parsed_token = await RoomStreamToken.parse(self, token)

return await self.db_pool.runInteraction(
"purge_history",
self._purge_history_txn,
room_id,
token,
parsed_token,
delete_local_events,
)

def _purge_history_txn(self, txn, room_id, token_str, delete_local_events):
token = RoomStreamToken.parse(token_str)

def _purge_history_txn(self, txn, room_id, token, delete_local_events):
# Tables that should be pruned:
# event_auth
# event_backward_extremities
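Worth noting in this file: RoomStreamToken.parse is now a coroutine, so it can no longer be called inside the synchronous _purge_history_txn transaction function; the token is parsed up front in the async purge_history wrapper and the already-parsed RoomStreamToken is passed through runInteraction instead.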
7 changes: 4 additions & 3 deletions synapse/streams/config.py
@@ -39,8 +39,9 @@ class PaginationConfig:
limit = attr.ib(type=Optional[int])

@classmethod
def from_request(
async def from_request(
cls,
store,
request: SynapseRequest,
raise_invalid_params: bool = True,
default_limit: Optional[int] = None,
@@ -54,13 +55,13 @@ def from_request(
if from_tok == "END":
from_tok = None # For backwards compat.
elif from_tok:
from_tok = StreamToken.from_string(from_tok)
from_tok = await StreamToken.from_string(store, from_tok)
except Exception:
raise SynapseError(400, "'from' parameter is invalid")

try:
if to_tok:
to_tok = StreamToken.from_string(to_tok)
to_tok = await StreamToken.from_string(store, to_tok)
except Exception:
raise SynapseError(400, "'to' parameter is invalid")

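The same wiring appears in every REST servlet that builds a PaginationConfig: from_request is now a coroutine and needs the store to parse stream tokens, so each servlet grows a self.store attribute and awaits the call. A condensed sketch of that pattern, mirroring the rest/client changes above (the servlet name is illustrative):

    from synapse.http.servlet import RestServlet
    from synapse.streams.config import PaginationConfig

    class ExampleMessagesRestServlet(RestServlet):
        def __init__(self, hs):
            super().__init__()
            self.auth = hs.get_auth()
            self.store = hs.get_datastore()  # needed for async token parsing

        async def on_GET(self, request, room_id):
            requester = await self.auth.get_user_by_req(request, allow_guest=True)
            pagination_config = await PaginationConfig.from_request(
                self.store, request, default_limit=10
            )
            # ... hand pagination_config to the relevant handler ...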
43 changes: 35 additions & 8 deletions synapse/types.py
@@ -18,14 +18,27 @@
import string
import sys
from collections import namedtuple
from typing import Any, Dict, Mapping, MutableMapping, Optional, Tuple, Type, TypeVar
from typing import (
TYPE_CHECKING,
Any,
Dict,
Mapping,
MutableMapping,
Optional,
Tuple,
Type,
TypeVar,
)

import attr
from signedjson.key import decode_verify_key_bytes
from unpaddedbase64 import decode_base64

from synapse.api.errors import Codes, SynapseError

if TYPE_CHECKING:
from synapse.storage.databases.main import DataStore

# define a version of typing.Collection that works on python 3.5
if sys.version_info[:3] >= (3, 6, 0):
from typing import Collection
@@ -393,7 +406,7 @@ class RoomStreamToken:
stream = attr.ib(type=int, validator=attr.validators.instance_of(int))

@classmethod
def parse(cls, string: str) -> "RoomStreamToken":
async def parse(cls, store, string: str) -> "RoomStreamToken":
try:
if string[0] == "s":
return cls(topological=None, stream=int(string[1:]))
@@ -428,7 +441,7 @@ def copy_and_advance(self, other: "RoomStreamToken") -> "RoomStreamToken":
def as_tuple(self) -> Tuple[Optional[int], int]:
return (self.topological, self.stream)

def __str__(self) -> str:
async def to_string(self, store: "DataStore") -> str:
if self.topological is not None:
return "t%d-%d" % (self.topological, self.stream)
else:
@@ -453,18 +466,32 @@ class StreamToken:
START = None # type: StreamToken

@classmethod
def from_string(cls, string):
async def from_string(cls, store, string):
try:
keys = string.split(cls._SEPARATOR)
while len(keys) < len(attr.fields(cls)):
# i.e. old token from before receipt_key
keys.append("0")
return cls(RoomStreamToken.parse(keys[0]), *(int(k) for k in keys[1:]))
return cls(
await RoomStreamToken.parse(store, keys[0]), *(int(k) for k in keys[1:])
)
except Exception:
raise SynapseError(400, "Invalid Token")

def to_string(self):
return self._SEPARATOR.join([str(k) for k in attr.astuple(self, recurse=False)])
async def to_string(self, store: "DataStore") -> str:
return self._SEPARATOR.join(
[
await self.room_key.to_string(store),
str(self.presence_key),
str(self.typing_key),
str(self.receipt_key),
str(self.account_data_key),
str(self.push_rules_key),
str(self.to_device_key),
str(self.device_list_key),
str(self.groups_key),
]
)

@property
def room_stream_id(self):
@@ -493,7 +520,7 @@ def copy_and_replace(self, key, new_value) -> "StreamToken":
return attr.evolve(self, **{key: new_value})


StreamToken.START = StreamToken.from_string("s0_0")
StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0)


@attr.s(slots=True, frozen=True)
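A related consequence in types.py: StreamToken.START can no longer be built with from_string("s0_0") at import time, because from_string is now a coroutine (and no store is available at module load), so the sentinel is constructed directly from its component keys instead.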