Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Rate limit joins per-room (#13276)
Browse files Browse the repository at this point in the history
  • Loading branch information
David Robertson committed Jul 19, 2022
1 parent 2ee0b6e commit b977867
Show file tree
Hide file tree
Showing 18 changed files with 498 additions and 15 deletions.
1 change: 1 addition & 0 deletions changelog.d/13276.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add per-room rate limiting for room joins. For each room, Synapse now monitors the rate of join events in that room, and throttles additional joins if that rate grows too large.
4 changes: 4 additions & 0 deletions docker/complement/conf/workers-shared-extra.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ rc_joins:
per_second: 9999
burst_count: 9999

rc_joins_per_room:
per_second: 9999
burst_count: 9999

rc_3pid_validation:
per_second: 1000
burst_count: 1000
Expand Down
10 changes: 10 additions & 0 deletions docs/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,16 @@ minimum, a `notif_from` setting.)
Specifying an `email` setting under `account_threepid_delegates` will now cause
an error at startup.

## Changes to the event replication streams

Synapse now includes a flag indicating if an event is an outlier when
replicating it to other workers. This is a forwards- and backwards-incompatible
change: v1.63 workers cannot process events replicated by v1.64 workers, and
vice versa.

Once all workers are upgraded to v1.64 (or downgraded to v1.63), event
replication will resume as normal.

# Upgrading to v1.62.0

