Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Implementation of MSC2314 #6176

Merged
merged 24 commits into from
Nov 27, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6176.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement the `/_matrix/federation/unstable/net.atleastfornow/state/<context>` API as drafted in MSC2314.
37 changes: 36 additions & 1 deletion synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
# Copyright 2019 Matrix.org Federation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,6 +75,7 @@ def __init__(self, hs):

self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler
self.state = hs.get_state_handler()

self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")
Expand Down Expand Up @@ -299,6 +301,35 @@ def on_context_state_request(self, origin, room_id, event_id):

return 200, resp

@defer.inlineCallbacks
@log_function
def on_context_state_request_v2(self, origin, room_id, event_id):
origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id)

in_room = yield self.auth.check_host_in_room(room_id, origin)
if not in_room:
raise AuthError(403, "Host not in room.")

# we grab the linearizer to protect ourselves from servers which hammer
# us. In theory we might already have the response to this query
# in the cache so we could return it without waiting for the linearizer
# - but that's non-trivial to get right, and anyway somewhat defeats
# the point of the linearizer.
with (yield self._server_linearizer.queue((origin, room_id))):
resp = yield self._state_resp_cache.wrap(
(room_id, event_id),
self._on_context_state_request_compute,
room_id,
event_id,
)

room_version = yield self.store.get_room_version(room_id)
resp = dict(resp)
resp["room_version"] = room_version

return 200, resp

@defer.inlineCallbacks
def on_state_ids_request(self, origin, room_id, event_id):
if not event_id:
Expand All @@ -318,7 +349,11 @@ def on_state_ids_request(self, origin, room_id, event_id):

@defer.inlineCallbacks
def _on_context_state_request_compute(self, room_id, event_id):
pdus = yield self.handler.get_state_for_pdu(room_id, event_id)
if event_id:
pdus = yield self.handler.get_state_for_pdu(room_id, event_id)
else:
pdus = (yield self.state.get_current_state(room_id)).values()

auth_chain = yield self.store.get_auth_chain([pdu.event_id for pdu in pdus])

return {
Expand Down
19 changes: 17 additions & 2 deletions synapse/federation/transport/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ async def on_GET(self, origin, content, query, event_id):
return await self.handler.on_pdu_request(origin, event_id)


class FederationStateServlet(BaseFederationServlet):
class FederationStateV1Servlet(BaseFederationServlet):
PATH = "/state/(?P<context>[^/]*)/?"

# This is when someone asks for all data for a given context.
Expand All @@ -431,6 +431,20 @@ async def on_GET(self, origin, content, query, context):
)


class FederationStateV2Servlet(BaseFederationServlet):
PATH = "/net.atleastfornow/state/(?P<context>[^/]*)/?"

# See: MSC2314
PREFIX = FEDERATION_UNSTABLE_PREFIX

async def on_GET(self, origin, content, query, context):
return await self.handler.on_context_state_request_v2(
origin,
context,
parse_string_from_args(query, "event_id", None, required=False),
)


class FederationStateIdsServlet(BaseFederationServlet):
PATH = "/state_ids/(?P<room_id>[^/]*)/?"

Expand Down Expand Up @@ -1358,7 +1372,8 @@ async def on_GET(self, origin, content, query, room_id):
FEDERATION_SERVLET_CLASSES = (
FederationSendServlet,
FederationEventServlet,
FederationStateServlet,
FederationStateV1Servlet,
FederationStateV2Servlet,
FederationStateIdsServlet,
FederationBackfillServlet,
FederationQueryServlet,
Expand Down
28 changes: 3 additions & 25 deletions tests/federation/test_complexity.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@
from twisted.internet import defer

from synapse.api.errors import Codes, SynapseError
from synapse.config.ratelimiting import FederationRateLimitConfig
from synapse.federation.transport import server
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from synapse.types import UserID
from synapse.util.ratelimitutils import FederationRateLimiter

from tests import unittest


class RoomComplexityTests(unittest.HomeserverTestCase):
class RoomComplexityTests(unittest.FederatingHomeserverTestCase):

servlets = [
admin.register_servlets,
Expand All @@ -41,25 +38,6 @@ def default_config(self, name="test"):
config["limit_remote_rooms"] = {"enabled": True, "complexity": 0.05}
return config

def prepare(self, reactor, clock, homeserver):
class Authenticator(object):
def authenticate_request(self, request, content):
return defer.succeed("otherserver.nottld")

ratelimiter = FederationRateLimiter(
clock,
FederationRateLimitConfig(
window_size=1,
sleep_limit=1,
sleep_msec=1,
reject_limit=1000,
concurrent_requests=1000,
),
)
server.register_servlets(
homeserver, self.resource, Authenticator(), ratelimiter
)

def test_complexity_simple(self):

u1 = self.register_user("u1", "pass")
Expand Down Expand Up @@ -105,7 +83,7 @@ def test_join_too_large(self):

d = handler._remote_join(
None,
["otherserver.example"],
["other.example.com"],
"roomid",
UserID.from_string(u1),
{"membership": "join"},
Expand Down Expand Up @@ -146,7 +124,7 @@ def test_join_too_large_once_joined(self):

d = handler._remote_join(
None,
["otherserver.example"],
["other.example.com"],
room_1,
UserID.from_string(u1),
{"membership": "join"},
Expand Down
4 changes: 3 additions & 1 deletion tests/federation/test_federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from synapse.types import ReadReceipt

from tests.unittest import HomeserverTestCase
from tests.unittest import HomeserverTestCase, override_config


class FederationSenderTestCases(HomeserverTestCase):
Expand All @@ -29,6 +29,7 @@ def make_homeserver(self, reactor, clock):
federation_transport_client=Mock(spec=["send_transaction"]),
)

@override_config({"send_federation": True})
def test_send_receipts(self):
mock_state_handler = self.hs.get_state_handler()
mock_state_handler.get_current_hosts_in_room.return_value = ["test", "host2"]
Expand Down Expand Up @@ -69,6 +70,7 @@ def test_send_receipts(self):
],
)

@override_config({"send_federation": True})
def test_send_receipts_with_backoff(self):
"""Send two receipts in quick succession; the second should be flushed, but
only after 20ms"""
Expand Down
63 changes: 63 additions & 0 deletions tests/federation/test_federation_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
# Copyright 2019 Matrix.org Federation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -16,6 +17,8 @@

from synapse.events import FrozenEvent
from synapse.federation.federation_server import server_matches_acl_event
from synapse.rest import admin
from synapse.rest.client.v1 import login, room

from tests import unittest

Expand All @@ -41,6 +44,66 @@ def test_block_ip_literals(self):
self.assertTrue(server_matches_acl_event("1:2:3:4", e))


class StateQueryTests(unittest.FederatingHomeserverTestCase):

servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]

