This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Sync current token may be ahead of event stream cache position using workers #14158

Open
Fizzadar opened this issue Oct 12, 2022 · 21 comments
Labels
A-Sync defects related to /sync A-Workers Problems related to running Synapse in Worker Mode (or replication) O-Uncommon Most users are unlikely to come across this or unexpected workflow S-Minor Blocks non-critical functionality, workarounds exist. T-Defect Bugs, crashes, hangs, security vulnerabilities, or other reported issues.

Comments

@Fizzadar
Contributor

Based on my initial work investigating sync cache races in: #14154

Sync makes use of the event stream cache to determine whether a room has changed between the since & current tokens. This is then used to limit the number of rooms events are queried for in get_room_events_stream_for_rooms. After discovering the cache invalidation races above I added a quick log line for this: beeper/synapse@62497db (after beeper/synapse@5297155).

And it logs! Only a very small handful of occurrences over the last ~5 days and the position difference has so far been 1 every time. I suspect this may also occur against other stream caches but have not confirmed.

The worry here is that an event sent within the gap may be missed from an incremental sync. That is especially bad because the user will never see or know about the event unless they re-init sync (or the client backfills it?).
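
To make the concern concrete, here is a minimal sketch of how a lagging cache could hide a room from an incremental sync. `TinyStreamChangeCache` and `rooms_to_query` are hypothetical simplifications written for this illustration; they are not Synapse's actual stream change cache or sync code.

```python
from typing import Dict, List


class TinyStreamChangeCache:
    """Hypothetical, simplified model of a per-entity stream change cache."""

    def __init__(self, earliest_known_pos: int) -> None:
        # Positions before this are unknown to the cache.
        self._earliest_known_pos = earliest_known_pos
        # entity -> latest stream position at which it changed
        self._last_change: Dict[str, int] = {}

    def entity_has_changed(self, entity: str, stream_pos: int) -> None:
        prev = self._last_change.get(entity, 0)
        self._last_change[entity] = max(prev, stream_pos)

    def has_entity_changed(self, entity: str, since: int) -> bool:
        if since < self._earliest_known_pos:
            # Too old for the cache to answer, so assume it changed.
            return True
        return self._last_change.get(entity, 0) > since


def rooms_to_query(
    cache: TinyStreamChangeCache, rooms: List[str], since: int
) -> List[str]:
    """Only rooms the cache reports as changed are queried for events."""
    return [room for room in rooms if cache.has_entity_changed(room, since)]


room = "!a:example.org"
cache = TinyStreamChangeCache(earliest_known_pos=0)

# An event lands at position 11, so the current token is 11, but the cache
# has not been told about it yet: the room is skipped.
first_sync = rooms_to_query(cache, [room], since=10)

# The cache catches up, but the client's next sync starts from token 11,
# so the change at position 11 is never looked at again.
cache.entity_has_changed(room, 11)
next_sync = rooms_to_query(cache, [room], since=11)

print(first_sync, next_sync)
```

In this toy model both syncs skip the room, which is the "event missed until re-init" outcome described above.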

One solution to this is to implement a waiting mechanism on StreamCache so a worker can wait for the cache to catch up with the current token for a given sync before fetching data. Because this is super rare, and the position difference is tiny even when it happens, this would probably have negligible impact on sync performance and would provide a shield against cache invalidation races over replication.
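
A rough sketch of what such a waiting mechanism might look like follows. It uses `asyncio` purely for illustration (Synapse itself is built on Twisted), and `WaitableStreamCache`, `wait_for_position` and the timeout values are assumptions made for this example rather than anything from the linked PRs.

```python
import asyncio
from typing import Tuple


class WaitableStreamCache:
    """Hypothetical cache that lets callers wait until it reaches a position."""

    def __init__(self) -> None:
        self._max_pos = 0
        self._changed = asyncio.Condition()

    async def entity_has_changed(self, entity: str, stream_pos: int) -> None:
        async with self._changed:
            self._max_pos = max(self._max_pos, stream_pos)
            # Wake up anyone waiting for the cache to catch up.
            self._changed.notify_all()

    async def wait_for_position(self, stream_pos: int, timeout: float) -> bool:
        """Return True once the cache has reached stream_pos, False on timeout."""

        async def _wait() -> None:
            async with self._changed:
                await self._changed.wait_for(lambda: self._max_pos >= stream_pos)

        try:
            await asyncio.wait_for(_wait(), timeout)
            return True
        except asyncio.TimeoutError:
            return False


async def demo() -> Tuple[bool, bool]:
    cache = WaitableStreamCache()

    async def late_notify() -> None:
        # Simulates the change arriving slightly after the token advanced.
        await asyncio.sleep(0.01)
        await cache.entity_has_changed("!a:example.org", 11)

    task = asyncio.create_task(late_notify())
    caught_up = await cache.wait_for_position(11, timeout=1.0)
    await task
    # Nothing ever reaches position 12, so this wait times out.
    timed_out = await cache.wait_for_position(12, timeout=0.05)
    return caught_up, timed_out


print(asyncio.run(demo()))
```

The timeout matters in a design like this: if the cache is never notified for a position (for example because nothing relevant changed at that token), an unbounded wait would stall the sync.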

@DMRobertson DMRobertson added A-Sync defects related to /sync S-Minor Blocks non-critical functionality, workarounds exist. T-Defect Bugs, crashes, hangs, security vulnerabilities, or other reported issues. O-Uncommon Most users are unlikely to come across this or unexpected workflow A-Workers Problems related to running Synapse in Worker Mode (or replication) labels Oct 14, 2022
@Fizzadar
Contributor Author

Fizzadar commented Oct 21, 2022

Confirmed via the log change above, this is actually not that rare (😱). We have large bursts of these in sync workers:

Checking entity in DeviceInboxStreamChangeCache stream cache cache with
    current position 42696041 ahead of  the max stream cache position 42696040

There's even more in federation sender instances DeviceFederationOutboxStreamChangeCache, pretty much a constant flow of these:

Checking entity in DeviceFederationOutboxStreamChangeCache stream cache cache with
    current position 42696140 ahead of the max stream cache position 42696092

Finally, the background worker has a constant (1/s) flow of such errors on the _curr_state_delta_stream_cache cache, and in that case the cache often appears to be lagging significantly:

Checking entity in _curr_state_delta_stream_cache stream cache cache with
    current position 540123283 ahead of the max stream cache position 540123279

@Fizzadar
Contributor Author

Implemented wait-for fix option: #14269

@richvdh
Member

richvdh commented Oct 24, 2022

err, interesting. That doesn't sound good.

There's even more in federation sender instances DeviceFederationOutboxStreamChangeCache, pretty much a constant flow of these:

Checking entity in DeviceFederationOutboxStreamChangeCache stream cache cache with
    current position 42696140 ahead of the max stream cache position 42696092

This sounds very much like it could be the cause of #14251 and other device-list-tracking bugs we've been tearing our hair out over.

For those of us struggling to keep up at the back: are you able to explain the mechanics of this race? How does the "current position" come to be ahead of the "max stream cache position"?

I wasn't able to find where exactly you introduced these log lines - can you point me to a relevant diff?

@Fizzadar
Contributor Author

Diff for the logging code: #14239

Essentially it seems to boil down to:

  • Something gets written to the database using a new token (get_next) out of an ID generator
  • At that point, get_current_token will immediately point at that token
  • But at that point the stream cache may not have yet been notified about the change, for one of two reasons:
    • no call to entity_has_changed is made after incrementing the token (I only noticed this yesterday and made a provisional fix for event persisting in 9b5ef68); I believe this would affect both workers and monolithic installs
    • replication lag: the time taken for the change to propagate from the writer to the other workers, which then call entity_has_changed

Definitely seems likely this would trigger the device list issues as well.

@erikjohnston
Member

At that point, get_current_token will immediately point at that token

get_current_token should only return the new token when we've finished persisting that token (and it gets more complicated with multi-writer). So, as long as we poke the stream change cache at the same time as calling get_next (or advance in replication case) this should be fine.

I think part of the confusion here is that there is not a one-to-one mapping between stream caches and stream ID generators. For example, the device inbox ID generator feeds into both DeviceInboxStreamChangeCache and DeviceFederationOutboxStreamChangeCache:

async with self._device_inbox_id_gen.get_next() as stream_id:
    now_ms = self._clock.time_msec()
    await self.db_pool.runInteraction(
        "add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id
    )
    for user_id in local_messages_by_user_then_device.keys():
        self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id)
    for destination in remote_messages_by_destination.keys():
        self._device_federation_outbox_stream_cache.entity_has_changed(
            destination, stream_id
        )

So it's not unexpected if one xor the other stream change cache lags behind the current token.
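
As a toy illustration of that point (not Synapse code): with one shared ID generator and two caches, a message that only has local recipients advances the current token and the inbox cache, while the outbox cache legitimately stays behind. All class and function names below are hypothetical.

```python
from typing import Iterable


class ToyIdGenerator:
    """Hypothetical single-writer ID generator."""

    def __init__(self) -> None:
        self._current = 0

    def get_next(self) -> int:
        self._current += 1
        return self._current

    def get_current_token(self) -> int:
        return self._current


class ToyPositionCache:
    """Hypothetical cache that only remembers the highest position it saw."""

    def __init__(self) -> None:
        self.max_pos = 0

    def entity_has_changed(self, entity: str, stream_pos: int) -> None:
        self.max_pos = max(self.max_pos, stream_pos)


id_gen = ToyIdGenerator()
inbox_cache = ToyPositionCache()
outbox_cache = ToyPositionCache()


def add_messages(local_users: Iterable[str], destinations: Iterable[str]) -> int:
    # One stream ID is shared by both caches, mirroring the snippet above.
    stream_id = id_gen.get_next()
    for user_id in local_users:
        inbox_cache.entity_has_changed(user_id, stream_id)
    for destination in destinations:
        outbox_cache.entity_has_changed(destination, stream_id)
    return stream_id


# A message with only local recipients: no race, yet the outbox cache lags.
add_messages(local_users=["@alice:example.org"], destinations=[])
print(id_gen.get_current_token(), inbox_cache.max_pos, outbox_cache.max_pos)
```

So a "current position ahead of max stream cache position" log line for one of the two caches does not, by itself, indicate a missed invalidation.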


Now, that's not to say there isn't a problem here. I'd be tempted, for some of these stream caches, to pass in the relevant stream ID generator, and then log if we are inserting a change with a lower stream ID than returned by get_current_token() (and maybe also get_current_token(instance_name) for multi-writer streams). This may give some false positives when processing replication rows, where we advance the ID gen and then update the stream change cache, e.g.:

self._device_inbox_id_gen.advance(instance_name, token)
for row in rows:
    if row.entity.startswith("@"):
        self._device_inbox_stream_cache.entity_has_changed(
            row.entity, token
        )

@Fizzadar
Contributor Author

I think I've been misunderstanding get_current_token. I thought it fetched the token from the database, but it doesn't: it returns the "known latest position", which is effectively the same as the waiting thing. That makes this more confusing: how are we getting into a situation where the event ID generator has a token ahead of the events stream cache?


Re: device inbox, I also discovered that the two stream caches share one ID generator. My WIP PR #14286 addresses this by making sure both stream caches are always updated (to enable the waiting logic).

@Fizzadar
Contributor Author

Fizzadar commented Oct 25, 2022

Digging into the events stream a bit, the only place the stream ID is advanced is:

def process_replication_rows(
    self,
    stream_name: str,
    instance_name: str,
    token: int,
    rows: Iterable[Any],
) -> None:
    if stream_name == EventsStream.NAME:
        self._stream_id_gen.advance(instance_name, token)
    elif stream_name == BackfillStream.NAME:
        self._backfill_id_gen.advance(instance_name, -token)

And the change should be propagated at the same time here:

def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
    data = row.data
    if row.type == EventsStreamEventRow.TypeId:
        assert isinstance(data, EventsStreamEventRow)
        self._invalidate_caches_for_event(
            token,
            data.event_id,
            data.room_id,
            data.type,
            data.state_key,
            data.redacts,
            data.relates_to,
            backfilled=False,
        )
    elif row.type == EventsStreamCurrentStateRow.TypeId:
        assert isinstance(data, EventsStreamCurrentStateRow)
        self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token)
        if data.type == EventTypes.Member:
            self.get_rooms_for_user_with_stream_ordering.invalidate(
                (data.state_key,)
            )
            self.get_rooms_for_user.invalidate((data.state_key,))
    else:
        raise Exception("Unknown events stream row type %s" % (row.type,))

Is it possible that we see an EventsStreamCurrentStateRow.TypeId row in a separate batch to EventsStreamEventRow.TypeId? Both of those would advance the token but only the first would flag the change in the stream cache at that token.

@Fizzadar
Contributor Author

Should the entity_has_changed call just be moved into the EventsWorkerStore.process_replication_rows? It seems to me the token and change cache are deeply interlinked and keeping the code together would make sense (like DeviceInboxWorkerStore does).

@erikjohnston
Member

Is it possible that we see an EventsStreamCurrentStateRow.TypeId row in a separate batch to EventsStreamEventRow.TypeId? Both of those would advance the token but only the first would flag the change in the stream cache at that token.

The code to decide what gets sent down the event stream is at:

async def _update_function(
    self,
    instance_name: str,
    from_token: Token,
    current_token: Token,
    target_row_count: int,
) -> StreamUpdateResult:
    # the events stream merges together three separate sources:
    #  * new events
    #  * current_state changes
    #  * events which were previously outliers, but have now been de-outliered.
    #
    # The merge operation is complicated by the fact that we only have a single
    # "stream token" which is supposed to indicate how far we have got through
    # all three streams. It's therefore no good to return rows 1-1000 from the
    # "new events" table if the state_deltas are limited to rows 1-100 by the
    # target_row_count.
    #
    # In other words: we must pick a new upper limit, and must return *all* rows
    # up to that point for each of the three sources.
    #
    # Start by trying to split the target_row_count up. We expect to have a
    # negligible number of ex-outliers, and a rough approximation based on recent
    # traffic on sw1v.org shows that there are approximately the same number of
    # event rows between a given pair of stream ids as there are state
    # updates, so let's split our target_row_count among those two types. The target
    # is only an approximation - it doesn't matter if we end up going a bit over it.
    target_row_count //= 2
    # now we fetch up to that many rows from the events table
    event_rows = await self._store.get_all_new_forward_event_rows(
        instance_name, from_token, current_token, target_row_count
    )
    # we rely on get_all_new_forward_event_rows strictly honouring the limit, so
    # that we know it is safe to just take upper_limit = event_rows[-1][0].
    assert (
        len(event_rows) <= target_row_count
    ), "get_all_new_forward_event_rows did not honour row limit"
    # if we hit the limit on event_updates, there's no point in going beyond the
    # last stream_id in the batch for the other sources.
    if len(event_rows) == target_row_count:
        limited = True
        upper_limit: int = event_rows[-1][0]
    else:
        limited = False
        upper_limit = current_token
    # next up is the state delta table.
    (
        state_rows,
        upper_limit,
        state_rows_limited,
    ) = await self._store.get_all_updated_current_state_deltas(
        instance_name, from_token, upper_limit, target_row_count
    )
    limited = limited or state_rows_limited
    # finally, fetch the ex-outliers rows. We assume there are few enough of these
    # not to bother with the limit.
    ex_outliers_rows = await self._store.get_ex_outlier_stream_rows(
        instance_name, from_token, upper_limit
    )
    # we now need to turn the raw database rows returned into tuples suitable
    # for the replication protocol (basically, we add an identifier to
    # distinguish the row type). At the same time, we can limit the event_rows
    # to the max stream_id from state_rows.
    event_updates: Iterable[Tuple[int, Tuple]] = (
        (stream_id, (EventsStreamEventRow.TypeId, rest))
        for (stream_id, *rest) in event_rows
        if stream_id <= upper_limit
    )
    state_updates: Iterable[Tuple[int, Tuple]] = (
        (stream_id, (EventsStreamCurrentStateRow.TypeId, rest))
        for (stream_id, *rest) in state_rows
    )
    ex_outliers_updates: Iterable[Tuple[int, Tuple]] = (
        (stream_id, (EventsStreamEventRow.TypeId, rest))
        for (stream_id, *rest) in ex_outliers_rows
    )
    # we need to return a sorted list, so merge them together.
    updates = list(heapq.merge(event_updates, state_updates, ex_outliers_updates))
    return updates, upper_limit, limited

Anything that function returns with the same token will (well, should) get batched up.

Should the entity_has_changed call just be moved into the EventsWorkerStore.process_replication_rows? It seems to me the token and change cache are deeply interlinked and keeping the code together would make sense (like DeviceInboxWorkerStore does).

Yes, that sounds like a good idea. I wonder if we should have a class that stores both the ID gen and the stream change cache, to make it more obvious that they are interlinked
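
One possible shape for such a class is sketched below. `StreamPositionTracker` and its methods are hypothetical names invented for this illustration, and the sketch ignores multi-writer streams entirely. The idea is simply that a single `advance` call records the changes first and only then exposes the new token.

```python
from typing import Dict, Iterable


class StreamPositionTracker:
    """Hypothetical: owns both the position and the change cache for one stream."""

    def __init__(self) -> None:
        self._current_token = 0
        # Position up to which the change cache is known to be complete.
        self._cache_position = 0
        # entity -> latest stream position at which it changed
        self._last_change: Dict[str, int] = {}

    def advance(self, token: int, changed_entities: Iterable[str]) -> None:
        # 1. Record the changes first...
        for entity in changed_entities:
            prev = self._last_change.get(entity, 0)
            self._last_change[entity] = max(prev, token)
        # 2. ...mark the cache as complete up to this token, even when the
        #    batch carried no rows...
        self._cache_position = max(self._cache_position, token)
        # 3. ...and only then expose the new token to readers.
        self._current_token = max(self._current_token, token)

    def get_current_token(self) -> int:
        return self._current_token

    def get_cache_position(self) -> int:
        return self._cache_position

    def has_entity_changed(self, entity: str, since: int) -> bool:
        return self._last_change.get(entity, 0) > since


tracker = StreamPositionTracker()
tracker.advance(5, ["!a:example.org"])
tracker.advance(6, [])  # an empty batch still moves both positions together

print(tracker.get_current_token(), tracker.get_cache_position())
```

Because callers can no longer advance the token without going through the cache, the two positions cannot drift apart in this model.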

@Fizzadar
Contributor Author

Fizzadar commented Nov 1, 2022

I’m keen to work on this plan and should have some time over the next few weeks, feel free to assign to me :) There’s some precursor work I have identified as well involving cleanup of the slaved stores so will start there.

@Fizzadar
Contributor Author

Fizzadar commented Nov 26, 2022

Small update - the new ReceiptsRoomChangeCache also triggers the same issue. I threw up a very quick debug change to our sync workers, and it looks like they are getting calls to process_replication_data with an empty row list (beeper/synapse@2610e25).

This is expected for POSITION commands... but as far as I can tell these are not sent for receipts, so how is there ever an empty list here?

UPDATE: this probably makes sense. It seems to happen on worker startup when fetching rows, and because receipts replace each other, gaps would be expected. The above idea of combining stream change cache + ID generators would address this problem.

@Fizzadar
Contributor Author

Fizzadar commented Dec 9, 2022

I think I may have figured out why this happens with the EventsRoomStreamChangeCache!

  • Sync workers call get_unread_event_push_actions_by_room_for_user for rooms
  • This executes a check on the cache inside a transaction, and thus inside a thread
  • Python unlocks the GIL every Xms for threads, so it's possible this thread is switched to mid-way through the process_replication_rows calls
  • Because of class resolution order, EventsWorkerStore.process_replication_rows is called before CacheInvalidationWorkerStore.process_replication_rows, which opens up this window

This explains why we're not seeing this particular log causing major issues and why it's so rare; I have pushed some logging to our fork to confirm the above is the case. Fixing this is relatively easy by making sure the stream cache is updated before the ID generator: #14648.
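
The ordering part of this can be shown with a small, single-threaded sketch. The classes below are hypothetical stand-ins (they only mimic the `super()` chaining, not the real stores), and the thread switch itself is not simulated: the recorded call order just shows where the window sits.

```python
from typing import List

calls: List[str] = []


class BaseStore:
    def process_replication_rows(self, token: int) -> None:
        # End of the super() chain.
        pass


class CacheInvalidationStore(BaseStore):
    def process_replication_rows(self, token: int) -> None:
        calls.append(f"stream cache updated to {token}")
        super().process_replication_rows(token)


class EventsStore(BaseStore):
    def process_replication_rows(self, token: int) -> None:
        calls.append(f"ID generator advanced to {token}")
        # Any thread that reads the current token between this line and the
        # cache update in the next class would see the cache lagging.
        super().process_replication_rows(token)


class DataStore(EventsStore, CacheInvalidationStore):
    """EventsStore comes first in the method resolution order."""


DataStore().process_replication_rows(11)
print([cls.__name__ for cls in DataStore.__mro__])
print(calls)
```

In this model the token advance is recorded before the cache update purely because of the order of the base classes.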

@erikjohnston
Member

Fixing this is relatively easy by making sure the stream cache is updated before the ID generator

Should we add an assertion in the stream change cache where we assert that when we mark something as changed the stream ID is strictly greater than the current token?

@erikjohnston
Member

Also, 😱

@Fizzadar
Contributor Author

Fizzadar commented Dec 9, 2022

Should we add an assertion in the stream change cache where we assert that when we mark something as changed the stream ID is strictly greater than the current token?

Yes! That’d be ideal, will probably require passing the relevant id generator into the stream change cache but that seems sensible anyway…
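
A minimal sketch of what that assertion could look like, assuming the ID generator is passed into the cache. `ToyIdGenerator` and `CheckedStreamCache` are hypothetical names for this example; the actual change lives in the PR discussed in this thread and may differ.

```python
from typing import Dict


class ToyIdGenerator:
    """Hypothetical single-writer ID generator."""

    def __init__(self) -> None:
        self._current = 0

    def advance(self, token: int) -> None:
        self._current = max(self._current, token)

    def get_current_token(self) -> int:
        return self._current


class CheckedStreamCache:
    """Hypothetical cache that checks ordering against its ID generator."""

    def __init__(self, id_gen: ToyIdGenerator) -> None:
        self._id_gen = id_gen
        self._last_change: Dict[str, int] = {}

    def entity_has_changed(self, entity: str, stream_pos: int) -> None:
        current = self._id_gen.get_current_token()
        # A change must be recorded before its token becomes "current".
        assert stream_pos > current, (
            f"change at {stream_pos} recorded after token reached {current}"
        )
        prev = self._last_change.get(entity, 0)
        self._last_change[entity] = max(prev, stream_pos)


id_gen = ToyIdGenerator()
cache = CheckedStreamCache(id_gen)

# Correct order: update the cache, then advance the ID generator.
cache.entity_has_changed("!a:example.org", 1)
id_gen.advance(1)

# Wrong order: the token is advanced first, so the assertion fires.
id_gen.advance(2)
try:
    cache.entity_has_changed("!a:example.org", 2)
    wrong_order_detected = False
except AssertionError:
    wrong_order_detected = True

print(wrong_order_detected)
```

Note that a strict check like this would also fire for code paths that deliberately advance the ID generator before updating the cache, so those would need reordering first.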

@Fizzadar
Contributor Author

Fizzadar commented Dec 9, 2022

I have added that to my PR, enabled only for the events stream, in 5ee0425. I think enabling for other streams is best left to a new PR?

@erikjohnston
Member

@Fizzadar which PR?

@clokep
Contributor

clokep commented Dec 9, 2022

@Fizzadar which PR?

Seems to be #14648

Fizzadar added a commit to beeper/synapse-legacy-fork that referenced this issue Dec 22, 2022
This creates a new store method, `process_replication_position` that
is called after `process_replication_rows`. By moving stream ID advances
here this guarantees any relevant cache invalidations will have been
applied before the stream is advanced.

This avoids race conditions where Python switches between threads mid
way through processing the `process_replication_rows` method where stream
IDs may be advanced before caches are invalidated due to class resolution
ordering.

See this comment/issue for further discussion:
	matrix-org#14158 (comment)
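
The two-phase approach described in this commit message can be sketched roughly as follows. The class names and the simplified one-argument signatures are assumptions for illustration only; the point is that every `process_replication_rows` implementation runs before any `process_replication_position` implementation, whatever the class resolution order is.

```python
from typing import List

order: List[str] = []


class BaseStore:
    def process_replication_rows(self, token: int) -> None:
        pass

    def process_replication_position(self, token: int) -> None:
        pass


class CacheStore(BaseStore):
    def process_replication_rows(self, token: int) -> None:
        order.append(f"stream cache updated to {token}")
        super().process_replication_rows(token)


class EventsStore(BaseStore):
    def process_replication_position(self, token: int) -> None:
        # Stream ID advances live here instead of in process_replication_rows.
        order.append(f"ID generator advanced to {token}")
        super().process_replication_position(token)


class DataStore(EventsStore, CacheStore):
    """EventsStore still comes first in the resolution order."""


def on_replication_batch(store: BaseStore, token: int) -> None:
    # Phase 1: apply rows (cache invalidations) across all store classes.
    store.process_replication_rows(token)
    # Phase 2: only now advance stream positions.
    store.process_replication_position(token)


on_replication_batch(DataStore(), 11)
print(order)
```
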
erikjohnston pushed a commit that referenced this issue Jan 4, 2023
@richvdh
Member

richvdh commented Jan 9, 2023

As I understand it, this has been partially addressed by #14723? Is anyone able to provide an update on what remains to be done here?

@Fizzadar
Contributor Author

As I understand it, this has been partially addressed by #14723? Is anyone able to provide an update on what remains to be done here?

This should indeed resolve the issue once and for all - I'm waiting on rolling 1.75 to our deployment to confirm (as we have the logs indicating the issue) but I'm pretty sure that PR solves it. Will update here/close issue once that's live.

Fizzadar added a commit to beeper/synapse-legacy-fork that referenced this issue Jan 16, 2023
…4723)

# Conflicts:
#	synapse/storage/databases/main/devices.py
#	synapse/storage/databases/main/events_worker.py
Fizzadar added a commit to beeper/synapse-legacy-fork that referenced this issue Jan 17, 2023
…4723) (#52)

* Update all stream IDs after processing replication rows (matrix-org#14723)


* Fix bad cherry-picking

* Remove leftover stream advance
@Fizzadar
Contributor Author

So this is still happening on our deployment but it did occur to me this morning - could this be a side effect of multiple event persisters, where one is running behind the others?

If I'm following the ReplicationCommandHandler._process_position code properly it looks like replication positions may be processed out of order. This would then mean changes are read/written to the change caches out of order from time to time?

Note: this probably means the assertion that changes are incremental only on cache update would blow up in these situations...

Does it make sense to force an order here and prevent out-of-order event persisters? It would mean any slow persister holds back sync/replication everywhere, but they really shouldn't be that far behind each other...