## New signatures for spam checker callbacks
Expand Down
19 changes: 19 additions & 0 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -1471,6 +1471,25 @@ rc_joins:
per_second: 0.03
burst_count: 12
```
---
### `rc_joins_per_room`

This option allows admins to ratelimit joins to a room based on the number of recent
joins (local or remote) to that room. It is intended to mitigate mass-join spam
waves which target multiple homeservers.

By default, one join is permitted to a room every second, with an accumulating
buffer of up to ten instantaneous joins.

Example configuration (default values):
```yaml
rc_joins_per_room:
per_second: 1
burst_count: 10
```

_Added in Synapse 1.64.0._

---
### `rc_3pid_validation`

Expand Down
7 changes: 7 additions & 0 deletions synapse/config/ratelimiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
defaults={"per_second": 0.01, "burst_count": 10},
)

# Track the rate of joins to a given room. If there are too many, temporarily
# prevent local joins and remote joins via this server.
self.rc_joins_per_room = RateLimitConfig(
config.get("rc_joins_per_room", {}),
defaults={"per_second": 1, "burst_count": 10},
)

# Ratelimit cross-user key requests:
# * For local requests this is keyed by the sending device.
# * For requests received over federation this is keyed by the origin.
Expand Down
16 changes: 16 additions & 0 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def __init__(self, hs: "HomeServer"):
self._federation_event_handler = hs.get_federation_event_handler()
self.state = hs.get_state_handler()
self._event_auth_handler = hs.get_event_auth_handler()
self._room_member_handler = hs.get_room_member_handler()

self._state_storage_controller = hs.get_storage_controllers().state

Expand Down Expand Up @@ -621,6 +622,15 @@ async def on_make_join_request(
)
raise IncompatibleRoomVersionError(room_version=room_version)

# Refuse the request if that room has seen too many joins recently.
# This is in addition to the HS-level rate limiting applied by
# BaseFederationServlet.
# type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?)
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
requester=None,
key=room_id,
update=False,
)
pdu = await self.handler.on_make_join_request(origin, room_id, user_id)
return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}

Expand Down Expand Up @@ -655,6 +665,12 @@ async def on_send_join_request(
room_id: str,
caller_supports_partial_state: bool = False,
) -> Dict[str, Any]:
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
requester=None,
key=room_id,
update=False,
)

event, context = await self._on_send_membership_event(
origin, content, Membership.JOIN, room_id
)
Expand Down
4 changes: 4 additions & 0 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -1980,6 +1980,10 @@ async def _notify_persisted_event(
event, event_pos, max_stream_token, extra_users=extra_users
)

if event.type == EventTypes.Member and event.membership == Membership.JOIN:
# TODO retrieve the previous state, and exclude join -> join transitions
self._notifier.notify_user_joined_room(event.event_id, event.room_id)

def _sanity_check_event(self, ev: EventBase) -> None:
"""
Do some early sanity checks of a received event
Expand Down
11 changes: 11 additions & 0 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ def __init__(self, hs: "HomeServer"):
)
self._events_shard_config = self.config.worker.events_shard_config
self._instance_name = hs.get_instance_name()
self._notifier = hs.get_notifier()

self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state

Expand Down Expand Up @@ -1550,6 +1551,16 @@ async def persist_and_notify_client_event(
requester, is_admin_redaction=is_admin_redaction
)

if event.type == EventTypes.Member and event.membership == Membership.JOIN:
(
current_membership,
_,
) = await self.store.get_local_current_membership_for_user_in_room(
event.state_key, event.room_id
)
if current_membership != Membership.JOIN:
self._notifier.notify_user_joined_room(event.event_id, event.room_id)

await self._maybe_kick_guest_users(event, context)

if event.type == EventTypes.CanonicalAlias:
Expand Down
37 changes: 37 additions & 0 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,29 @@ def __init__(self, hs: "HomeServer"):
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
burst_count=hs.config.ratelimiting.rc_joins_local.burst_count,
)
# Tracks joins from local users to rooms this server isn't a member of.
# I.e. joins this server makes by requesting /make_join /send_join from
# another server.
self._join_rate_limiter_remote = Ratelimiter(
store=self.store,
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second,
burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count,
)
# TODO: find a better place to keep this Ratelimiter.
# It needs to be
# - written to by event persistence code
# - written to by something which can snoop on replication streams
# - read by the RoomMemberHandler to rate limit joins from local users
# - read by the FederationServer to rate limit make_joins and send_joins from
# other homeservers
# I wonder if a homeserver-wide collection of rate limiters might be cleaner?
self._join_rate_per_room_limiter = Ratelimiter(
store=self.store,
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_per_room.per_second,
burst_count=hs.config.ratelimiting.rc_joins_per_room.burst_count,
)

# Ratelimiter for invites, keyed by room (across all issuers, all
# recipients).
Expand Down Expand Up @@ -136,6 +153,18 @@ def __init__(self, hs: "HomeServer"):
)

self.request_ratelimiter = hs.get_request_ratelimiter()
hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room)

def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
    """Count a join to `room_id` against the per-room join rate limiter.

    Records a single action on `self._join_rate_per_room_limiter`, keyed by
    the room ID and with no requester. Only `record_action` is called here.

    Use this to tell the RoomMemberHandler about joins that have taken place
    - on another homeserver, or
    - on another worker in this homeserver.

    Joins actioned by this worker are expected to go through the limiter's
    `ratelimit` method instead.

    Args:
        event_id: ID of the join event. Not used by this method.
        room_id: ID of the room that was joined; used as the limiter key.
    """
    per_room_limiter = self._join_rate_per_room_limiter
    per_room_limiter.record_action(requester=None, key=room_id)

@abc.abstractmethod
async def _remote_join(
Expand Down Expand Up @@ -396,6 +425,9 @@ async def _local_membership_update(
# up blocking profile updates.
if newly_joined and ratelimit:
await self._join_rate_limiter_local.ratelimit(requester)
await self._join_rate_per_room_limiter.ratelimit(
requester, key=room_id, update=False
)

result_event = await self.event_creation_handler.handle_new_client_event(
requester,
Expand Down Expand Up @@ -867,6 +899,11 @@ async def update_membership_locked(
await self._join_rate_limiter_remote.ratelimit(
requester,
)
await self._join_rate_per_room_limiter.ratelimit(
requester,
key=room_id,
update=False,
)

inviter = await self._get_inviter(target.to_string(), room_id)
if inviter and not self.hs.is_mine(inviter):
Expand Down
17 changes: 16 additions & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.python.failure import Failure

from synapse.api.constants import EventTypes, ReceiptTypes
from synapse.api.constants import EventTypes, Membership, ReceiptTypes
from synapse.federation import send_queue
from synapse.federation.sender import FederationSender
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
Expand Down Expand Up @@ -219,6 +219,21 @@ async def on_rdata(
membership=row.data.membership,
)

# If this event is a join, make a note of it so we have an accurate
# cross-worker room rate limit.
# TODO: Erik said we should exclude rows that came from ex_outliers
# here, but I don't see how we can determine that. I guess we could
# add a flag to row.data?
if (
row.data.type == EventTypes.Member
and row.data.membership == Membership.JOIN
and not row.data.outlier
):
# TODO retrieve the previous state, and exclude join -> join transitions
self.notifier.notify_user_joined_room(
row.data.event_id, row.data.room_id
)

await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows
)
Expand Down
1 change: 1 addition & 0 deletions synapse/replication/tcp/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class EventsStreamEventRow(BaseEventsStreamRow):
relates_to: Optional[str]
membership: Optional[str]
rejected: bool
outlier: bool


@attr.s(slots=True, frozen=True, auto_attribs=True)
Expand Down
22 changes: 14 additions & 8 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1490,7 +1490,7 @@ async def get_room_complexity(self, room_id: str) -> Dict[str, float]:

async def get_all_new_forward_event_rows(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
"""Returns new events, for the Events replication stream
Args:
Expand All @@ -1506,10 +1506,11 @@ async def get_all_new_forward_event_rows(

def get_all_new_forward_event_rows(
txn: LoggingTransaction,
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
sql = (
"SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
" e.outlier"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events AS se USING (event_id)"
Expand All @@ -1523,7 +1524,8 @@ def get_all_new_forward_event_rows(
)
txn.execute(sql, (last_id, current_id, instance_name, limit))
return cast(
List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
List[Tuple[int, str, str, str, str, str, str, str, bool, bool]],
txn.fetchall(),
)

return await self.db_pool.runInteraction(
Expand All @@ -1532,7 +1534,7 @@ def get_all_new_forward_event_rows(

async def get_ex_outlier_stream_rows(
self, instance_name: str, last_id: int, current_id: int
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
"""Returns de-outliered events, for the Events replication stream
Args:
Expand All @@ -1547,11 +1549,14 @@ async def get_ex_outlier_stream_rows(

def get_ex_outlier_stream_rows_txn(
txn: LoggingTransaction,
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
sql = (
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
" e.outlier"
" FROM events AS e"
# NB: the next line (inner join) is what makes this query different from
# get_all_new_forward_event_rows.
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events AS se USING (event_id)"
Expand All @@ -1566,7 +1571,8 @@ def get_ex_outlier_stream_rows_txn(

txn.execute(sql, (last_id, current_id, instance_name))
return cast(
List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
List[Tuple[int, str, str, str, str, str, str, str, bool, bool]],
txn.fetchall(),
)

return await self.db_pool.runInteraction(
Expand Down
63 changes: 62 additions & 1 deletion tests/federation/test_federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer):
tok2 = self.login("fozzie", "bear")
self.helper.join(self._room_id, second_member_user_id, tok=tok2)

def _make_join(self, user_id) -> JsonDict:
def _make_join(self, user_id: str) -> JsonDict:
channel = self.make_signed_federation_request(
"GET",
f"/_matrix/federation/v1/make_join/{self._room_id}/{user_id}"
Expand Down Expand Up @@ -260,6 +260,67 @@ def test_send_join_partial_state(self):
)
self.assertEqual(r[("m.room.member", joining_user)].membership, "join")

@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 3}})
def test_make_join_respects_room_join_rate_limit(self) -> None:
    # Two users already joined the room during test setup. With a burst count
    # of 3 there is still room in the bucket, so this make_join is accepted.
    first_remote_user = "@ronniecorbett:" + self.OTHER_SERVER_NAME
    self._make_join(first_remote_user)

    # A third (local) join fills the bucket for this room...
    local_user_id = self.register_user("animal", "animal")
    access_token = self.login("animal", "animal")
    self.helper.join(self._room_id, local_user_id, tok=access_token)

    # ...so a further make_join for the same room must now be refused.
    second_remote_user = "@ronniebarker:" + self.OTHER_SERVER_NAME
    make_join_path = (
        f"/_matrix/federation/v1/make_join/{self._room_id}/{second_remote_user}"
        f"?ver={DEFAULT_ROOM_VERSION}"
    )
    channel = self.make_signed_federation_request("GET", make_join_path)
    self.assertEqual(channel.code, HTTPStatus.TOO_MANY_REQUESTS, channel.json_body)

@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 3}})
def test_send_join_contributes_to_room_join_rate_limit_and_is_limited(self) -> None:
    # Prepare two signed join events up front via make_join. (make_join is
    # itself rate limited, but does not contribute to the rate limit.)
    room_version = KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION]
    signed_joins = []
    for index in range(2):
        remote_user = f"@misspiggy{index}:{self.OTHER_SERVER_NAME}"
        templated_join = self._make_join(remote_user)["event"]
        self.add_hashes_and_signatures_from_other_server(
            templated_join,
            room_version,
        )
        signed_joins.append(templated_join)

    # Two users joined the room during test setup. With a burst count of 3,
    # the first send_join still fits in the bucket and is accepted...
    first_response = self.make_signed_federation_request(
        "PUT",
        f"/_matrix/federation/v2/send_join/{self._room_id}/join0",
        content=signed_joins[0],
    )
    self.assertEqual(first_response.code, 200, first_response.json_body)

    # ... whereas the second one exceeds it and is denied.
    second_response = self.make_signed_federation_request(
        "PUT",
        f"/_matrix/federation/v2/send_join/{self._room_id}/join1",
        content=signed_joins[1],
    )
    self.assertEqual(
        second_response.code, HTTPStatus.TOO_MANY_REQUESTS, second_response.json_body
    )

# NB: we could write a test which checks that the send_join event is seen
# by other workers over replication, and that they update their rate limit
# buckets accordingly. I'm going to assume that the join event gets sent over
# replication, at which point the tests.handlers.room_member test
# test_local_users_joining_on_another_worker_contribute_to_rate_limit
# is probably sufficient to reassure us that the bucket is updated.


def _create_acl_event(content):
return make_event_from_dict(
Expand Down
Loading

0 comments on commit b977867

Please sign in to comment.