Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED.

This pull-request has not yet been approved by any approvers. The full list of commands accepted by this bot can be found here. Needs approval from an approver in each of these files; approvers can indicate their approval by writing the appropriate command in a comment.
📝 Walkthrough

This PR replaces the log puller's dynstream-based event delivery system with a new Span Pipeline architecture featuring a sharded, multi-worker pipeline manager that coordinates data and resolved-timestamp delivery across subscriptions, with per-subscription state machines, quota-bounded in-flight data, and strict resolution barriers ensuring resolved-ts advances only after prior data is persisted.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Pipeline Manager
    participant Pipeline Worker
    participant Span State
    participant EventStore
    Client->>Pipeline Manager: EnqueueData(kvs)
    Pipeline Manager->>Pipeline Manager: Acquire Quota (weight)
    Pipeline Manager->>Pipeline Worker: Send Data Event
    Pipeline Worker->>Span State: Advance nextDataSeq
    Pipeline Worker->>Span State: Queue Persisted callback
    Span State-->>Client: Finish Callback
    Client->>Pipeline Manager: EnqueueResolved(ts)
    Pipeline Manager->>Pipeline Worker: Send Resolved Event
    Pipeline Worker->>Span State: Add Barrier (waitSeq, ts)
    Pipeline Worker->>Span State: Check flushResolvedIfReady
    alt All prior data acknowledged
        Span State->>EventStore: Advance resolved-ts
    else Waiting for persistence
        Span State->>Span State: Hold barrier pending
    end
    Client->>Pipeline Manager: Persisted Callback
    Pipeline Manager->>Pipeline Worker: Send Persisted Event
    Pipeline Worker->>Span State: Update ackedSeq, add to doneSet
    Pipeline Worker->>Pipeline Manager: Release Quota
    Pipeline Worker->>Span State: flushResolvedIfReady (retry)
    alt Now ready
        Span State->>EventStore: Advance resolved-ts
    end
```

```mermaid
sequenceDiagram
    participant Region Request Worker
    participant Region Event Processor
    participant Processor Worker
    participant Span Pipeline
    Region Request Worker->>Region Event Processor: dispatch(regionEvent)
    Region Event Processor->>Processor Worker: Shard by regionID to worker queue
    Processor Worker->>Processor Worker: handle(regionEvent)
    alt Event Type: Entries
        Processor Worker->>Processor Worker: appendKVEntriesFromRegionEntries
        Processor Worker->>Span Pipeline: EnqueueData(kvs)
    else Event Type: ResolvedTs
        Processor Worker->>Processor Worker: updateRegionResolvedTs
        Processor Worker->>Processor Worker: maybeAdvanceSpanResolvedTs
        Processor Worker->>Span Pipeline: EnqueueResolved(ts)
    else Event Type: RegionError
        Processor Worker->>Processor Worker: Handle error, reschedule if needed
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ❌ 3 failed checks (2 warnings, 1 inconclusive)
Summary of Changes

Hello @lidezhu, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly refactors the log puller's event delivery architecture by replacing the dynstream-based system with the new Span Pipeline.

Highlights
Activity
@lidezhu: The following test failed, say
Full PR test history. Your PR dashboard. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
[FORMAT CHECKER NOTIFICATION] 📖 For more info, you can check the "Contribute Code" section in the development guide.
Code Review
This pull request introduces the Span Pipeline, a new delivery layer for the log puller, replacing the legacy dynstream implementation. Its aim is to enable true pipelining of data batches and resolved-ts signals for subscription spans, removing serialization bottlenecks and improving performance. A critical security concern is that invalid external input from TiKV can trigger process-wide crashes via log.Fatal or log.Panic. Additionally, some panic statements log raw database events which may contain sensitive information (PII) without redaction. It is recommended to handle protocol violations more gracefully and ensure all logged data is properly redacted. Other feedback includes potential quota leaks during shutdown, a risk of stalling background threads in the EventStore due to blocking callbacks, and a minor logic issue in resolved-ts advancement that could affect stale lock detection.
```go
func (m *spanPipelineManager) enqueue(subID SubscriptionID, ev spanPipelineEvent) bool {
	if len(m.workers) == 0 {
		return false
	}
	idx := int(uint64(subID) % uint64(len(m.workers)))
	w := m.workers[idx]
	select {
	case <-m.ctx.Done():
		return false
	case w.ch <- ev:
		return true
	}
}
```
The enqueue method blocks if the worker's channel is full. Since enqueuePersisted is called from a persistence callback (likely from an EventStore background thread), blocking here can stall critical background processes in the EventStore (e.g., Pebble's write pipeline). Consider using a non-blocking enqueue with a fallback mechanism or a separate prioritized control channel for persistence signals to ensure callbacks return promptly.
```go
resolvedTs := state.getLastResolvedTs()
if entry.CommitTs <= resolvedTs {
	log.Fatal("The CommitTs must be greater than the resolvedTs",
		zap.Int64("tableID", span.span.TableID),
```
The use of log.Fatal here can lead to a Denial of Service (DoS). If TiKV sends a COMMITTED event with a CommitTs less than or equal to the last resolvedTs, the entire TiCDC process will terminate. Since this data comes from an external component, it should be handled more gracefully, for example by stopping the affected changefeed instead of crashing the whole process.
```go
		continue
	}
	log.Fatal("prewrite not match",
		zap.Int64("tableID", span.span.TableID),
		zap.String("EventType", "COMMIT"),
		zap.Uint64("CommitTs", entry.CommitTs),
		zap.Uint64("resolvedTs", resolvedTs),
		zap.String("key", util.RedactKey(entry.GetKey())))
```
```go
func (p *regionEventProcessor) dispatch(event regionEvent) {
	if len(event.states) == 0 {
		log.Panic("region event has empty states", zap.Any("event", event))
		return
	}
```

```go
	log.Panic("unknown region event type", zap.Any("event", event))
```

```go
		opType = common.OpTypePut
	default:
		log.Panic("meet unknown op type", zap.Any("entry", entry))
	}
```

```go
func (event regionEvent) mustFirstState() *regionFeedState {
	if len(event.states) == 0 || event.states[0] == nil {
```
```go
func (m *spanPipelineManager) enqueuePersisted(subID SubscriptionID, seq uint64, weight int64) {
	_ = m.enqueue(subID, spanPipelineEvent{
		subID:  subID,
		typ:    spanPipelineEventPersisted,
		seq:    seq,
		weight: weight,
	})
}
```
If m.enqueue returns false (e.g., because the manager's context is cancelled during shutdown), the persistence signal is lost and the associated quota weight is never released. This could lead to a quota leak if the manager or semaphore is reused, or simply inconsistent metrics during shutdown.
```go
func (m *spanPipelineManager) enqueuePersisted(subID SubscriptionID, seq uint64, weight int64) {
	if !m.enqueue(subID, spanPipelineEvent{
		subID:  subID,
		typ:    spanPipelineEventPersisted,
		seq:    seq,
		weight: weight,
	}) {
		m.releaseQuota(weight)
	}
}
```

```go
	if lastResolvedTs == span.startTs {
		span.resolvedTsUpdated.Store(time.Now().Unix())
		return ts
	}
	return 0
}
```
When ts == lastResolvedTs == span.startTs, the code repeatedly returns ts and updates span.resolvedTsUpdated. This behavior can hide a stuck region from the stale lock checker, as resolvedTsUpdated will keep being refreshed even though no progress is being made beyond the initial checkpoint. It should only return and update if it's the very first time the span is being initialized.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
logservice/logpuller/subscription_client.go (1)
402-410: ⚠️ Potential issue | 🟠 Major

`Unregister` is synchronous/blocking, but `onTableDrained` is called from `onRegionFail`, which must not block.

`pipeline.Unregister(rt.subID)` (line 406) creates a channel and blocks waiting on it to be closed after the worker processes the unregister event. The underlying `enqueue` call uses a select with no default case, so it will block if the worker channel is full. However, `onTableDrained` can be reached from `onRegionFail` (line 418), which has an explicit comment on line 412: "don't block the caller, otherwise there may be deadlock". This blocking call in `onTableDrained` violates the non-blocking guarantee on the `onRegionFail` path, creating a potential deadlock risk when worker channels experience backpressure.

Consider making the `Unregister` call in `onTableDrained` non-blocking, or refactor to avoid calling blocking operations on paths that must remain non-blocking.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/subscription_client.go` around lines 402 - 410, The call to pipeline.Unregister in subscriptionClient.onTableDrained can block and thus violate the non-blocking requirement of the onRegionFail path; make the Unregister non-blocking by invoking s.pipeline.Unregister(rt.subID) in a separate goroutine (e.g., go func(id uint64){ s.pipeline.Unregister(id) }(rt.subID)) so onTableDrained returns immediately, and keep the rest of the method (s.totalSpans.Lock/Unlock and delete) intact; alternatively, if you prefer a library change, update pipeline.Unregister to use a non-blocking enqueue (select with default) or accept a context/timeout and call it with a short timeout from onTableDrained to avoid blocking.
🧹 Nitpick comments (7)
logservice/logpuller/region_request_worker.go (1)
209-210: Stale comment: still references "ds" (dynstream).

The comment says "dispatches them to ds" but dynstream has been removed. Update to reflect the new dispatch path.
✏️ Suggested fix
```diff
-// receiveAndDispatchChangeEventsToProcessor receives events from the grpc stream and dispatches them to ds.
+// receiveAndDispatchChangeEvents receives events from the grpc stream and dispatches them to the region event processor.
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/region_request_worker.go` around lines 209 - 210, The function comment for receiveAndDispatchChangeEvents (on type regionRequestWorker) still says "dispatches them to ds" which is stale; update the comment to reflect the current dispatch target (e.g., "dispatches them to the processor" or the actual component now used) so it no longer references dynstream/ds — edit the comment line above receiveAndDispatchChangeEvents to name the new dispatch path/component consistent with how events are forwarded in the function body.

logservice/logpuller/region_event_handler_test.go (1)
136-142: Consider using `require.Fail` instead of `require.True(t, false, ...)`.

Multiple places (lines 139, 169, 201, 300, 305) use `require.True(t, false, msg)` to signal failure. `require.Fail(t, msg)` or `require.FailNow(t, msg)` is more idiomatic and communicates intent more clearly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/region_event_handler_test.go` around lines 136 - 142, Replace the anti-idiomatic assertions that use require.True(t, false, ...) with require.Fail or require.FailNow to clearly signal test failures; locate the occurrences in region_event_handler_test.go where the select on eventCh uses require.True(t, false, ...) (and the other similar asserts referencing require.True) and change them to require.Fail(t, "unexpected event received") or require.FailNow(t, "unexpected event received") depending on whether you want the test to continue, ensuring the failure message describes the unexpected event.

logservice/logpuller/region_event_processor.go (2)
77-84: Worker goroutines are fire-and-forget — no `WaitGroup` or `errgroup` for lifecycle tracking.

Workers are started with bare `go w.run()` and rely solely on `ctx.Done()` for shutdown. If the processor's owner needs to wait for clean shutdown (e.g., to ensure all in-flight events are drained), there's no mechanism for that. Consider adding a `sync.WaitGroup` to track workers for graceful shutdown, or document that the caller must not depend on drain semantics.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/region_event_processor.go` around lines 77 - 84, The worker goroutines in regionEventWorker are started with bare go w.run() and aren't tracked, so add lifecycle tracking: add a sync.WaitGroup field (e.g., wg) to the processor (or to regionEventWorker), call wg.Add(1) before starting each worker in the loop that creates regionEventWorker (referencing workerCount, p.workers), have run() call wg.Done() on exit (and continue to respect ctx.Done()), and expose a Shutdown/Wait method on the processor that calls wg.Wait() so callers can wait for all workers to drain; alternatively document explicitly that there is no drain guarantee if you choose not to add the WaitGroup.
159-172: `EnqueueData` blocks the worker goroutine under quota pressure, coupling unrelated subscriptions.

When `EnqueueData` blocks on the quota semaphore (line 171), all region events mapped to this worker (by `regionID % workerCount`) are stalled — including regions belonging to other subscriptions. This is a consequence of sharding by `regionID` in the processor but applying a global quota in the pipeline.

This is the designed behavior per the design doc, but operationally: a single slow-persisting subscription can delay resolved-ts progress for unrelated subscriptions whose regions happen to hash to the same worker.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/region_event_processor.go` around lines 159 - 172, The worker goroutine is being blocked when calling w.processor.pipeline.EnqueueData(span, kvs), stalling other regions mapped to the same worker; change the call so EnqueueData does not block the worker: capture local variables (span, kvs, state.region.ID/subscription id if needed) and offload the pipeline.EnqueueData invocation to a separate goroutine or a dedicated per-subscription goroutine/queue so the worker returns immediately after metricsEventCount.Add(...); ensure context propagation by passing w.processor.ctx (or a derived context) into the offloaded call and preserve error handling/metrics in that goroutine rather than blocking inside region_event_processor.go's handling of event.entries / mustFirstState / subscribedSpan.

logservice/logpuller/subscription_client.go (1)
240-241: All-zero arguments rely on hidden defaults; consider making configuration explicit.

Passing `0, 0, 0` to `newSpanPipelineManager` silently falls through to `runtime.GOMAXPROCS(0)` workers, a 4096-element queue, and 1 GiB quota. Similarly for `newRegionEventProcessor`. This makes tuning opaque and disconnects the subscription client's config struct from the pipeline's parameters. Consider wiring these through `SubscriptionClientConfig` or at least adding a brief comment explaining the defaults.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/subscription_client.go` around lines 240 - 241, The constructor calls to newSpanPipelineManager(subClient.ctx, 0, 0, 0) and newRegionEventProcessor(subClient.ctx, subClient, subClient.pipeline, 0, 0) rely on implicit zero-argument defaults (GOMAXPROCS(0), large queue, 1GiB quota) which hides configuration; update SubscriptionClient to surface these parameters (e.g., add fields to SubscriptionClientConfig for workerCount, queueSize, memoryQuota) and thread those config values into the calls to newSpanPipelineManager and newRegionEventProcessor instead of passing zeros, and/or add a concise comment next to the calls documenting the exact defaults used if exposing config is not possible.logservice/logpuller/span_pipeline.go (2)
277-291: Re-registration preserves stale pipeline state (sequences, barriers, doneSet).

If `handleRegister` is called for an already-registered `subID`, it only updates the `span` pointer (line 288) but retains the old `nextDataSeq`, `ackedSeq`, `doneSet`, and `pendingResolved`. If a subscription is ever re-registered (even accidentally), the stale sequence/barrier state will corrupt the ordering guarantees.

Since `SubscriptionID` is atomically incremented, this path shouldn't be hit in normal operation. But for defensiveness, consider either resetting the state on re-register or panicking to surface the unexpected condition.

♻️ Suggested defensive handling

```diff
 func (w *spanPipelineWorker) handleRegister(ev spanPipelineEvent) {
 	state := w.states[ev.subID]
 	if state == nil {
 		state = &spanPipelineState{
 			span:        ev.span,
 			nextDataSeq: 1,
 			ackedSeq:    0,
 		}
 		w.states[ev.subID] = state
 		metrics.LogPullerSpanPipelineActiveSubscriptions.Inc()
 	} else {
-		state.span = ev.span
+		log.Warn("span pipeline re-registration for existing subID, resetting state",
+			zap.Uint64("subID", uint64(ev.subID)))
+		state.span = ev.span
+		state.nextDataSeq = 1
+		state.ackedSeq = 0
+		state.doneSet = nil
+		state.pendingResolved = state.pendingResolved[:0]
+		state.pendingResolvedHead = 0
 	}
 	close(ev.doneCh)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/span_pipeline.go` around lines 277 - 291, handleRegister currently only updates the span for an existing subscription and preserves stale ordering state; update the handler in spanPipelineWorker::handleRegister to defensively reset the per-subscription spanPipelineState when ev.subID already exists (or alternatively panic to make the unexpected re-registration explicit). Specifically, when w.states[ev.subID] != nil, reinitialize that entry (spanPipelineState) so nextDataSeq, ackedSeq, doneSet, and pendingResolved are cleared and span is set to ev.span (or call panic with a clear message if you prefer failing fast) to avoid preserving stale sequencing/barrier state.
306-323: Quota and resolved-ts barrier hinge on the callback being invoked exactly once.

If `consumeKVEvents` returns `true` (line 319: `await`) but the `wakeCallback` closure is never called (consumer bug, panic, or lost reference), the acquired quota (`ev.weight`) is leaked permanently and the subscription's resolved-ts is stuck because `ackedSeq` will never advance past `seq`. Consider adding a safety net, such as a timeout or periodic audit of long-outstanding sequences, to detect and recover from dropped callbacks — especially important given this is a WIP change.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/span_pipeline.go` around lines 306 - 323, The callback from state.span.consumeKVEvents must be guarded so a lost/never-invoked wakeCallback doesn't leak quota or stall ack advancement; update spanPipelineWorker.handleData and the state tracking to record outstanding seq with a deadline/timestamp when await==true, and add a background auditor goroutine (e.g., on spanPipelineWorker) that periodically scans long-pending seqs and for timeouts invokes the same recovery path: releaseQuota(ev.weight) and call onPersisted(state, seq) (or otherwise simulate the wakeCallback) to advance ackedSeq; alternatively pass a cancellable context/timeout into consumeKVEvents if supported and treat a timeout as a failed callback, ensuring every path that sets await==true has a corresponding guaranteed eventual cleanup.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@logservice/logpuller/region_event_processor.go`:
- Around line 134-143: run() currently returns immediately on
w.processor.ctx.Done(), discarding buffered events and leaking quota; modify
run() in regionEventWorker so that when ctx.Done() is observed it drains w.ch
and calls w.handle(event) for each buffered event before returning (e.g., after
the ctx.Done() branch enter a non-blocking loop: repeatedly select { case event
:= <-w.ch: w.handle(event) default: return } ), ensuring all pending data events
are processed and their finishCallback executed.
In `@logservice/logpuller/span_pipeline.go`:
- Around line 102-133: The worker goroutines started in newSpanPipelineManager
via go w.run() can silently die on panic; update spanPipelineWorker.run to
include a top-level defer that recovers panics, logs the error (including stack)
and signals the manager (e.g., call a failure method on spanPipelineManager or
cancel the manager context) so enqueue won't block forever; alternatively,
construct workers inside an errgroup in newSpanPipelineManager and run w.run()
via the group's goroutine so worker errors propagate and trigger shutdown—ensure
the chosen approach references spanPipelineWorker.run, spanPipelineWorker,
newSpanPipelineManager and the manager's context/quit handling.
- Around line 167-191: The EnqueueData path can block forever when a single
batch weight (from approximateRawKVEntriesSize) exceeds the semaphore capacity
used by m.quota.Acquire; change EnqueueData to guard against oversized batches
by checking weight against the configured quotaBytes (or manager capacity), and
if weight > quotaBytes either split kvs into multiple smaller batches that each
have weight <= quotaBytes and enqueue them as separate spanPipelineEvent entries
(recomputing weight per chunk) or cap the chunk size to quotaBytes and loop
until all kvs are enqueued; also emit a warning log when an incoming batch is
near or above the quota threshold, and ensure any early returns still call
m.releaseQuota where appropriate and use the existing spanPipelineEvent,
m.quota.Acquire, m.releaseQuota, EnqueueData and approximateRawKVEntriesSize
symbols to locate the code to change.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
- docs/design/2026-02-23-logpuller-span-pipeline.md
- logservice/logpuller/region_event.go
- logservice/logpuller/region_event_handler.go
- logservice/logpuller/region_event_handler_test.go
- logservice/logpuller/region_event_metrics.go
- logservice/logpuller/region_event_processor.go
- logservice/logpuller/region_request_worker.go
- logservice/logpuller/span_pipeline.go
- logservice/logpuller/span_pipeline_test.go
- logservice/logpuller/subscription_client.go
- pkg/metrics/log_puller.go
💤 Files with no reviewable changes (1)
- logservice/logpuller/region_event_handler.go
```go
func (w *regionEventWorker) run() {
	for {
		select {
		case <-w.processor.ctx.Done():
			return
		case event := <-w.ch:
			w.handle(event)
		}
	}
}
```
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:

```shell
# First, find the file and understand the context around the run() function
cat -n logservice/logpuller/region_event_processor.go | head -200
```

Repository: pingcap/ticdc
Length of output: 6359

🏁 Script executed:

```shell
# Search for Unregister method in the codebase
rg -A 15 "func.*Unregister" logservice/
```

Repository: pingcap/ticdc
Length of output: 7210

🏁 Script executed:

```shell
# Look for quota-related code and finishCallback
rg -B 3 -A 3 "finishCallback" logservice/logpuller/
```

Repository: pingcap/ticdc
Length of output: 1880

🏁 Script executed:

```shell
# Look for the EnqueueData implementation to understand quota handling
rg -A 20 "func.*EnqueueData" logservice/logpuller/span_pipeline.go
```

Repository: pingcap/ticdc
Length of output: 690

🏁 Script executed:

```shell
# Search for quota-related code and any cleanup on shutdown
rg -B 5 -A 10 "quota" logservice/logpuller/ | head -100
```

Repository: pingcap/ticdc
Length of output: 5645

🏁 Script executed:

```shell
# Check how the processor is shut down and if there's any graceful shutdown logic
rg -B 5 -A 10 "regionEventProcessor" logservice/logpuller/ | grep -A 10 -B 10 "ctx.Done\|shutdown\|close"
```

Repository: pingcap/ticdc
Length of output: 1259

🏁 Script executed:

```shell
# Look for spanPipelineWorker shutdown/cleanup logic
rg -B 5 -A 30 "func.*spanPipelineWorker.*run" logservice/logpuller/span_pipeline.go
```

Repository: pingcap/ticdc
Length of output: 802

🏁 Script executed:

```shell
# Check handleData implementation in spanPipelineWorker
rg -A 30 "func.*spanPipelineWorker.*handleData" logservice/logpuller/span_pipeline.go
```

Repository: pingcap/ticdc
Length of output: 840

🏁 Script executed:

```shell
# Check what happens when finishCallback is called
rg -B 5 -A 10 "finishCallback\(" logservice/logpuller/span_pipeline.go
```

Repository: pingcap/ticdc
Length of output: 39

🏁 Script executed:

```shell
# Check if there's any quota cleanup on shutdown or unregister
rg -B 5 -A 15 "handleUnregister" logservice/logpuller/span_pipeline.go
```

Repository: pingcap/ticdc
Length of output: 1236

🏁 Script executed:

```shell
# Check if there's any graceful shutdown mechanism for the processors
rg -B 5 -A 10 "regionEventProcessor\|spanPipelineManager" logservice/logpuller/ | grep -B 5 -A 10 "Close\|Stop\|Shutdown"
```

Repository: pingcap/ticdc
Length of output: 39

🏁 Script executed:

```shell
# Check if handleUnregister releases any in-flight quota for pending data events
rg -B 10 -A 20 "func.*handleUnregister" logservice/logpuller/span_pipeline.go
```

Repository: pingcap/ticdc
Length of output: 823

🏁 Script executed:

```shell
# Look for any shutdown/cleanup hooks or graceful draining mechanisms
rg -B 5 -A 15 "func.*spanPipelineManager\)" logservice/logpuller/span_pipeline.go | grep -A 15 "func.*"
```

Repository: pingcap/ticdc
Length of output: 2389

🏁 Script executed:

```shell
# Check the full span_pipeline.go to understand the event workflow and any cleanup
cat logservice/logpuller/span_pipeline.go | grep -n "type spanPipelineEvent\|eventType\|EventData\|EventPersisted" | head -20
```

Repository: pingcap/ticdc
Length of output: 334

🏁 Script executed:

```shell
# Check if there's cleanup for in-flight events anywhere in the codebase
rg "drainChannel\|drain\|cleanup.*event\|flush.*queue" logservice/logpuller/
```

Repository: pingcap/ticdc
Length of output: 39

🏁 Script executed:

```shell
# Check the full flow: how EnqueueData and handle interact
rg -B 5 -A 20 "func.*handle.*event regionEvent" logservice/logpuller/region_event_processor.go
```

Repository: pingcap/ticdc
Length of output: 650

🏁 Script executed:

```shell
# Verify if there's any cleanup that happens when Unregister is called on the regionEventProcessor side
rg -B 10 -A 20 "regionEventProcessor\|regionEventWorker" logservice/logpuller/ | grep -B 10 -A 10 "Unregister\|cleanup\|close\|Stop"
```

Repository: pingcap/ticdc
Length of output: 39

🏁 Script executed:

```shell
# Check where regionEventProcessor is used and if there's any shutdown sequence
fd -t f "\.go$" logservice/logpuller/ -exec grep -l "newRegionEventProcessor\|regionEventProcessor" {} \;
```

Repository: pingcap/ticdc
Length of output: 296

🏁 Script executed:

```shell
# Look at logservice subscribers or main flow to understand shutdown order
rg -B 3 -A 10 "newRegionEventProcessor\|regionEventProcessor" logservice/logpuller/subscription_client.go | head -50
```

Repository: pingcap/ticdc
Length of output: 39

🏁 Script executed:

```shell
# Find files that reference regionEventProcessor
fd -t f "\.go$" logservice/logpuller/ | xargs grep -l "regionEventProcessor"
```

Repository: pingcap/ticdc
Length of output: 147

🏁 Script executed:

```shell
# Check subscription_client.go for regionEventProcessor usage
cat -n logservice/logpuller/subscription_client.go | grep -B 10 -A 10 "regionEventProcessor" | head -80
```

Repository: pingcap/ticdc
Length of output: 2157

🏁 Script executed:

```shell
# Search for where EnqueueData is called and if quota is properly managed
rg -B 5 -A 5 "EnqueueData" logservice/logpuller/region_event_processor.go
```

Repository: pingcap/ticdc
Length of output: 787

🏁 Script executed:

```shell
# Look for any shutdown hooks or defer statements that clean up in-flight events
rg -B 5 -A 10 "defer\|close\|Stop\|Shutdown" logservice/logpuller/subscription_client.go | head -100
```

Repository: pingcap/ticdc
Length of output: 39

🏁 Script executed:

```shell
# Check if subscriptionClient has any graceful shutdown mechanism that calls Unregister before cancelling ctx
rg -B 5 -A 20 "func.*subscriptionClient.*Close\|func.*subscriptionClient.*Stop\|func.*subscriptionClient.*Shutdown" logservice/logpuller/subscription_client.go
```

Repository: pingcap/ticdc
Length of output: 39

🏁 Script executed:

```shell
# Check what happens when subClient.cancel is called
rg -B 10 -A 15 "subClient.cancel\|s.cancel" logservice/logpuller/subscription_client.go
```

Repository: pingcap/ticdc
Length of output: 39

🏁 Script executed:

```shell
# Check all places where context cancellation happens
rg -B 5 -A 5 "cancel()" logservice/logpuller/subscription_client.go
```

Repository: pingcap/ticdc
Length of output: 1084

🏁 Script executed:

```shell
# Check if there's a graceful shutdown that drains pending events before cancelling context
rg -B 10 -A 20 "func.*subscriptionClient" logservice/logpuller/subscription_client.go | grep -A 20 "Close"
```

Repository: pingcap/ticdc
Length of output: 943

🏁 Script executed:

```shell
# Verify the exact sequence: check if EnqueueData can fail silently due to context being done
cat -n logservice/logpuller/span_pipeline.go | sed -n '160,195p'
```

Repository: pingcap/ticdc
Length of output: 1155

🏁 Script executed:

```shell
# Check if there's any cleanup for in-flight data when the pipeline manager context is cancelled
rg -B 5 "ctx.Done()" logservice/logpuller/span_pipeline.go | head -40
```

Repository: pingcap/ticdc
Length of output: 454

🏁 Script executed:

```shell
# Final verification: check if there's any cleanup hook that calls Unregister before cancel
rg -B 5 -A 15 "Unregister" logservice/logpuller/subscription_client.go
```

Repository: pingcap/ticdc
Length of output: 796
run() silently discards buffered data events on context cancellation, causing quota leaks.
When ctx.Done() fires, the worker exits immediately without draining w.ch. Any buffered data events are dropped without ever reaching spanPipelineWorker.handleData(), so their finishCallback never fires the persisted event that would release the acquired quota. This happens because subscriptionClient.Close() cancels the context directly without first calling Unregister for in-flight subscriptions; Unregister only runs asynchronously when a table is drained, not during shutdown.
For resolved-ts events this is benign (no quota acquired), but data events that acquired quota will leak the quota allocation.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@logservice/logpuller/region_event_processor.go` around lines 134 - 143, run()
currently returns immediately on w.processor.ctx.Done(), discarding buffered
events and leaking quota; modify run() in regionEventWorker so that when
ctx.Done() is observed it drains w.ch and calls w.handle(event) for each
buffered event before returning (e.g., after the ctx.Done() branch enter a
non-blocking loop: repeatedly select { case event := <-w.ch: w.handle(event)
default: return } ), ensuring all pending data events are processed and their
finishCallback executed.
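The drain loop described in the prompt above can be sketched in isolation. This is a minimal illustration, not the real `regionEventWorker`: the `event` type, `drainAndHandle`, and the callback are hypothetical stand-ins for `w.ch`/`w.handle`, showing how a non-blocking select empties the buffered channel so each event's finish callback (and therefore its quota release) still runs before the goroutine exits.

```go
package main

import "fmt"

// event is a stand-in for the real buffered event type; weight models
// the quota a data event acquired on enqueue.
type event struct{ weight int64 }

// drainAndHandle empties the channel without blocking: once no buffered
// event remains, the default branch returns. Called after ctx.Done() is
// observed, it guarantees every already-enqueued event is still handled.
func drainAndHandle(ch chan event, handle func(event)) {
	for {
		select {
		case ev := <-ch:
			handle(ev)
		default:
			return // channel drained, safe to exit the worker
		}
	}
}

func main() {
	ch := make(chan event, 4)
	ch <- event{weight: 10}
	ch <- event{weight: 20}
	var released int64
	// The handler here models "run finishCallback and release quota".
	drainAndHandle(ch, func(ev event) { released += ev.weight })
	fmt.Println(released) // 30
}
```

Because the drain uses `default` rather than waiting on the channel, it cannot hang on a producer that has already stopped; it only consumes what was buffered at shutdown time.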
```go
func newSpanPipelineManager(
	ctx context.Context, workerCount int, queueSize int, quotaBytes int64,
) *spanPipelineManager {
	if workerCount <= 0 {
		workerCount = runtime.GOMAXPROCS(0)
		if workerCount <= 0 {
			workerCount = 1
		}
	}
	if queueSize <= 0 {
		queueSize = defaultSpanPipelineQueueSize
	}
	if quotaBytes <= 0 {
		quotaBytes = defaultSpanPipelineQuotaBytes
	}

	mgr := &spanPipelineManager{
		ctx:     ctx,
		quota:   semaphore.NewWeighted(quotaBytes),
		workers: make([]*spanPipelineWorker, 0, workerCount),
	}
	for i := 0; i < workerCount; i++ {
		w := &spanPipelineWorker{
			ch:     make(chan spanPipelineEvent, queueSize),
			states: make(map[SubscriptionID]*spanPipelineState),
			mgr:    mgr,
		}
		mgr.workers = append(mgr.workers, w)
		go w.run()
	}
	return mgr
}
```
Worker goroutines are fire-and-forget with no panic recovery or lifecycle tracking.
Goroutines started with go w.run() (line 130) have no defer recover(), no errgroup membership, and no mechanism to signal failure. If a handler panics (e.g., a nil-pointer in consumeKVEvents), the worker silently dies, and every subscription hashed to that worker becomes permanently stuck — enqueue will block on the full/dead channel until the context is cancelled.
Consider:
- Adding a defer with panic recovery and logging in run(), or
- Starting workers inside an errgroup so that a failed worker propagates its error and triggers a shutdown.
🛡️ Suggested panic recovery in the worker loop
```diff
 func (w *spanPipelineWorker) run() {
+	defer func() {
+		if r := recover(); r != nil {
+			log.Error("span pipeline worker panicked",
+				zap.Any("recover", r),
+				zap.Stack("stack"))
+		}
+	}()
 	for {
 		select {
 		case <-w.mgr.ctx.Done():
 			return
 		case ev := <-w.ch:
 			w.handleEvent(ev)
 		}
 	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@logservice/logpuller/span_pipeline.go` around lines 102 - 133, The worker
goroutines started in newSpanPipelineManager via go w.run() can silently die on
panic; update spanPipelineWorker.run to include a top-level defer that recovers
panics, logs the error (including stack) and signals the manager (e.g., call a
failure method on spanPipelineManager or cancel the manager context) so enqueue
won't block forever; alternatively, construct workers inside an errgroup in
newSpanPipelineManager and run w.run() via the group's goroutine so worker
errors propagate and trigger shutdown—ensure the chosen approach references
spanPipelineWorker.run, spanPipelineWorker, newSpanPipelineManager and the
manager's context/quit handling.
```go
func (m *spanPipelineManager) EnqueueData(
	ctx context.Context, span *subscribedSpan, kvs []common.RawKVEntry,
) {
	if span == nil || len(kvs) == 0 {
		return
	}
	weight := approximateRawKVEntriesSize(kvs)
	start := time.Now()
	err := m.quota.Acquire(ctx, weight)
	metrics.LogPullerSpanPipelineQuotaAcquireDuration.Observe(time.Since(start).Seconds())
	if err != nil {
		return
	}
	metrics.LogPullerSpanPipelineInflightBytes.Add(float64(weight))
	metrics.LogPullerSpanPipelineInflightBatches.Inc()

	if !m.enqueue(span.subID, spanPipelineEvent{
		subID:  span.subID,
		typ:    spanPipelineEventData,
		kvs:    kvs,
		weight: weight,
	}) {
		m.releaseQuota(weight)
	}
}
```
Acquire will block forever if a single batch's weight exceeds quotaBytes.
semaphore.Weighted.Acquire(ctx, n) blocks until n tokens are available. If weight exceeds the total semaphore capacity (default 1 GiB), the call can never succeed and will hang until ctx is cancelled. While unlikely with typical KV batches, there's no guard against it.
Consider capping weight at quotaBytes, or splitting oversized batches, or at minimum logging a warning when weight approaches the limit.
🛡️ Suggested guard
```diff
 func (m *spanPipelineManager) EnqueueData(
 	ctx context.Context, span *subscribedSpan, kvs []common.RawKVEntry,
 ) {
 	if span == nil || len(kvs) == 0 {
 		return
 	}
 	weight := approximateRawKVEntriesSize(kvs)
+	// Cap weight to quota capacity to prevent permanent blocking.
+	if weight > defaultSpanPipelineQuotaBytes {
+		log.Warn("span pipeline data batch exceeds quota capacity, capping weight",
+			zap.Int64("weight", weight),
+			zap.Int64("quota", defaultSpanPipelineQuotaBytes))
+		weight = defaultSpanPipelineQuotaBytes
+	}
 	start := time.Now()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@logservice/logpuller/span_pipeline.go` around lines 167 - 191, The
EnqueueData path can block forever when a single batch weight (from
approximateRawKVEntriesSize) exceeds the semaphore capacity used by
m.quota.Acquire; change EnqueueData to guard against oversized batches by
checking weight against the configured quotaBytes (or manager capacity), and if
weight > quotaBytes either split kvs into multiple smaller batches that each
have weight <= quotaBytes and enqueue them as separate spanPipelineEvent entries
(recomputing weight per chunk) or cap the chunk size to quotaBytes and loop
until all kvs are enqueued; also emit a warning log when an incoming batch is
near or above the quota threshold, and ensure any early returns still call
m.releaseQuota where appropriate and use the existing spanPipelineEvent,
m.quota.Acquire, m.releaseQuota, EnqueueData and approximateRawKVEntriesSize
symbols to locate the code to change.
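The batch-splitting alternative mentioned in the prompt can be sketched independently of the real types. `splitByWeight` below is a hypothetical helper operating on precomputed per-entry weights (the real code would derive them from `approximateRawKVEntriesSize` per chunk); it groups entries so no chunk's total weight exceeds the quota, which keeps every individual `Acquire` call satisfiable.

```go
package main

import "fmt"

// splitByWeight chunks entries (represented by their byte weights) so the
// total weight of each chunk stays at or below capW. A single entry larger
// than capW is capped rather than dropped, mirroring the suggested guard.
func splitByWeight(sizes []int64, capW int64) [][]int64 {
	var chunks [][]int64
	var cur []int64
	var curW int64
	for _, s := range sizes {
		if s > capW {
			s = capW // cap an oversized entry so Acquire can still succeed
		}
		if curW+s > capW && len(cur) > 0 {
			chunks = append(chunks, cur) // flush the current chunk
			cur, curW = nil, 0
		}
		cur = append(cur, s)
		curW += s
	}
	if len(cur) > 0 {
		chunks = append(chunks, cur)
	}
	return chunks
}

func main() {
	// Three 40-byte entries against a 100-byte quota yield two chunks:
	// [40 40] and [40].
	fmt.Println(len(splitByWeight([]int64{40, 40, 40}, 100))) // 2
}
```

The enqueue path would then loop over the chunks, acquiring and enqueueing each one separately, so a single oversized batch can never demand more tokens than the semaphore holds.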
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Release Notes
Documentation
Refactor