Fail page request if query mutable state returns missing events#9389
Fail page request if query mutable state returns missing events#9389
Conversation
…ransient wft being dropped, and to appendtransienttasks
yycptt
left a comment
There was a problem hiding this comment.
Do you plan to have a separate PR for improving the observability into recorded premature EOS workflow task failures?
| @@ -50,6 +50,7 @@ func appendTransientTasks( | |||
| // Check this FIRST before doing any work | |||
| clientName, _ := headers.GetClientNameAndVersion(ctx) | |||
| if clientName == headers.ClientNameCLI || clientName == headers.ClientNameUI { | |||
There was a problem hiding this comment.
Technically that's a breaking change since now we start to return non-durable events for normal get history calls from SDK.
I don't know how many people really cares but shall we have a feature flag for controlling if transient/speculative events are returned when they are not part of the continuation token? We can default it to true to fix the bug. But if needed we can turn it off if any customer complains.
I feel we need a better story here...
| // fetchGapEvents fetches events in [fromEventID, toEventID) from persistence and appends | ||
| // them to the current response (history or historyBlob). Used to close gaps that form | ||
| // when events are committed to DB between paginated GetWorkflowExecutionHistory calls. | ||
| fetchGapEvents := func(fromEventID, toEventID int64, branchToken []byte) error { |
There was a problem hiding this comment.
well, I guess technically this will cause the max page size specified in the request to be exceeded, but probably not a big deal... we probably should track this somewhere.
There was a problem hiding this comment.
I don't really follow the change you made here in a previous PR. Maybe we should find some time to chat, I think I am missing something here. :)
| // those events are committed to DB with IDs < continuationToken.NextEventId but were excluded | ||
| // because the DB fetch was capped at the original boundary. Fetch the gap now, then update | ||
| // the nextEventID boundary so appendTransientTasks validates against the correct ID. | ||
| _, _, _, freshNextEventID, freshIsRunning, freshVersionHistoryItem, freshVersionedTransition, freshTransientTasks, freshErr := |
There was a problem hiding this comment.
now that I think about it, I feel we can avoid doing this on last page.
What I am thinking is that on the first page, we already know the nextEventID and if there's transient/speculative events or not. I think the problem with the implementation on main is that the transient/speculative events info are in memory only so on later pages they can be gone. If we record them in the continuation token, then the problem can be solved as well and we don't need to load additional events from DB.
I don't want to block this PR because we need the fix, but I think this is something we can potentially improve.
| // 4: WorkflowTaskCompleted (ForceCreateNewWorkflowTask=true) | ||
| // 5: WorkflowTaskScheduled (force-created) | ||
| // 6: WorkflowTaskStarted |
There was a problem hiding this comment.
I think these 3 events would be in one batch
tests/premature_eos_test.go
Outdated
| // This drops the speculative WFT from memory, but the pending update survives in the | ||
| // update registry (no ShardFinalizerTimeout=0 override). On the next mutable state | ||
| // load the pending update will cause a normal WFT_SCHEDULED to be written as event 8. | ||
| // closeShard(s, tv.WorkflowID()) |
There was a problem hiding this comment.
Actually can remove it entirely, found this wasn't caused by shard closure
tests/premature_eos_test.go
Outdated
| // Simulate shard movement after the signal to clear mutable state again before page 2 | ||
| // is fetched. This ensures the gap-detection fix works even when the shard is reloaded | ||
| // between the signal and the subsequent GetWorkflowExecutionHistory call. | ||
| // closeShard(s, tv.WorkflowID()) |
…oralio#9389) ## What changed? Fix a race condition in GetWorkflowExecutionHistory that caused SDK workers to receive incomplete history and fail with "premature end of stream". On the last page of a paginated `GetWorkflowExecutionHistory` response, re-query mutable state to detect events that were committed to the DB between the first and last page fetches. If `freshNextEventId > continuationToken.NextEventId`, the gap is fetched from persistence and appended to the response before transient/speculative events are added. The continuation token is updated with the fresh boundary so `appendTransientTasks` validates against the correct `NextEventId`. If the re-query itself fails, the request returns an error so the client retries. Also adds a `nil` check in `ValidateTransientWorkflowTaskEvents`, preventing a possible nil-pointer dereference. ## Why? A speculative WFT is converted to normal (e.g., by an incoming signal), committing 1–2 new events. The continuation token from page 1 points to NextEventId=8; the DB range [6, 8) on page 2 returns only events 6–7, missing the newly-committed events 8 and 9. `appendTransientTasks` finds no transient tasks (speculative was committed), so the assembled history is missing 2 events. ## How did you test it? - [X] built - [X] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [X] added new functional test(s) ## Potential risks - The re-query on the last history page adds one extra GetMutableState RPC per paginated `GetWorkflowExecutionHistory` call. This is bounded to final-page responses only and the existing path already made this call inside `appendTransientTasks`, so the net overhead is one additional call specifically when a gap is detected. - Returning an error when the fresh mutable-state re-query fails changes the previous behavior of silently continuing. Clients will retry, which is correct, but retry storms are possible if persistence is consistently unavailable mitigated by the client's existing backoff.
What changed?
Fix a race condition in GetWorkflowExecutionHistory that caused SDK workers to receive incomplete history and fail with "premature end of stream". On the last page of a paginated
GetWorkflowExecutionHistoryresponse, re-query mutable state to detect events that were committed to the DB between the first and last page fetches. IffreshNextEventId > continuationToken.NextEventId, the gap is fetched from persistence and appended to the response before transient/speculative events are added. The continuation token is updated with the fresh boundary soappendTransientTasksvalidates against the correctNextEventId. If the re-query itself fails, the request returns an error so the client retries.Also adds a
nilcheck inValidateTransientWorkflowTaskEvents, preventing a possible nil-pointer dereference.Why?
A speculative WFT is converted to normal (e.g., by an incoming signal), committing 1–2 new events. The continuation token from page 1 points to NextEventId=8; the DB range [6, 8) on page 2 returns only events 6–7, missing the newly-committed events 8 and 9.
appendTransientTasksfinds no transient tasks (speculative was committed), so the assembled history is missing 2 events.How did you test it?
Potential risks
GetWorkflowExecutionHistorycall. This is bounded to final-page responses only and the existing path already made this call insideappendTransientTasks, so the net overhead is one additional call specifically when a gap is detected.