*: fix missing resource cleanup on initialization failure paths #4517
ti-chi-bot[bot] merged 14 commits into pingcap:master
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings.
📝 Walkthrough

This PR adds defensive resource cleanup across multiple initialization paths: Kafka/Pulsar sinks, redo reader/writer, upstream manager, schema store, dispatcher reporting tweak, and server prepare. Changes introduce named error returns and deferred close calls to ensure created resources are released on failure.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 failed (1 warning, 1 inconclusive)

Warning: Review ran into problems. Git: failed to clone the repository.
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed.

This pull request significantly improves the stability and resource management of the system by addressing various scenarios where resources might not be properly released, especially during component initialization failures. The changes introduce consistent cleanup patterns using `defer`.

Highlights
Code Review
This pull request addresses missing resource cleanup on initialization failure paths across various components. The general approach is to use defer statements to ensure resources are released when errors occur during initialization, which is a solid pattern for improving robustness. Most changes are accompanied by corresponding tests, which is great.
I've found a regression in pkg/sink/kafka/sarama_factory.go where resource cleanup logic was incorrectly removed, potentially leading to resource leaks. I've added comments with suggestions to fix this. Other than that, the changes look good and significantly improve the project's resource management.
```go
if err != nil {
	_ = client.Close()
	return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
}
```
This change introduces a potential resource leak. If sarama.NewSyncProducerFromClient(client) fails, the client created earlier will not be closed. The original code correctly handled this by calling _ = client.Close(). Please restore this cleanup logic to prevent leaking the Sarama client.
```go
if err != nil {
	_ = client.Close()
	return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
}
```
```go
if err != nil {
	_ = client.Close()
	return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
}
```
Similar to the SyncProducer case, this change introduces a potential resource leak for the AsyncProducer. If sarama.NewAsyncProducerFromClient(client) fails, the client will be leaked. The cleanup logic should be restored here as well.
```go
if err != nil {
	_ = client.Close()
	return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
}
```
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
pkg/redo/reader/file.go (1)
120-176: ⚠️ Potential issue | 🟠 Major — Close the sorted files you've already opened on the error path.

If one `os.OpenFile` fails in the final loop, every handle already appended to `ret` is leaked because the function returns immediately. That leaves FDs behind on repeated initialization failures.

🧹 Suggested cleanup on failure

```diff
-func downLoadAndSortFiles(ctx context.Context, cfg *readerConfig) ([]io.ReadCloser, error) {
+func downLoadAndSortFiles(ctx context.Context, cfg *readerConfig) (_ []io.ReadCloser, err error) {
 	dir := cfg.dir
 	// create temp dir in local storage
-	err := os.MkdirAll(dir, redo.DefaultDirMode)
+	err = os.MkdirAll(dir, redo.DefaultDirMode)
 	if err != nil {
 		return nil, cerror.WrapError(cerror.ErrRedoFileOp, err)
 	}
 	...
-	ret := []io.ReadCloser{}
+	ret := make([]io.ReadCloser, 0, len(sortedFileNames))
+	defer func() {
+		if err != nil {
+			for _, rc := range ret {
+				_ = rc.Close()
+			}
+		}
+	}()
 	for _, sortedFileName := range sortedFileNames {
 		path := filepath.Join(dir, sortedFileName)
 		f, err := os.OpenFile(path, os.O_RDONLY, redo.DefaultFileMode)
 		if err != nil {
 			if os.IsNotExist(err) {
 				continue
 			}
 			return nil, cerror.WrapError(cerror.ErrRedoFileOp, err)
 		}
 		ret = append(ret, f)
 	}
 	return ret, nil
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/redo/reader/file.go` around lines 120 - 176, In downLoadAndSortFiles the final loop appends open file handles to ret but returns immediately on an os.OpenFile error, leaking the previously opened files; modify the error path so that before returning the wrapped error (cerror.WrapError) you close every io.ReadCloser already stored in ret (iterate over ret and call Close), and then return the error; ensure this cleanup happens for any non-NotExist error in the loop so all opened descriptors from the sortedFileNames loop are released.

pkg/redo/writer/file/file.go (1)
225-256: ⚠️ Potential issue | 🟠 Major — Always close `w.file`, even when `w.close()` exits early.

`w.close()` returns before `w.file.Close()` on several error paths (flush, `Seek`, `Truncate`, `Rename`, directory `Sync`). `Close()` then marks the writer stopped, so a retry will short-circuit and the local file descriptor stays leaked permanently.

🧹 Suggested cleanup pattern

```diff
+import "go.uber.org/multierr"
 ...
-func (w *Writer) close(ctx context.Context) error {
+func (w *Writer) close(ctx context.Context) (err error) {
 	if w.file == nil {
 		return nil
 	}
+	defer func() {
+		if w.file != nil {
+			err = multierr.Append(err, errors.WrapError(errors.ErrRedoFileOp, w.file.Close()))
+			w.file = nil
+		}
+	}()
 	if err := w.flush(); err != nil {
 		return err
 	}
 	...
-	err = w.file.Close()
-	w.file = nil
-	return errors.WrapError(errors.ErrRedoFileOp, err)
+	return nil
 }
```

Also applies to: 344-402
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/redo/writer/file/file.go` around lines 225 - 256, The Writer.Close implementation can short-circuit and leak the underlying *os.File when w.close(ctx) returns early; ensure w.file is always closed regardless of w.close outcome by moving or adding a guarded defer that closes and nils w.file (check for nil) at the start of Close (before setting running=false or returning early), and apply the same guaranteed-close pattern to the other Close-related path referenced (the block around the other close implementation at lines ~344-402) so any early returns still call w.file.Close(); reference symbols: Writer.Close, w.close, w.file, w.running, and ensure file close happens before marking running false or short-circuiting IsRunning().
🧹 Nitpick comments (6)
pkg/upstream/manager.go (1)
89-91: Double-close is redundant but safe. When `initUpstreamFunc` fails, `initUpstream`'s defer already calls `up.Close()`. The explicit `up.Close()` here is redundant since `Close()` guards against double invocation via a status check. While safe, consider removing line 90 if the defer in `initUpstream` is guaranteed to run on all error paths.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/upstream/manager.go` around lines 89 - 91, The explicit call to up.Close() inside the error branch after m.initUpstreamFunc is redundant because initUpstream's defer already closes the upstream; remove the redundant up.Close() line (the call on line invoking up.Close() after m.initUpstreamFunc failure) and rely on the defer in initUpstream to perform cleanup, but keep m.initUpstreamFunc, initUpstream, and up.Close referenced to verify the defer always runs on error paths before removing the duplicate close.

server/server_prepare.go (1)
97-108: Good cleanup pattern for etcdCli, but consider documenting caller responsibility. The defer + nil assignment pattern correctly handles cleanup when `NewCDCEtcdClient` fails. However, if errors occur after line 108 (in `initDir`, `setMemoryLimit`, or `newEtcdSession`), resources assigned to server fields (`c.EtcdClient`, `c.regionCache`) won't be cleaned up by this defer; they rely on `server.Close()` being called.

Since `Run()` doesn't call `Close()` on initialization failure, callers must call `Close()` even when `Run()` returns an error. Consider either:

- Adding a comment documenting this requirement, or
- Wrapping cleanup in `initialize()` to call `Close()` on error

💡 Optional: Add cleanup in initialize() on failure
```diff
 // In server.go initialize() method:
 func (c *server) initialize(ctx context.Context) error {
 	if err := c.prepare(ctx); err != nil {
 		log.Error("server prepare failed", zap.Any("server", c.info), zap.Error(err))
+		c.cleanupOnPrepareFailure()
 		return errors.Trace(err)
 	}
 	// ... rest of method
 }

+func (c *server) cleanupOnPrepareFailure() {
+	if c.regionCache != nil {
+		c.regionCache.Close()
+	}
+	if c.EtcdClient != nil {
+		_ = c.EtcdClient.GetEtcdClient().Close()
+	}
+	if c.pdAPIClient != nil {
+		c.pdAPIClient.Close()
+	}
+	if c.pdClient != nil {
+		c.pdClient.Close()
+	}
+}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@server/server_prepare.go` around lines 97 - 108, The prepare() defer pattern correctly closes etcdCli on immediate failures but leaves partially-initialized server fields (c.EtcdClient, c.regionCache, pdClient, pdAPIClient) alive if later initialization steps fail; update initialize() to call Close() (or a new helper cleanupOnPrepareFailure()) when prepare() returns an error so resources are released: add a short cleanup helper (e.g., cleanupOnPrepareFailure) that checks and closes c.regionCache, c.EtcdClient.GetEtcdClient(), c.pdAPIClient, c.pdClient, and call it from initialize() when prepare() fails (also document in Run/initialize that callers need not call Close() after failed initialization because cleanup is performed).

logservice/schemastore/schema_store_test.go (1)
113-130: Consider adding a `snapTs == gcTs` boundary assertion. A second case for equality would make the GC boundary behavior explicit and prevent future semantic drift.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/schemastore/schema_store_test.go` around lines 113 - 130, Add an explicit equality boundary test: call pstorage.getAllPhysicalTables with snapTs equal to pstorage.gcTs (e.g., 100) inside TestGetAllPhysicalTablesReturnsSnapshotLostByGCError (or as a small new subcase) and assert it returns an error that equals cerror.ErrSnapshotLostByGC, contains the expected message mentioning the checkpoint-ts and GC safepoint, and does not include format artifacts; use pstorage.gcTs and pstorage.getAllPhysicalTables to locate the code to change.

logservice/schemastore/persist_storage.go (1)
190-193: Do not silently discard `db.Close()` errors on this failure path. The cleanup is correct, but `_ = db.Close()` suppresses a third-party (Pebble) error, violating the guideline to wrap third-party errors with `errors.Trace(err)`. This can hide cleanup failures during initialization errors.

♻️ Proposed fix
```diff
 if gcSafePoint < gcTs {
-	_ = db.Close()
+	if closeErr := db.Close(); closeErr != nil {
+		log.Warn("close schema store db failed after gc safepoint check",
+			zap.Error(errors.Trace(closeErr)))
+	}
 	return errors.New(fmt.Sprintf("gc safe point %d is smaller than gcTs %d on disk",
 		gcSafePoint, gcTs))
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/schemastore/persist_storage.go` around lines 190 - 193, In the gcSafePoint < gcTs failure branch in persist_storage.go replace the silent `_ = db.Close()` with a proper close-check: call `err := db.Close()` and if `err != nil` return `errors.Trace(err)` (so Pebble errors are not discarded), then continue to return the original formatted error for the gc mismatch; reference `db.Close()`, `gcSafePoint`, and `gcTs` when making the change.

downstreamadapter/sink/kafka/sink.go (1)
97-106: Close `statistics` on constructor failure. `newWithComponents` now owns the init-failure cleanup, but it still leaves `metrics.NewStatistics(...)` alive when either producer constructor fails. The success path already treats this as a closeable resource via `(*sink).Close`, so repeated retries will keep stale metrics state around.

♻️ Minimal fix

```diff
-	statistics := metrics.NewStatistics(changefeedID, "sink")
 	asyncProducer, err = comp.factory.AsyncProducer(ctx)
 	if err != nil {
 		return nil, err
 	}
 	syncProducer, err = comp.factory.SyncProducer(ctx)
 	if err != nil {
 		return nil, err
 	}
+	statistics := metrics.NewStatistics(changefeedID, "sink")
 	return &sink{
```

Also applies to: 109-136
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/kafka/sink.go` around lines 97 - 106, The metrics.Statistics instance created via metrics.NewStatistics(...) (variable statistics) is not closed on constructor failure; update the error cleanup in newWithComponents to call statistics.Close() whenever err != nil alongside closing syncProducer, asyncProducer and comp.close(), and apply the same change to the other failure-return paths in newWithComponents (the branches handling producer construction failures) so the statistics resource is properly released just like the success path which is closed by (*sink).Close.

downstreamadapter/sink/pulsar/sink.go (1)
114-123: Close `statistics` when sink construction fails. `newWithComponent` still allocates `metrics.NewStatistics(...)` before the remaining fallible steps, but the new error-path defer never closes it. Since `(*sink).Close` does close this object, DML/DDL producer init errors will keep leaking per-changefeed metrics state on retries.

♻️ Minimal fix

```diff
 	failpointCh := make(chan error, 1)
-	statistics := metrics.NewStatistics(changefeedID, "pulsar")
 	dmlProducer, err = newDMLProducer(changefeedID, comp, failpointCh)
 	if err != nil {
 		return nil, err
 	}
 	ddlProducer, err = newDDLProducer(changefeedID, comp, sinkConfig)
 	if err != nil {
 		return nil, err
 	}
+	statistics := metrics.NewStatistics(changefeedID, "pulsar")
 	return &sink{
```

Also applies to: 126-153
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/pulsar/sink.go` around lines 114 - 123, The constructor newWithComponent allocates metrics.NewStatistics(...) (statistics) before later fallible initializations but the error-path defer only closes ddlProducer, dmlProducer and comp; modify that defer in newWithComponent to also close the allocated statistics when err != nil so it doesn't leak on producer init failures—i.e., reference the local statistics variable and call its Close (or appropriate close method) in the same error branch as ddlProducer.close()/dmlProducer.close()/comp.close(); ensure the same fix is applied to the other identical defer block around lines 126-153.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@downstreamadapter/sink/pulsar/helper.go`:
- Around line 78-83: The deferred error-path cleanup currently calls
pulsarComponent.close() but still returns the closed value, causing callers
(e.g., Verify in downstreamadapter/sink/pulsar/sink.go) to close the same
resources again; modify the defer in the function that returns (pulsarComponent
component, protocol config.Protocol, err error) so that after closing you zero
out the returned component (e.g., set pulsarComponent = component{} or
nil-equivalent) before returning, ensuring callers do not receive a component
whose Close/close will be invoked again.
In `@downstreamadapter/sink/redo/meta.go`:
- Around line 120-124: The deferred cleanup in initMeta relies on checking the
named error return (err != nil), which won't be set during panics so
m.extStorage can leak; change the defer to use an explicit success flag: declare
a boolean (e.g., success := false) before the defer, have the defer call
m.closeExtStorage() when success is false, and set success = true just before
returning successfully. Apply the same pattern to the other similar defer site
referenced (around the other init path) so both panic and error paths close
m.extStorage via m.closeExtStorage().
In `@pkg/redo/writer/memory/mem_log_writer.go`:
- Around line 38-41: Run and Close race on memLogWriter.storage and
memLogWriter.cancel (Close may close storage while Run's workers still use it);
protect all reads/writes of storage, cancel and done with a mutex on the
memLogWriter struct so startup and shutdown are serialized. In practice add a
mutex field to memLogWriter and: acquire it when setting cancel and storage in
Run and when reading/mutating them in Close (and any worker teardown paths
referenced around the other blocks), check cancel under the lock before
using/clearing it, and only close storage while holding the lock to prevent
concurrent use by workers.
---
Outside diff comments:
In `@pkg/redo/reader/file.go`:
- Around line 120-176: In downLoadAndSortFiles the final loop appends open file
handles to ret but returns immediately on an os.OpenFile error, leaking the
previously opened files; modify the error path so that before returning the
wrapped error (cerror.WrapError) you close every io.ReadCloser already stored in
ret (iterate over ret and call Close), and then return the error; ensure this
cleanup happens for any non-NotExist error in the loop so all opened descriptors
from the sortedFileNames loop are released.
In `@pkg/redo/writer/file/file.go`:
- Around line 225-256: The Writer.Close implementation can short-circuit and
leak the underlying *os.File when w.close(ctx) returns early; ensure w.file is
always closed regardless of w.close outcome by moving or adding a guarded defer
that closes and nils w.file (check for nil) at the start of Close (before
setting running=false or returning early), and apply the same guaranteed-close
pattern to the other Close-related path referenced (the block around the other
close implementation at lines ~344-402) so any early returns still call
w.file.Close(); reference symbols: Writer.Close, w.close, w.file, w.running, and
ensure file close happens before marking running false or short-circuiting
IsRunning().
---
Nitpick comments:
In `@downstreamadapter/sink/kafka/sink.go`:
- Around line 97-106: The metrics.Statistics instance created via
metrics.NewStatistics(...) (variable statistics) is not closed on constructor
failure; update the error cleanup in newWithComponents to call
statistics.Close() whenever err != nil alongside closing syncProducer,
asyncProducer and comp.close(), and apply the same change to the other
failure-return paths in newWithComponents (the branches handling producer
construction failures) so the statistics resource is properly released just like
the success path which is closed by (*sink).Close.
In `@downstreamadapter/sink/pulsar/sink.go`:
- Around line 114-123: The constructor newWithComponent allocates
metrics.NewStatistics(...) (statistics) before later fallible initializations
but the error-path defer only closes ddlProducer, dmlProducer and comp; modify
that defer in newWithComponent to also close the allocated statistics when err
!= nil so it doesn't leak on producer init failures—i.e., reference the local
statistics variable and call its Close (or appropriate close method) in the same
error branch as ddlProducer.close()/dmlProducer.close()/comp.close(); ensure the
same fix is applied to the other identical defer block around lines 126-153.
In `@logservice/schemastore/persist_storage.go`:
- Around line 190-193: In the gcSafePoint < gcTs failure branch in
persist_storage.go replace the silent `_ = db.Close()` with a proper
close-check: call `err := db.Close()` and if `err != nil` return
`errors.Trace(err)` (so Pebble errors are not discarded), then continue to
return the original formatted error for the gc mismatch; reference `db.Close()`,
`gcSafePoint`, and `gcTs` when making the change.
In `@logservice/schemastore/schema_store_test.go`:
- Around line 113-130: Add an explicit equality boundary test: call
pstorage.getAllPhysicalTables with snapTs equal to pstorage.gcTs (e.g., 100)
inside TestGetAllPhysicalTablesReturnsSnapshotLostByGCError (or as a small new
subcase) and assert it returns an error that equals cerror.ErrSnapshotLostByGC,
contains the expected message mentioning the checkpoint-ts and GC safepoint, and
does not include format artifacts; use pstorage.gcTs and
pstorage.getAllPhysicalTables to locate the code to change.
In `@pkg/upstream/manager.go`:
- Around line 89-91: The explicit call to up.Close() inside the error branch
after m.initUpstreamFunc is redundant because initUpstream's defer already
closes the upstream; remove the redundant up.Close() line (the call on line
invoking up.Close() after m.initUpstreamFunc failure) and rely on the defer in
initUpstream to perform cleanup, but keep m.initUpstreamFunc, initUpstream, and
up.Close referenced to verify the defer always runs on error paths before
removing the duplicate close.
In `@server/server_prepare.go`:
- Around line 97-108: The prepare() defer pattern correctly closes etcdCli on
immediate failures but leaves partially-initialized server fields (c.EtcdClient,
c.regionCache, pdClient, pdAPIClient) alive if later initialization steps fail;
update initialize() to call Close() (or a new helper cleanupOnPrepareFailure())
when prepare() returns an error so resources are released: add a short cleanup
helper (e.g., cleanupOnPrepareFailure) that checks and closes c.regionCache,
c.EtcdClient.GetEtcdClient(), c.pdAPIClient, c.pdClient, and call it from
initialize() when prepare() fails (also document in Run/initialize that callers
need not call Close() after failed initialization because cleanup is performed).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 998ac660-bb11-4089-8946-a04836f88f7b
📒 Files selected for processing (24)
- downstreamadapter/dispatcher/basic_dispatcher.go
- downstreamadapter/sink/kafka/sink.go
- downstreamadapter/sink/kafka/sink_test.go
- downstreamadapter/sink/pulsar/helper.go
- downstreamadapter/sink/pulsar/sink.go
- downstreamadapter/sink/pulsar/sink_test.go
- downstreamadapter/sink/redo/meta.go
- downstreamadapter/sink/redo/meta_test.go
- logservice/schemastore/persist_storage.go
- logservice/schemastore/schema_store_test.go
- pkg/redo/reader/file.go
- pkg/redo/reader/file_test.go
- pkg/redo/reader/reader.go
- pkg/redo/reader/reader_test.go
- pkg/redo/writer/file/file.go
- pkg/redo/writer/file/file_test.go
- pkg/redo/writer/memory/mem_log_writer.go
- pkg/redo/writer/memory/mem_log_writer_test.go
- pkg/sink/kafka/sarama_factory.go
- pkg/upstream/manager.go
- pkg/upstream/manager_test.go
- pkg/upstream/upstream.go
- server/server.go
- server/server_prepare.go
💤 Files with no reviewable changes (1)
- pkg/sink/kafka/sarama_factory.go
```go
) (pulsarComponent component, protocol config.Protocol, err error) {
	defer func() {
		if err != nil {
			pulsarComponent.close()
		}
	}()
```
Avoid returning an already-closed component on error.
This helper now owns error-path cleanup, but `Verify` in downstreamadapter/sink/pulsar/sink.go still unconditionally does `defer comp.close()`. Because `component` is returned by value, failures after `client` or `topicManager` is assigned will now forward `Close()` twice to the same resources.
♻️ One safe fix
```diff
 defer func() {
 	if err != nil {
 		pulsarComponent.close()
+		pulsarComponent = component{}
 	}
 }()
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
) (pulsarComponent component, protocol config.Protocol, err error) {
	defer func() {
		if err != nil {
			pulsarComponent.close()
			pulsarComponent = component{}
		}
	}()
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@downstreamadapter/sink/pulsar/helper.go` around lines 78 - 83, The deferred
error-path cleanup currently calls pulsarComponent.close() but still returns the
closed value, causing callers (e.g., Verify in
downstreamadapter/sink/pulsar/sink.go) to close the same resources again; modify
the defer in the function that returns (pulsarComponent component, protocol
config.Protocol, err error) so that after closing you zero out the returned
component (e.g., set pulsarComponent = component{} or nil-equivalent) before
returning, ensuring callers do not receive a component whose Close/close will be
invoked again.
```go
defer func() {
	if err != nil {
		m.closeExtStorage()
	}
}()
```
The init-failure cleanup still misses panic paths.
`initMeta` can still panic later in this flow (for example at Line 246), and deferred functions run during panic unwinding with the named return `err` still unset. In that case `m.extStorage` is left open, so this leak survives whenever the panic is recovered upstream. Prefer a success flag over `err != nil` so both returned errors and panics tear the storage down.
♻️ Suggested fix
```diff
 m.extStorage = extStorage
-defer func() {
-	if err != nil {
-		m.closeExtStorage()
-	}
-}()
+initialized := false
+defer func() {
+	if !initialized {
+		m.closeExtStorage()
+	}
+}()
@@
-return nil
+initialized = true
+return nil
```

Also applies to: 145-145
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@downstreamadapter/sink/redo/meta.go` around lines 120 - 124, The deferred
cleanup in initMeta relies on checking the named error return (err != nil),
which won't be set during panics so m.extStorage can leak; change the defer to
use an explicit success flag: declare a boolean (e.g., success := false) before
the defer, have the defer call m.closeExtStorage() when success is false, and
set success = true just before returning successfully. Apply the same pattern to
the other similar defer site referenced (around the other init path) so both
panic and error paths close m.extStorage via m.closeExtStorage().
```go
storage storage.ExternalStorage
cancel  context.CancelFunc
done    chan struct{}
```
🧩 Analysis chain
🏁 Script executed:

```shell
cat -n pkg/redo/writer/memory/mem_log_writer.go
```

Repository: pingcap/ticdc
Length of output: 5758
Serialize `Run` and `Close` before touching `storage`. `Run()` publishes `cancel` and later clears `storage`, while `Close()` reads and mutates the same fields without synchronization. If shutdown races with startup, `Close()` can see `cancel == nil`, close `l.storage`, and return while `Run()` continues with workers operating on a closed handle.

Also applies to: 88-97, 157-166
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/redo/writer/memory/mem_log_writer.go` around lines 38 - 41, Run and Close
race on memLogWriter.storage and memLogWriter.cancel (Close may close storage
while Run's workers still use it); protect all reads/writes of storage, cancel
and done with a mutex on the memLogWriter struct so startup and shutdown are
serialized. In practice add a mutex field to memLogWriter and: acquire it when
setting cancel and storage in Run and when reading/mutating them in Close (and
any worker teardown paths referenced around the other blocks), check cancel
under the lock before using/clearing it, and only close storage while holding
the lock to prevent concurrent use by workers.
/test all
```diff
@@ -125,7 +125,6 @@ func (f *saramaFactory) SyncProducer(ctx context.Context) (SyncProducer, error)
 	p, err := sarama.NewSyncProducerFromClient(client)
 	if err != nil {
```
It looks like this `client.Close` should not be removed; it should be deferred when an error occurs.
Close the client in the upper-level function Producer.Close
Actionable comments posted: 1
🧹 Nitpick comments (1)
tests/integration_tests/bootstrap_retry_after_error/run.sh (1)
3-3: Add `pipefail` to shell strict mode and harden pipeline error handling. Line 3 uses `set -eu`, but the pipelines on lines 45 and 33 (`curl | jq` and `cdc_cli_changefeed | grep`) can silently fail without `pipefail`, causing curl/cdc_cli errors to be masked as empty values and retried indefinitely. While the retry logic checks for `null` and empty strings, it cannot distinguish between a transient API failure and a genuinely missing value, making CI failures harder to diagnose.

♻️ Proposed hardening

```diff
-set -eu
+set -euo pipefail
@@
 function get_maintainer_addr() {
 	local api_addr=$1
-	curl -s "http://${api_addr}/api/v2/changefeeds/${CHANGEFEED_ID}?keyspace=$KEYSPACE_NAME" | jq -r '.maintainer_addr'
+	curl -fsS "http://${api_addr}/api/v2/changefeeds/${CHANGEFEED_ID}?keyspace=$KEYSPACE_NAME" | jq -r '.maintainer_addr'
 }
@@
 for ((i = 0; i < CHECK_RETRIES; i++)); do
-	maintainer_addr=$(get_maintainer_addr "$api_addr")
+	maintainer_addr=$(get_maintainer_addr "$api_addr" || true)
 	if [ -n "$maintainer_addr" ] && [ "$maintainer_addr" != "null" ]; then
```

Also applies to: lines 33, 45-53
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/integration_tests/bootstrap_retry_after_error/run.sh` at line 3, The script uses "set -eu" which still masks failures in pipelines; change that to enable pipefail (e.g., replace "set -eu" with "set -euo pipefail" or "set -eu -o pipefail") so pipeline failures (notably the "curl | jq" and "cdc_cli_changefeed | grep" pipelines) cause the script to exit instead of producing empty values and hiding errors; keep the existing retry logic but ensure the pipelines fail fast by enabling pipefail.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/integration_tests/bootstrap_retry_after_error/run.sh`:
- Line 127: The test assertion is checking for a non-existent log message;
update the assertion in run.sh so check_logs_contains looks for the actual panic
message emitted in maintainer_controller_bootstrap.go (the string "cant not
found the startTs from the bootstrap response") instead of 'all bootstrap
responses reported empty checkpointTs' — modify the invocation that uses
MAX_RETRIES, check_logs_contains, WORK_DIR and '$other_logsuffix' to assert the
actual panic text.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: f0143054-642f-4b46-919d-51d97ff42616
📒 Files selected for processing (2)
server/server_prepare.go
tests/integration_tests/bootstrap_retry_after_error/run.sh
🚧 Files skipped from review as they are similar to previous changes (1)
- server/server_prepare.go
Signed-off-by: wk989898 <nhsmwk@gmail.com>
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/redo/writer/file/file.go (1)
181-191:⚠️ Potential issue | 🔴 Critical
`NewFileWriter` still leaks external storage when construction fails later. If `redo.InitExternalStorage` succeeds but `newWriter` returns an error later (for example, on the directory validation / `os.MkdirAll` path), `extStorage` is returned to nobody and never closed. The named `err` return only helps if you pair it with a deferred cleanup.

♻️ Proposed fix

```diff
 func NewFileWriter(
 	ctx context.Context, cfg *writer.Config, logType string, opts ...writer.Option,
 ) (w *Writer, err error) {
 	var extStorage storage.ExternalStorage
 	if cfg.UseExternalStorage() {
 		extStorage, err = redo.InitExternalStorage(ctx, *cfg.URI())
 		if err != nil {
 			return nil, err
 		}
+		defer func() {
+			if err != nil && extStorage != nil {
+				extStorage.Close()
+			}
+		}()
 	}
 	return newWriter(cfg, logType, extStorage, opts...)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/redo/writer/file/file.go` around lines 181 - 191, NewFileWriter can leak extStorage when redo.InitExternalStorage succeeds but newWriter returns an error; after calling redo.InitExternalStorage in NewFileWriter, add a deferred cleanup that closes extStorage if the function will return a non-nil error (use the existing named return err). Concretely, immediately after setting extStorage (and before calling newWriter), register: defer func() { if err != nil && extStorage != nil { extStorage.Close() } }() so that any error from newWriter (or later validation like os.MkdirAll) triggers closing the ExternalStorage; keep references to extStorage, redo.InitExternalStorage, and newWriter when applying the change.
♻️ Duplicate comments (1)
pkg/redo/writer/memory/mem_log_writer.go (1)
37-40: ⚠️ Potential issue | 🔴 Critical

`Run` and `Close` still race on lifecycle state.

`l.cancel`, `l.done`, and `l.storage` are shared across `Run` and `Close` without synchronization. `Close` can still observe `cancel == nil` while `Run` is starting, close `l.storage`, and return while the workers continue with the same underlying handle. Please serialize those state transitions with a mutex (or equivalent) before this lands.

Also applies to: 76-85, 144-153
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/redo/writer/memory/mem_log_writer.go` around lines 37 - 40, Run and Close race on shared fields (cancel, done, storage) in the mem log writer; protect lifecycle transitions by adding a mutex to the memLogWriter and lock it around all accesses/updates to l.cancel, l.done, and l.storage in Run, Close, and any other lifecycle-manipulating methods so Close cannot close storage or return while Run is concurrently initializing or running; update checks/assignments in Run and Close to hold the mutex, perform state changes (create/assign cancel and done, open/close storage) while locked, and release before starting long-running worker goroutines.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@pkg/redo/writer/file/file.go`:
- Around line 181-191: NewFileWriter can leak extStorage when
redo.InitExternalStorage succeeds but newWriter returns an error; after calling
redo.InitExternalStorage in NewFileWriter, add a deferred cleanup that closes
extStorage if the function will return a non-nil error (use the existing named
return err). Concretely, immediately after setting extStorage (and before
calling newWriter), register: defer func() { if err != nil && extStorage != nil
{ extStorage.Close() } }() so that any error from newWriter (or later validation
like os.MkdirAll) triggers closing the ExternalStorage; keep references to
extStorage, redo.InitExternalStorage, and newWriter when applying the change.
---
Duplicate comments:
In `@pkg/redo/writer/memory/mem_log_writer.go`:
- Around line 37-40: Run and Close race on shared fields (cancel, done, storage)
in the mem log writer; protect lifecycle transitions by adding a mutex to the
memLogWriter and lock it around all accesses/updates to l.cancel, l.done, and
l.storage in Run, Close, and any other lifecycle-manipulating methods so Close
cannot close storage or return while Run is concurrently initializing or
running; update checks/assignments in Run and Close to hold the mutex, perform
state changes (create/assign cancel and done, open/close storage) while locked,
and release before starting long-running worker goroutines.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 00ddbcda-d1c9-40f4-968e-a1c79bb5896a
📒 Files selected for processing (6)
downstreamadapter/sink/redo/meta_test.go
pkg/redo/reader/file.go
pkg/redo/reader/reader_test.go
pkg/redo/writer/file/file.go
pkg/redo/writer/memory/mem_log_writer.go
pkg/redo/writer/memory/mem_log_writer_test.go
🚧 Files skipped from review as they are similar to previous changes (3)
- pkg/redo/reader/file.go
- pkg/redo/writer/memory/mem_log_writer_test.go
- pkg/redo/reader/reader_test.go
[APPROVAL NOTIFIER] This PR is APPROVED

This pull-request has been approved by: 3AceShowHand, wlwilliamx

The full list of commands accepted by this bot can be found here. The pull request process is described here.

Details: Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing
[LGTM Timeline notifier]

Timeline:
/hold

/unhold

/override pull-cdc-mysql-integration-light-next-gen
@tenfyzhong: Overrode contexts on behalf of tenfyzhong: pull-cdc-mysql-integration-light-next-gen

Details: In response to this:

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
Signed-off-by: wk989898 <nhsmwk@gmail.com>
@wk989898: The following test failed, say

Full PR test history. Your PR dashboard.

Details: Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
What problem does this PR solve?
Issue Number: close #4516 close #4513
What is changed and how it works?
Implemented robust resource cleanup mechanisms across various components, ensuring that resources like Kafka producers, Pulsar clients, Redo log storage, and Upstream connections are properly closed even when initialization fails.
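Across several of these components, more than one resource is acquired during initialization. The multi-resource form of the cleanup pattern can be sketched as follows; the resource names here are illustrative stand-ins, not the actual ticdc types. Each successfully acquired handle is covered by one deferred block that, if a later step fails, closes everything acquired so far in reverse order.

```go
package main

import "fmt"

// resource is a stand-in for a producer, client, or storage handle; these
// names are illustrative only, not the actual ticdc types.
type resource struct {
	name   string
	closed bool
}

func (r *resource) Close() { r.closed = true }

func open(name string, fail bool) (*resource, error) {
	if fail {
		return nil, fmt.Errorf("open %s: failed", name)
	}
	return &resource{name: name}, nil
}

// initAll acquires resources in order; on any failure, the single deferred
// block rolls back everything acquired so far, in reverse order.
func initAll(failAt string) (rs []*resource, err error) {
	defer func() {
		if err != nil {
			for i := len(rs) - 1; i >= 0; i-- {
				rs[i].Close()
			}
			rs = nil
		}
	}()
	for _, name := range []string{"kafka-producer", "redo-storage", "upstream"} {
		r, openErr := open(name, name == failAt)
		if openErr != nil {
			return rs, openErr
		}
		rs = append(rs, r)
	}
	return rs, nil
}

func main() {
	if rs, err := initAll(""); err == nil {
		fmt.Println("acquired:", len(rs))
	}
	if _, err := initAll("redo-storage"); err != nil {
		fmt.Println("rolled back:", err)
	}
}
```

Because the rollback is attached to the named `err` return, every early `return` in the constructor automatically triggers it without duplicating cleanup code.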
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Bug Fixes
Tests