Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Reject handling tokens lower than the currently stored position.
Browse files Browse the repository at this point in the history
  • Loading branch information
Fizzadar committed Oct 29, 2021
1 parent 1d9ea27 commit f8f3c4a
Showing 1 changed file with 18 additions and 5 deletions.
23 changes: 18 additions & 5 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ def __init__(self, hs: "HomeServer"):
self.current_max = 0
self.is_processing = False

self._ephemeral_events_linearizer = Linearizer(name="appservice_ephemeral_events")
self._ephemeral_events_linearizer = Linearizer(
name="appservice_ephemeral_events"
)

def notify_interested_services(self, max_token: RoomStreamToken) -> None:
"""Notifies (pushes) all application services interested in this event.
Expand Down Expand Up @@ -260,7 +262,7 @@ async def _notify_interested_services_ephemeral(
)
):
if stream_key == "receipt_key":
events = await self._handle_receipts(service)
events = await self._handle_receipts(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(
service, events
Expand All @@ -272,7 +274,7 @@ async def _notify_interested_services_ephemeral(
)

elif stream_key == "presence_key":
events = await self._handle_presence(service, users)
events = await self._handle_presence(service, users, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(
service, events
Expand Down Expand Up @@ -318,7 +320,9 @@ async def _handle_typing(
)
return typing

async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
async def _handle_receipts(
self, service: ApplicationService, new_token: Optional[int]
) -> List[JsonDict]:
"""
Return the latest read receipts that the given application service should receive.
Expand All @@ -337,14 +341,20 @@ async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
if new_token is not None and new_token <= from_key:
raise Exception("Rejecting token lower than stored: %s" % (new_token,))

receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key
)
return receipts

async def _handle_presence(
self, service: ApplicationService, users: Collection[Union[str, UserID]]
self,
service: ApplicationService,
users: Collection[Union[str, UserID]],
new_token: Optional[int],
) -> List[JsonDict]:
"""
Return the latest presence updates that the given application service should receive.
Expand All @@ -367,6 +377,9 @@ async def _handle_presence(
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
)
if new_token is not None and new_token <= from_key:
raise Exception("Rejecting token lower than stored: %s" % (new_token,))

for user in users:
if isinstance(user, str):
user = UserID.from_string(user)
Expand Down

0 comments on commit f8f3c4a

Please sign in to comment.