def test_without_event_id(self):
"""
Querying unstable/net.atleastfornow/state/<room_id> without an event ID
will return the current known state.
"""
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")

room_1 = self.helper.create_room_as(u1, tok=u1_token)
self.inject_room_member(room_1, "@user:other.example.com", "join")

request, channel = self.make_request(
"GET", "/_matrix/federation/unstable/net.atleastfornow/state/%s" % (room_1,)
)
self.render(request)
self.assertEquals(200, channel.code, channel.result)

self.assertEqual(
channel.json_body["room_version"],
self.hs.config.default_room_version.identifier,
)

members = set(
map(
lambda x: x["state_key"],
filter(
lambda x: x["type"] == "m.room.member", channel.json_body["pdus"]
),
)
)

self.assertEqual(members, set(["@user:other.example.com", u1]))
self.assertEqual(len(channel.json_body["pdus"]), 6)

def test_needs_to_be_in_room(self):
"""
Querying unstable/net.atleastfornow/state/<room_id> requires the server
be in the room to provide data.
"""
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")

room_1 = self.helper.create_room_as(u1, tok=u1_token)

request, channel = self.make_request(
"GET", "/_matrix/federation/unstable/net.atleastfornow/state/%s" % (room_1,)
)
self.render(request)
self.assertEquals(403, channel.code, channel.result)
self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")


def _create_acl_event(content):
return FrozenEvent(
{
Expand Down
3 changes: 3 additions & 0 deletions tests/handlers/test_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from synapse.types import UserID

from tests import unittest
from tests.unittest import override_config
from tests.utils import register_federation_servlets

# Some local users to test with
Expand Down Expand Up @@ -171,6 +172,7 @@ def test_started_typing_local(self):
],
)

@override_config({"send_federation": True})
def test_started_typing_remote_send(self):
self.room_members = [U_APPLE, U_ONION]

Expand Down Expand Up @@ -234,6 +236,7 @@ def test_started_typing_remote_recv(self):
],
)

@override_config({"send_federation": True})
def test_stopped_typing(self):
self.room_members = [U_APPLE, U_BANANA, U_ONION]

Expand Down
3 changes: 3 additions & 0 deletions tests/replication/slave/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ def prepare(self, reactor, clock, hs):
server_factory = ReplicationStreamProtocolFactory(self.hs)
self.streamer = server_factory.streamer

handler_factory = Mock()
self.replication_handler = ReplicationClientHandler(self.slaved_store)
self.replication_handler.factory = handler_factory

client_factory = ReplicationClientFactory(
self.hs, "client_name", self.replication_handler
)
Expand Down
4 changes: 4 additions & 0 deletions tests/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock

from synapse.replication.tcp.commands import ReplicateCommand
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
Expand All @@ -30,7 +32,9 @@ def prepare(self, reactor, clock, hs):
server = server_factory.buildProtocol(None)

# build a replication client, with a dummy handler
handler_factory = Mock()
self.test_handler = TestReplicationClientHandler()
self.test_handler.factory = handler_factory
self.client = ClientReplicationStreamProtocol(
"client", "test", clock, self.test_handler
)
Expand Down
25 changes: 1 addition & 24 deletions tests/storage/test_roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

from unittest.mock import Mock

from synapse.api.constants import EventTypes, Membership
from synapse.api.room_versions import RoomVersions
from synapse.api.constants import Membership
from synapse.rest.admin import register_servlets_for_client_rest_resource
from synapse.rest.client.v1 import login, room
from synapse.types import Requester, UserID
Expand All @@ -44,8 +43,6 @@ def prepare(self, reactor, clock, hs):
# We can't test the RoomMemberStore on its own without the other event
# storage logic
self.store = hs.get_datastore()
self.event_builder_factory = hs.get_event_builder_factory()
self.event_creation_handler = hs.get_event_creation_handler()

self.u_alice = self.register_user("alice", "pass")
self.t_alice = self.login("alice", "pass")
Expand All @@ -54,26 +51,6 @@ def prepare(self, reactor, clock, hs):
# User elsewhere on another host
self.u_charlie = UserID.from_string("@charlie:elsewhere")

def inject_room_member(self, room, user, membership, replaces_state=None):
builder = self.event_builder_factory.for_room_version(
RoomVersions.V1,
{
"type": EventTypes.Member,
"sender": user,
"state_key": user,
"room_id": room,
"content": {"membership": membership},
},
)

event, context = self.get_success(
self.event_creation_handler.create_new_client_event(builder)
)

self.get_success(self.store.persist_event(event, context))

return event

def test_one_member(self):

# Alice creates the room, and is automatically joined
Expand Down
Loading