
fix(logtail): stream-publish txn tails to remove batch-wide wait (#24325)#24326

Open
ck89119 wants to merge 8 commits into matrixorigin:main from
ck89119:fix-24325-logtail-batch-wait

Conversation

Contributor

@ck89119 ck89119 commented May 9, 2026

What type of PR is this?

  • BUG
  • API-change
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #24325

What this PR does / why we need it:

Background

PR #22475 refactored Manager.onTxnLogTails in
pkg/vm/engine/tae/logtail/mgr.go so that a batch of txns is
collected in parallel and published only after a batch-wide
collectWg.Wait():

for i, item := range items {
    mgr.collectWg.Add(1)
    mgr.collectPool.Submit(func() {
        defer mgr.collectWg.Done()
        txn.GetStore().WaitEvent(txnif.WalPreparing)
        ...
        state := txn.GetTxnState(true)
        mgr.orderedList[i] = txnTail
    })
}
mgr.collectWg.Wait()                 // ← blocks on slowest txn
for i := range items {
    mgr.generateLogtailWithTxn(...)  // serial push AFTER all ready
}

Batch size is 100. Under high-concurrency short-txn workloads (e.g.
sysbench oltp_delete t=32), the batch regularly fills up and a single
slow txn defers logtail publish for all others. CN observes this
through txnOperator.unlock → timestampWaiter.GetTimestamp waiting on
NotifyLatestCommitTS, which in RC isolation gates commit return,
inflating per-txn latency.

This causes issue #24325: sysbench oltp_delete t=32 standalone TPS
drops ~28% at the exact commit that merged #22475, and ~17% on main
HEAD vs 3.0-dev (which does not carry #22475).

Fix

Replace the batch-wide wait with per-slot buffered channels + a serial
publisher that reads slot 0, 1, 2, ... in order:

  • each submit goroutine signals its own chan *txnWithLogtails on completion
  • the publisher loops through slots in index order, <-ch per slot
  • a slow txn only blocks the publisher up to its slot; it does not
    delay the collection of later txns nor the publishing of earlier
    already-ready txns

All other #22475 changes (single logtailQueue, collectPool,
WaitEvent(WalPreparing) event) are preserved.

Monotonicity of previousSaveTS inside generateLogtailWithTxn is
preserved since publish still happens in index (PrepareTS) order.

Benchmark

sysbench oltp_delete.lua on standalone MO, Apple Silicon arm64,
GOMAXPROCS=10, 10 tables × 1M rows, threads=32, time=60s,
db-ps-mode=disable, skip_trx=on, 3 independent runs (fresh prepare +
cleanup + mo-service restart per run), mean of 30-60s steady window:

| Version                        | Steady TPS | timestampWaiter per-txn |
|--------------------------------|-----------:|------------------------:|
| main before this PR            |       7970 |                  585 μs |
| main + this fix                |       9402 |                  334 μs |
| 3.0-dev (no #22475)            |       9326 |                  127 μs |
| 8b3700a (commit before #22475) |      10487 |                  123 μs |

TPS recovers +17.9% over current main and edges past 3.0-dev. The
remaining timestampWaiter gap vs 3.0-dev (334 μs vs 127 μs) is
secondary and does not materially affect TPS at t=32; it can be
pursued separately if needed.

Ablation (confirming the hotspot is exactly this function)

Each candidate fix applied in isolation on main HEAD, 3-run mean:

| Candidate fix                                | Steady TPS | tw per-txn |
|----------------------------------------------|-----------:|-----------:|
| main baseline                                |       7970 |     585 μs |
| parallelize LogEntryWriter.Finish() marshal  |       8283 |     592 μs |
| synchronous marshal in cmdmgr.ApplyTxnRecord |       8280 |     599 μs |
| this PR (stream-publish in onTxnLogTails)    |       9402 |     334 μs |

Only touching onTxnLogTails produces the recovery; marshal-path
rewrites do not, which rules out marshal deferral itself as the
regression source.

…rixorigin#24325)

In `Manager.onTxnLogTails`, `mgr.collectWg.Wait()` blocks until every
txn in the batch has completed `WaitEvent(WalPreparing) + CollectLogtail
+ GetTxnState`, after which logtails are pushed serially. Under t=32
oltp_delete the batch (size 100) frequently fills up, and a single slow
txn defers logtail publish for all 99 others, inflating the commit
latency that CN RC isolation observes through `timestampWaiter`.

Replace the batch-wide wait with per-slot buffered channels and a
serial publisher that reads slot 0, 1, 2, ... in order. Collection
still runs in parallel via `collectPool`; a slow txn only blocks the
publisher up to its slot, not the collection of later txns nor the
publishing of already-ready earlier txns.

Ordering invariant (monotonic `previousSaveTS` in
`generateLogtailWithTxn`) is preserved, because items are still
published in index order.

sysbench oltp_delete, 10 tables x 1M rows, threads=32, time=60s,
skip_trx=on, standalone, 3-run mean of 30-60s steady window:

| Version                         | Steady TPS | tw per-txn |
|---------------------------------|-----------:|-----------:|
| main before this PR             |       7970 |     585 us |
| main + this fix                 |       9402 |     334 us |
| 3.0-dev (no matrixorigin#22475) |       9326 |     127 us |
| 8b3700a (before #22475)         |      10487 |     123 us |
@ck89119 ck89119 requested a review from XuPeng-SH as a code owner May 9, 2026 18:02

…rigin#24325)

Extract the parallel-collect + serial-publish orchestration in
`onTxnLogTails` into a pure helper `orderedCollectAndPublish`, so the
concurrency contract can be unit tested without standing up a full TAE
runtime.

Tests cover:
- empty / all-skip / interleaved skip
- happy-path strict ascending publish order
- reverse ready order: slot N-1 completes first, earlier slots sleep
  progressively longer; publisher must still emit 0, 1, ..., N-1
- nil collect result (rollback) skipped without breaking order
- synchronous submit path (degenerate scheduler)

`go test -race -count=2` passes. `orderedCollectAndPublish` reaches
100% line coverage. `onTxnLogTails` behavior is unchanged.
Review feedback on matrixorigin#24326: the previous revision treated
rollback and committed txns identically, so closeCB would flow to the
subscriber layer. But CollectLogtail / TxnLogtailRespBuilder does not
filter by final txn state: it walks the store and emits pre-cleanup
mutations stamped with PrepareTS. Publishing those tails leaks
rolled-back changes to subscribed sessions.

Keep the rollback path out of publish:

- On TxnStateRollbacked, call closeCB() inline to release the batches
  that CollectLogtail already built and return nil from the collect
  callback. The helper's nil-skip path then bypasses
  generateLogtailWithTxn entirely, so subscribers never see
  rolled-back data.
- Committed path is unchanged: publisher runs
  generateLogtailWithTxn, which forwards closeCB through
  callback.call so the subscriber layer consumes it after handling.

Add TestOrderedCollectAndPublish_RollbackReleasesCloseCBInCollect to
assert the contract onTxnLogTails now relies on: in a batch with a
rollback slot in the middle, closeCB fires once per slot (rollback:
inline; committed: via publish), publish receives only the committed
slots, and the index order is preserved across the skip.
…panic

Review and SCA feedback on matrixorigin#24326:

- molint rejects recover() outside its whitelist, so the panic guard
  in orderedCollectAndPublish failed the SCA test.
- The guard was also too narrow: it kept the publisher from blocking
  on the slot channel but did not account for per-txn state. If
  collect panicked before DoneEvent(TailCollecting) ran, the matching
  AddEvent(TailCollecting) in OnEndPrepareWAL never balanced and the
  owning txn's WaitWalAndTail blocked forever, hanging apply.
- If the panic happened after CollectLogtail() allocated batches but
  before the rollback branch released closeCB, those batches leaked.

Replace recover with pure defer-based cleanup:

- helper: `defer func() { ch <- v }()` — Go runs deferred funcs while
  a panic unwinds, so the publisher always receives a zero-value v
  (nil) and its nil-skip branch drops the slot. The panic keeps
  propagating out of the goroutine, where ants's top-level recover
  logs the stack. No new recover is introduced.
- collect callback:
  - `defer txn.GetStore().DoneEvent(txnif.TailCollecting)` so the
    event balances on every path (success / rollback / panic) and
    WaitWalAndTail can complete.
  - A `runCloseCB` flag plus a deferred closer release closeCB unless
    we hand it over to the publish path. Success flips the flag off;
    rollback and panic both leave it true, so batches are always
    released.

Tests: existing 10 Ordered* cases still pass under `-race -count=2`;
orderedCollectAndPublish retains 100% line coverage. `go vet
-vettool=molint` on this package is clean.
ck89119 and others added 2 commits May 10, 2026 15:24
Round 5 review feedback on matrixorigin#24326:

- High: a panic inside collect on a committed txn would be silently
  swallowed by ants's internal recover. The enclosing defers would
  still balance DoneEvent(TailCollecting) and release closeCB, so
  apply proceeded, but no logtail was ever published for that
  committed mutation. Subscribers miss the commit and observe stale
  state.
- Medium: the existing tests only exercise the generic
  orderedCollectAndPublish helper. The real per-txn cleanup contract
  (DoneEvent on every path, closeCB on every non-publish path,
  rollback-not-published) had no direct regression coverage.

Fixes:

1. Install `ants.WithPanicHandler(func(v any){ panic(v) })` on the
   collect pool, matching logservicedriver/driver.go:101. The
   pool recovers panics for worker reuse, the handler re-panics, and
   nothing else recovers, so the Go runtime terminates the process.
   This replaces the silent drop with fail-fast: a committed-but-
   unpublished tail cannot leak to subscribers. The slot's defers
   still fire during unwind so local state (DoneEvent, closeCB)
   cleans up before the crash logs.

2. Extract the per-slot body into `collectOneTxn(txn, collect)`. The
   collect dependency is now a small function signature
   (`txnLogtailCollector`) so tests can inject a stub without a full
   TAE runtime. onTxnLogTails becomes a 3-line adapter.

3. Add `mgr_test.go` regression tests that target the real contract:
   - Committed: returns non-nil, does NOT run closeCB locally,
     DoneEvent balances AddEvent.
   - Rollback: returns nil, runs closeCB exactly once, DoneEvent
     balances AddEvent.
   - Panic (unknown state): require.Panics asserts the panic
     propagates (so PanicHandler fires), and both defers still ran:
     closeCB count == 1, DoneEvent balanced AddEvent.

`go test -race -count=2` passes all 13 tests.
`go vet -vettool=molint ./pkg/vm/engine/tae/logtail/` is clean.
`orderedCollectAndPublish` and `collectOneTxn` each retain 100% line
coverage.
The previous revision (d808eda) deferred DoneEvent(TailCollecting)
so it fired after GetTxnState(true). But apply's WaitWalAndTail
blocks on TailCollecting before DoneApply flips the txn state, so
waiting on the state first deadlocks:

  apply goroutine:       WaitWalAndTail → waits TailCollecting
  collectOneTxn:         GetTxnState(true) → waits Committed

Each side holds the thing the other needs. In BVT this stalled CN
bootstrap past its context deadline and the process panicked; every
downstream BVT job then failed to reach :6001.

Call DoneEvent(TailCollecting) inline right after collect() returns,
restoring the c2b193d ordering. A doneTail flag plus a deferred
fallback still guarantees DoneEvent fires if collect() panics before
the inline call — so the AddEvent in OnEndPrepareWAL always balances
and apply never hangs, even on a bad build.

Also drop the now-unused Manager.orderedList / collectWg fields and
fakeAsyncTxn.prepTS / _padding fields flagged by static-check, and
fix the gofmt misalignment on fakeAsyncTxn.IsReplay.

Add two regression tests covering the invariants:
- TestCollectOneTxn_DoneEventFiresBeforeGetTxnState observes that
  tailCollecting is already balanced by the time GetTxnState runs.
- TestCollectOneTxn_PanicInCollect_StillFiresDoneEvent asserts the
  deferred fallback fires on panic before DoneEvent's inline call.

Verified locally: mo-service now bootstraps and answers
`show databases;`; race-enabled tests and make static-check are
clean.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

Labels

kind/bug (Something isn't working), kind/enhancement, size/L (a PR that changes [500,999] lines)
