Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add a linearizer on (appservice, stream) when handling ephemeral events. #11207

Merged
1 change: 1 addition & 0 deletions changelog.d/11207.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing bug which could result in serialization errors and potentially duplicate transaction data when sending ephemeral events to application services. Contributed by @Fizzadar at Beeper.
69 changes: 49 additions & 20 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
)
from synapse.storage.databases.main.directory import RoomAliasMapping
from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.metrics import Measure

if TYPE_CHECKING:
Expand All @@ -58,6 +59,10 @@ def __init__(self, hs: "HomeServer"):
self.current_max = 0
self.is_processing = False

self._ephemeral_events_linearizer = Linearizer(
name="appservice_ephemeral_events"
)

def notify_interested_services(self, max_token: RoomStreamToken) -> None:
"""Notifies (pushes) all application services interested in this event.

Expand Down Expand Up @@ -248,26 +253,37 @@ async def _notify_interested_services_ephemeral(
events = await self._handle_typing(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
continue

elif stream_key == "receipt_key":
events = await self._handle_receipts(service)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
)

elif stream_key == "presence_key":
events = await self._handle_presence(service, users)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
# Since we read/update the stream position for this AS/stream
with (
await self._ephemeral_events_linearizer.queue(
(service.id, stream_key)
)
):
if stream_key == "receipt_key":
events = await self._handle_receipts(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(
service, events
)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
)

elif stream_key == "presence_key":
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
events = await self._handle_presence(service, users, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(
service, events
)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)

async def _handle_typing(
self, service: ApplicationService, new_token: int
Expand Down Expand Up @@ -304,7 +320,9 @@ async def _handle_typing(
)
return typing

async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
async def _handle_receipts(
self, service: ApplicationService, new_token: Optional[int]
) -> List[JsonDict]:
"""
Return the latest read receipts that the given application service should receive.

Expand All @@ -323,14 +341,21 @@ async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
if new_token is not None and new_token <= from_key:
logger.debug("Rejecting token lower than stored: %s" % (new_token,))
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
return []

receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key
)
return receipts

async def _handle_presence(
self, service: ApplicationService, users: Collection[Union[str, UserID]]
self,
service: ApplicationService,
users: Collection[Union[str, UserID]],
new_token: Optional[int],
) -> List[JsonDict]:
"""
Return the latest presence updates that the given application service should receive.
Expand All @@ -353,6 +378,10 @@ async def _handle_presence(
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
)
if new_token is not None and new_token <= from_key:
logger.debug("Rejecting token lower than stored: %s" % (new_token,))
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
return []

for user in users:
if isinstance(user, str):
user = UserID.from_string(user)
Expand Down
52 changes: 52 additions & 0 deletions tests/handlers/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def setUp(self):
hs.get_application_service_scheduler.return_value = self.mock_scheduler
hs.get_clock.return_value = MockClock()
self.handler = ApplicationServicesHandler(hs)
self.event_source = hs.get_event_sources()

def test_notify_interested_services(self):
interested_service = self._mkservice(is_interested=True)
Expand Down Expand Up @@ -252,6 +253,57 @@ async def get_3pe_protocol(service, unusedProtocol):
},
)

def test_notify_interested_services_ephemeral(self):
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
"""
Test sending ephemeral events to the appservice handler are scheduled
to be pushed out to interested appservices, and that the stream ID is
updated accordingly.
"""
interested_service = self._mkservice(is_interested=True)
services = [interested_service]

self.mock_store.get_app_services.return_value = services
self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
579
)

event = Mock(event_id="event_1")
self.event_source.sources.receipt.get_new_events_as.return_value = (
make_awaitable(([event], None))
)

self.handler.notify_interested_services_ephemeral("receipt_key", 580)
self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with(
interested_service, [event]
)
self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with(
interested_service,
"read_receipt",
580,
)

def test_notify_interested_services_ephemeral_out_of_order(self):
"""
Test sending out of order ephemeral events to the appservice handler
are ignored.
"""
interested_service = self._mkservice(is_interested=True)
services = [interested_service]

self.mock_store.get_app_services.return_value = services
self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
580
)

event = Mock(event_id="event_1")
self.event_source.sources.receipt.get_new_events_as.return_value = (
make_awaitable(([event], None))
)

self.handler.notify_interested_services_ephemeral("receipt_key", 579)
self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called()
self.mock_store.set_type_stream_id_for_appservice.assert_not_called()

def _mkservice(self, is_interested, protocols=None):
service = Mock()
service.is_interested.return_value = make_awaitable(is_interested)
Expand Down