Skip to content

redo: configuration intialization set correctly when start the redo and reduce the complexity#4510

Merged
ti-chi-bot[bot] merged 22 commits intopingcap:masterfrom
3AceShowHand:read-redo-code
Mar 18, 2026
Merged

redo: configuration intialization set correctly when start the redo and reduce the complexity#4510
ti-chi-bot[bot] merged 22 commits intopingcap:masterfrom
3AceShowHand:read-redo-code

Conversation

@3AceShowHand
Copy link
Copy Markdown
Collaborator

@3AceShowHand 3AceShowHand commented Mar 17, 2026

What problem does this PR solve?

Issue Number: close #4512

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

`None`

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced configuration validation for redo log writers, with stricter storage URI and compression format checks.
    • Improved error handling during redo sink initialization with more granular error reporting.
  • Bug Fixes

    • Fixed compression option defaulting to "none" when unspecified.
    • Improved storage URI validation with better error messages.
  • Refactor

    • Restructured redo writer configuration system for better maintainability.
    • Updated metrics collection infrastructure.

@ti-chi-bot ti-chi-bot Bot added do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. labels Mar 17, 2026
@gemini-code-assist
Copy link
Copy Markdown

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request focuses on simplifying and improving the redo sink component. It refactors metric collection into a dedicated module, enhances error handling during sink creation, and streamlines event encoding logic. These changes aim to improve code maintainability, robustness, and clarity within the redo log writing mechanism.

Highlights

  • Metric Refactoring: The redoSinkMetrics struct was moved from metrics.go to a new file metrics_collector.go and renamed to metricCollector, centralizing metric collection logic.
  • Improved Error Handling: The New function in sink.go now returns an error, allowing for better error propagation during sink initialization. Error tracing was also updated to use context.Cause where appropriate.
  • Code Simplification: Removed redundant nil checks for redoLogEvent conversion and simplified channel communication in encoding_worker.go.
  • Logging Consistency: Removed the capture field from log messages in mem_log_writer.go for consistency.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • downstreamadapter/sink/redo/metrics.go
    • Removed the redoSinkMetrics struct and its associated functions.
  • downstreamadapter/sink/redo/metrics_collector.go
    • Added a new file containing the metricCollector struct and its methods for handling redo sink metrics.
  • downstreamadapter/sink/redo/sink.go
    • Updated the Sink struct to use mericCollector instead of metric.
    • Modified the New function signature to return (*Sink, error) and added error handling.
    • Adjusted observeDDLWrite and observeRowWrite calls to use the new mericCollector.
    • Refactored the Close method to remove nil checks for writers and added a duration log.
    • Updated sendMessages to use context.Cause(ctx) for error tracing.
  • downstreamadapter/sink/redo/sink_test.go
    • Updated calls to New function to handle the new error return.
    • Changed loop syntax from for i := 0; i < N; i++ to for i := range N in several test functions.
    • Corrected error assignment in TestRedoSinkError.
  • pkg/redo/writer/factory/factory.go
    • Wrapped storage parsing errors with errors.ErrStorageInitialize.
  • pkg/redo/writer/memory/encoding_worker.go
    • Removed a redundant nil check for rl in toPolymorphicRedoEvent.
    • Refactored AddEvent and runWorker to directly handle channel communication and context cancellation, removing input and output helper methods.
    • Updated error tracing to use context.Cause(ctx).
  • pkg/redo/writer/memory/mem_log_writer.go
    • Removed the capture field from log messages in writeEvents and asyncWriteEvents.
Activity
  • No human activity (comments, reviews, progress updates) was found in the provided context.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Mar 17, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

This PR refactors the redo log writer configuration system by introducing a new Config type with accessor methods, replacing the direct field access patterns of LogWriterConfig. The redo sink constructor is updated to return an error, and metrics collection is refactored from redoSinkMetrics to a new metricCollector implementation.

Changes

Cohort / File(s) Summary
Core Config Architecture
pkg/redo/writer/config.go, pkg/redo/writer/writer.go
Introduces new public Config type with accessor methods and NewConfig() constructor that validates and normalizes storage URIs. Removes old LogWriterConfig type. Updates RedoEvent and SetTableSchemaStore interfaces to use commonEvent types.
Redo Sink Refactoring
downstreamadapter/sink/redo/sink.go, downstreamadapter/sink/redo/sink_test.go, downstreamadapter/dispatchermanager/dispatcher_manager_redo.go
Updates sink constructor to return (*Sink, error), adds changefeedID field, replaces metric with metricCollector, wires config via writer.NewConfig(). Error handling improved in initialization path.
Metrics Refactoring
downstreamadapter/sink/redo/metrics.go (removed), downstreamadapter/sink/redo/metrics_collector.go (added)
Removes entire metrics.go with redoSinkMetrics struct and methods. Introduces new metricCollector with similar observation and cleanup capabilities for Prometheus metrics.
File Writer Updates
pkg/redo/writer/file/file.go, pkg/redo/writer/file/file_log_writer.go, pkg/redo/writer/file/file_test.go, pkg/redo/writer/file/file_log_writer_test.go, pkg/redo/writer/file/test_helper_test.go
Updates to use Config type instead of LogWriterConfig. Replaces direct field access with method calls (Dir(), ChangeFeedID(), etc.). Adds test helper functions for config and external storage creation.
Memory Writer Updates
pkg/redo/writer/memory/mem_log_writer.go, pkg/redo/writer/memory/encoding_worker.go, pkg/redo/writer/memory/file_worker.go, pkg/redo/writer/memory/mem_log_writer_test.go, pkg/redo/writer/memory/encoding_worker_test.go
Converts to use Config type with accessor methods. Updates worker group initialization and event handling; AddEvent and runWorker use context-based select patterns.
Writer Factory & Reader
pkg/redo/writer/factory/factory.go, pkg/redo/reader/file.go, pkg/redo/reader/reader_test.go
Factory function signature updated to accept Config instead of LogWriterConfig. Reader's sortAndWriteFile uses writer.NewConfig() for dynamic config construction.
Test Utilities & Config
pkg/redo/testutil/config.go, pkg/config/consistent.go, downstreamadapter/sink/redo/meta_test.go
Adds NewConsistentConfig() helper for test setup. Updates ConsistentConfig validation to default compression to "none" and restrict to valid values. Refactors meta tests to use new config helper.
Writer Tests
pkg/redo/writer/writer_test.go
Adds comprehensive tests for NewConfig() covering default initialization, external storage, file backend options, and error handling for invalid/unsupported storage schemes.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

approved, ok-to-test

Suggested reviewers

  • wk989898
  • hongyunyan

Poem

🐰 A config refactor hops through the redo,
New accessors dance where direct fields used to go,
MetricCollector gathers, the old sink takes a bow,
Writer.NewConfig builds the future now! 🌟

🚥 Pre-merge checks | ❌ 5

❌ Failed checks (2 warnings, 3 inconclusive)

Check name Status Explanation Resolution
Description check ⚠️ Warning PR description is mostly empty boilerplate template with minimal actual details about changes. Issue #4512 'learn the redo sink' is vague and does not define clear coding requirements. Add detailed explanation of what was changed and why. Describe the Config refactoring, metrics changes, and simplifications introduced in the PR.
Docstring Coverage ⚠️ Warning Docstring coverage is 24.53% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Linked Issues check ❓ Inconclusive Linked issue #4512 lacks specific coding requirements or acceptance criteria. The PR contains substantial refactoring (Config type introduction, metrics restructuring, signature changes), but issue provides no detail to validate against. Update issue #4512 with specific requirements and acceptance criteria for the redo sink simplification, or provide more context in the PR description.
Out of Scope Changes check ❓ Inconclusive Changes include significant refactoring with new public types and functions (Config, NewConfig, metricCollector, test utilities) that extend beyond simple code simplification. Scope appears broader than stated objective. Clarify in PR description whether these architectural changes (Config abstraction, metrics refactoring) were intended as part of the 'simplify' objective or represent scope creep.
Title check ❓ Inconclusive The title partially describes aspects of the changes (configuration initialization and code complexity reduction) but contains a typo and is vague about the main scope. Fix the typo ('intialization' → 'initialization') and clarify which components are being refactored (e.g., 'redo: refactor writer configuration initialization and metric collection' would be more specific).

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
📝 Coding Plan
  • Generate coding plan for human review comments

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request simplifies the redo sink implementation by refactoring metrics handling, improving error propagation in the constructor, and inlining some helper functions. The changes make the code more straightforward. I've found a minor typo in a field name that should be corrected for better maintainability.

Comment thread downstreamadapter/sink/redo/sink.go Outdated
@ti-chi-bot ti-chi-bot Bot added release-note-none Denotes a PR that doesn't merit a release note. and removed do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. labels Mar 17, 2026
@3AceShowHand
Copy link
Copy Markdown
Collaborator Author

/test all

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
pkg/redo/writer/memory/encoding_worker.go (1)

50-51: ⚠️ Potential issue | 🔴 Critical

Missing nil check on rl will cause panic.

If event.ToRedoLog() returns nil, the subsequent access to rl.Type on line 51 will panic. According to the AI summary, a nil guard was removed during this refactor. This should be restored to prevent runtime panics and allow proper error propagation.

🐛 Proposed fix to restore nil check
 func toPolymorphicRedoEvent(
 	event writer.RedoEvent,
 	tableSchemaStore *commonEvent.TableSchemaStore,
 ) (*polymorphicRedoEvent, error) {
 	rl := event.ToRedoLog()
+	if rl == nil {
+		return nil, errors.ErrUnexpected.FastGenByArgs("redo log is nil")
+	}
 	if rl.Type == commonEvent.RedoLogTypeDDL {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/redo/writer/memory/encoding_worker.go` around lines 50 - 51, The code
calls event.ToRedoLog() and immediately accesses rl.Type which will panic if
ToRedoLog returns nil; add a nil guard after rl := event.ToRedoLog() (e.g., if
rl == nil) and handle it consistently with surrounding logic—either return or
propagate an error from the current function or skip processing this event—so
you avoid dereferencing rl and preserve proper error propagation for ToRedoLog
failures; update any callers or error returns in the enclosing function to match
the chosen handling.
downstreamadapter/sink/redo/sink.go (1)

86-94: ⚠️ Potential issue | 🟠 Major

Close the first writer when the second one fails.

If the DDL writer succeeds and Line 86 fails to create the row writer, this returns immediately and abandons the already-created DDL writer. On the file-backend path, pkg/redo/writer/file/file.go:222-247 shows Close() is where allocator and metric state are released, so repeated init failures can leak resources.

♻️ Suggested cleanup on the second init failure path
  dmlWriter, err := factory.NewRedoLogWriter(s.ctx, s.cfg, redo.RedoRowLogFileType)
  if err != nil {
+		if closeErr := ddlWriter.Close(); closeErr != nil && errors.Cause(closeErr) != context.Canceled {
+			log.Warn("failed to close ddl writer after row writer init failure",
+				zap.String("keyspace", s.cfg.ChangeFeedID.Keyspace()),
+				zap.String("changefeed", s.cfg.ChangeFeedID.Name()),
+				zap.Error(closeErr))
+		}
  		log.Error("redo: failed to create redo log writer",
  			zap.String("keyspace", s.cfg.ChangeFeedID.Keyspace()),
  			zap.String("changefeed", s.cfg.ChangeFeedID.Name()),
  			zap.Duration("duration", time.Since(start)),
  			zap.Error(err))
  		return nil, err
  	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/redo/sink.go` around lines 86 - 94, The second
NewRedoLogWriter call (creating dmlWriter with redo.RedoRowLogFileType) can fail
and currently returns without cleaning up the previously-created ddl writer;
update the error path so that when NewRedoLogWriter for the row writer returns
an error you call Close() on the already-created ddl writer (the variable used
for the DDL writer) before returning the error, ensuring any allocator/metric
state released in Close() is run; reference the existing variables/funcs
ddlWriter, dmlWriter, NewRedoLogWriter, Close(), s.ctx and s.cfg when making the
change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@downstreamadapter/sink/redo/sink.go`:
- Line 128: The metric collector is being dereferenced unconditionally (e.g.,
s.mericCollector.observeDDLWrite) which breaks tests that construct a minimal
Sink without a collector; either guard each invocation with a nil-check (if
s.mericCollector != nil { s.mericCollector.observeDDLWrite(...) }) or initialize
s.mericCollector to a no-op implementation in the Sink constructor/zero-value so
observe* methods are safe to call; update all places where s.mericCollector is
used (observeDDLWrite and other observe* calls in sendMessages/related methods)
to use one of these two approaches to restore nil-safety.

In `@pkg/redo/writer/memory/encoding_worker.go`:
- Around line 157-162: The ctx.Done() select branch in the encoding worker (the
select that sends redoLogEvent to e.outputCh and reads from e.closed) returns
ctx.Err() directly; change it to wrap the context error consistently using
errors.Trace(context.Cause(ctx)) (matching the other usage), i.e., replace the
direct return of ctx.Err() in the ctx.Done() case with a wrapped error via
errors.Trace(context.Cause(ctx)) so stack traces are preserved for the code
paths around e.closed, e.outputCh and redoLogEvent.

---

Outside diff comments:
In `@downstreamadapter/sink/redo/sink.go`:
- Around line 86-94: The second NewRedoLogWriter call (creating dmlWriter with
redo.RedoRowLogFileType) can fail and currently returns without cleaning up the
previously-created ddl writer; update the error path so that when
NewRedoLogWriter for the row writer returns an error you call Close() on the
already-created ddl writer (the variable used for the DDL writer) before
returning the error, ensuring any allocator/metric state released in Close() is
run; reference the existing variables/funcs ddlWriter, dmlWriter,
NewRedoLogWriter, Close(), s.ctx and s.cfg when making the change.

In `@pkg/redo/writer/memory/encoding_worker.go`:
- Around line 50-51: The code calls event.ToRedoLog() and immediately accesses
rl.Type which will panic if ToRedoLog returns nil; add a nil guard after rl :=
event.ToRedoLog() (e.g., if rl == nil) and handle it consistently with
surrounding logic—either return or propagate an error from the current function
or skip processing this event—so you avoid dereferencing rl and preserve proper
error propagation for ToRedoLog failures; update any callers or error returns in
the enclosing function to match the chosen handling.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d7b9a901-138a-4c10-a3cb-28239876271a

📥 Commits

Reviewing files that changed from the base of the PR and between 5eb3dcd and 6f639fd.

📒 Files selected for processing (8)
  • downstreamadapter/dispatchermanager/dispatcher_manager_redo.go
  • downstreamadapter/sink/redo/metrics.go
  • downstreamadapter/sink/redo/metrics_collector.go
  • downstreamadapter/sink/redo/sink.go
  • downstreamadapter/sink/redo/sink_test.go
  • pkg/redo/writer/factory/factory.go
  • pkg/redo/writer/memory/encoding_worker.go
  • pkg/redo/writer/memory/mem_log_writer.go
💤 Files with no reviewable changes (1)
  • downstreamadapter/sink/redo/metrics.go

Comment thread downstreamadapter/sink/redo/sink.go Outdated
Comment thread pkg/redo/writer/memory/encoding_worker.go
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
downstreamadapter/sink/redo/sink.go (3)

86-94: ⚠️ Potential issue | 🟠 Major

Close the already-created DDL writer on DML writer init failure.

If Line 86 fails, ddlWriter from Line 77 stays open. That leaves a leaked writer in the partial-construction path.

♻️ Suggested fix
 	dmlWriter, err := factory.NewRedoLogWriter(s.ctx, s.cfg, redo.RedoRowLogFileType)
 	if err != nil {
+		if closeErr := ddlWriter.Close(); closeErr != nil && errors.Cause(closeErr) != context.Canceled {
+			log.Error("redo sink fails to close ddl writer",
+				zap.String("keyspace", s.cfg.ChangeFeedID.Keyspace()),
+				zap.String("changefeed", s.cfg.ChangeFeedID.Name()),
+				zap.Error(closeErr))
+		}
 		log.Error("redo: failed to create redo log writer",
 			zap.String("keyspace", s.cfg.ChangeFeedID.Keyspace()),
 			zap.String("changefeed", s.cfg.ChangeFeedID.Name()),
 			zap.Duration("duration", time.Since(start)),
 			zap.Error(err))
 		return nil, err
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/redo/sink.go` around lines 86 - 94, When
NewRedoLogWriter for the DML writer (dmlWriter) fails, you must close the
previously created ddlWriter to avoid leaking the open writer; after the err
check in the dmlWriter init path, if ddlWriter != nil call its Close (or
Close(ctx) if that API exists) and handle/log any close error before returning
the original err. Update the block around factory.NewRedoLogWriter(s.ctx, s.cfg,
redo.RedoRowLogFileType) to ensure ddlWriter is closed on failure.

16-34: ⚠️ Potential issue | 🟡 Minor

Resolve import formatting before merge (CI currently red).

Pipeline reports gci import formatting wrote changes; make fmt failed. Please run formatter and commit the result.

As per coding guidelines, "**/*.go: Use gofmt to keep Go code clean; run make fmt before pushing to format code with gci, gofumports, and shfmt, plus log-style checks".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/redo/sink.go` around lines 16 - 34, The import block
in downstreamadapter/sink/redo/sink.go is not formatted per project CI
(gci/gofumports); run the repository formatter and commit the changes: execute
make fmt (which runs gci, gofumports, shfmt and related checks) to reorder and
format imports and apply gofmt to the file (affecting the import block around
the import list that includes symbols like helper, commonEvent, redo, writer,
factory, chann, atomic); after formatting, verify no diffs remain and push the
formatted file.

77-84: ⚠️ Potential issue | 🟠 Major

Wrap writer/factory errors at source instead of returning raw errors.

Lines 84, 93, 126, and 213 return raw err. These call sites should attach stack trace once, then let upstream propagate.

🧩 Suggested fix
-		return nil, err
+		return nil, errors.Trace(err)
@@
-		return nil, err
+		return nil, errors.Trace(err)
@@
-			return err
+			return errors.Trace(err)
@@
-			return err
+			return errors.Trace(err)

As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace; upstream callers should propagate wrapped errors without wrapping again".

Also applies to: 86-93, 123-127, 211-214

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/redo/sink.go` around lines 77 - 84, The
factory.NewRedoLogWriter calls (e.g., the call that assigns ddlWriter via
factory.NewRedoLogWriter(s.ctx, s.cfg, redo.RedoDDLLogFileType)) and the other
sites that currently do `return nil, err` should wrap the incoming error with
the project’s trace wrapper before returning so a stack trace is attached once;
replace raw returns with the traced error (e.g., use errors.Trace(err) or
errors.WrapError(...) according to the repo helper) at each call site (the
NewRedoLogWriter returns for DDL/DML and the other places noted in the comment)
and leave upstream callers to propagate the wrapped error.
♻️ Duplicate comments (1)
downstreamadapter/sink/redo/sink.go (1)

97-97: ⚠️ Potential issue | 🔴 Critical

Fix metricCollector typo to restore build and metrics path.

Line 97 assigns s.mericCollector, but the struct field is metricCollector (Line 50). This breaks compilation and all metric observations/close calls in Lines 128, 186, and 215.

🔧 Minimal fix
-	s.mericCollector = newMetricCollector(changefeedID)
+	s.metricCollector = newMetricCollector(changefeedID)
@@
-		s.mericCollector.observeDDLWrite(time.Since(start))
+		s.metricCollector.observeDDLWrite(time.Since(start))
@@
-	s.mericCollector.close()
+	s.metricCollector.close()
@@
-		s.mericCollector.observeRowWrite(len(events), time.Since(start))
+		s.metricCollector.observeRowWrite(len(events), time.Since(start))

Also applies to: 128-128, 186-186, 215-215

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/redo/sink.go` at line 97, There is a typo: the struct
field is named metricCollector but the code assigns s.mericCollector; change the
assignment to use s.metricCollector = newMetricCollector(changefeedID) and
update all other references (s.mericCollector) to s.metricCollector (including
the metric observation/close sites that call the collector), ensuring you
reference the metricCollector field and the newMetricCollector factory
consistently.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@downstreamadapter/sink/redo/sink.go`:
- Around line 86-94: When NewRedoLogWriter for the DML writer (dmlWriter) fails,
you must close the previously created ddlWriter to avoid leaking the open
writer; after the err check in the dmlWriter init path, if ddlWriter != nil call
its Close (or Close(ctx) if that API exists) and handle/log any close error
before returning the original err. Update the block around
factory.NewRedoLogWriter(s.ctx, s.cfg, redo.RedoRowLogFileType) to ensure
ddlWriter is closed on failure.
- Around line 16-34: The import block in downstreamadapter/sink/redo/sink.go is
not formatted per project CI (gci/gofumports); run the repository formatter and
commit the changes: execute make fmt (which runs gci, gofumports, shfmt and
related checks) to reorder and format imports and apply gofmt to the file
(affecting the import block around the import list that includes symbols like
helper, commonEvent, redo, writer, factory, chann, atomic); after formatting,
verify no diffs remain and push the formatted file.
- Around line 77-84: The factory.NewRedoLogWriter calls (e.g., the call that
assigns ddlWriter via factory.NewRedoLogWriter(s.ctx, s.cfg,
redo.RedoDDLLogFileType)) and the other sites that currently do `return nil,
err` should wrap the incoming error with the project’s trace wrapper before
returning so a stack trace is attached once; replace raw returns with the traced
error (e.g., use errors.Trace(err) or errors.WrapError(...) according to the
repo helper) at each call site (the NewRedoLogWriter returns for DDL/DML and the
other places noted in the comment) and leave upstream callers to propagate the
wrapped error.

---

Duplicate comments:
In `@downstreamadapter/sink/redo/sink.go`:
- Line 97: There is a typo: the struct field is named metricCollector but the
code assigns s.mericCollector; change the assignment to use s.metricCollector =
newMetricCollector(changefeedID) and update all other references
(s.mericCollector) to s.metricCollector (including the metric observation/close
sites that call the collector), ensuring you reference the metricCollector field
and the newMetricCollector factory consistently.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 01d05201-a492-410c-9c89-c204d586de9f

📥 Commits

Reviewing files that changed from the base of the PR and between 6f639fd and bd4fc6b.

📒 Files selected for processing (1)
  • downstreamadapter/sink/redo/sink.go

Comment thread pkg/redo/writer/memory/encoding_worker.go
@ti-chi-bot ti-chi-bot Bot added needs-1-more-lgtm Indicates a PR needs 1 more LGTM. approved labels Mar 17, 2026
@ti-chi-bot ti-chi-bot Bot added size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. and removed size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. labels Mar 17, 2026
@ti-chi-bot ti-chi-bot Bot added lgtm and removed needs-1-more-lgtm Indicates a PR needs 1 more LGTM. labels Mar 17, 2026
@ti-chi-bot
Copy link
Copy Markdown

ti-chi-bot Bot commented Mar 17, 2026

[LGTM Timeline notifier]

Timeline:

  • 2026-03-17 08:52:28.30688406 +0000 UTC m=+260675.394541598: ☑️ agreed by wk989898.
  • 2026-03-17 12:04:25.451587724 +0000 UTC m=+272192.539245271: ☑️ agreed by asddongmen.

@3AceShowHand 3AceShowHand marked this pull request as ready for review March 18, 2026 08:11
@ti-chi-bot ti-chi-bot Bot removed the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Mar 18, 2026
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
downstreamadapter/sink/redo/sink.go (1)

118-132: ⚠️ Potential issue | 🟡 Minor

Missing nil-check for metricCollector at line 127.

Line 127 calls s.metricCollector.observeDDLWrite() unconditionally, while lines 224-226 guard observeRowWrite with a nil-check. For consistency and to support test scenarios that construct minimal Sink instances, add a nil-check here as well.

🛡️ Proposed fix for consistency
 		s.isNormal.Store(false)
 		return err
 	}
-	s.metricCollector.observeDDLWrite(time.Since(start))
+	if s.metricCollector != nil {
+		s.metricCollector.observeDDLWrite(time.Since(start))
+	}
 	log.Info("redo sink send DDL event", zap.Any("startTs", event.GetStartTs()), zap.Any("commitTs", event.GetCommitTs()),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/redo/sink.go` around lines 118 - 132, The
WriteBlockEvent method in Sink calls s.metricCollector.observeDDLWrite(...)
without checking for nil; update Sink.WriteBlockEvent to guard the metric call
the same way as observeRowWrite does elsewhere: after successful
ddlWriter.WriteEvents, only call
s.metricCollector.observeDDLWrite(time.Since(start)) if s.metricCollector != nil
(and keep s.isNormal.Store(false) behavior and logging unchanged), using the
existing s.ctx, s.ddlWriter.WriteEvents and e variables to locate the spot.
🧹 Nitpick comments (1)
pkg/redo/writer/config.go (1)

93-112: Logic is correct but could benefit from a clarifying comment.

The branching logic handles:

  1. Non-external storage (e.g., blackhole after FixLocalScheme) → returns uri.Path
  2. file:// scheme (including normalized local/nfs) → returns uri.Path
  3. External storage with file backend (e.g., S3 + local staging) → returns computed path under DataDir
  4. External storage without file backend → returns empty string

The overlap between cases 1 and 2 after FixLocalScheme normalization means case 1 effectively only handles blackhole storage. Consider adding a brief comment explaining the post-normalization flow.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/redo/writer/config.go` around lines 93 - 112, Add a brief clarifying
comment at the top of newWriterDir describing the post-FixLocalScheme branching:
explain that UseExternalStorage() false now only occurs for "blackhole" (so
returns cfg.uri.Path), uri.Scheme == "file" (including normalized "local"/"nfs")
also returns cfg.uri.Path, external storage with cfg.useFileBackend uses the
computed path under
config.GetGlobalServerConfig().DataDir/config.DefaultRedoDir/changefeedID.Keyspace()/changefeedID.Name(),
and external storage without a file backend returns an empty string; reference
the symbols newWriterDir, Config, UseExternalStorage, cfg.uri.Path,
cfg.uri.Scheme, useFileBackend, config.GetGlobalServerConfig, DataDir,
DefaultRedoDir, and changefeedID.Keyspace()/Name() in the comment.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@downstreamadapter/sink/redo/meta_test.go`:
- Around line 28-29: The import block in meta_test.go is mis-ordered (writertest
and misc imports) causing gci failure; reorder the imports into the canonical
groups and run the formatter: update the import section so standard library,
external modules, and internal packages are in the correct order (ensure symbols
like writertest and misc remain imported) and then run `make fmt` to apply gci
formatting and fix the pipeline error.

In `@pkg/redo/writer/file/file_test.go`:
- Around line 233-234: The test is building expected object names from global
state (config.GetGlobalServerConfig().AdvertiseAddr) which is racy; instead
derive the expected filename from the writer.Config used in the test fixture
(the instance passed to NewWriter or WriteFile call) and use that to construct
the fmt.Sprintf pattern passed to mockStorage.EXPECT().WriteFile; replace any
global-config references in the three expectations (the one at lines ~233,
~277-280, ~338-341) to use writer.Config.AdvertiseAddr (or the specific
capture-id helper on the writer.Config) so the test asserts against the actual
writer.Config instance, and switch any plain t.Fatal/asserts to testify/require
calls for deterministic failures.

In `@pkg/redo/writer/testutil/config.go`:
- Line 7: Update the truncated license header URL from "LICENSE-20" to the
correct "LICENSE-2.0" in the file header comment; locate the top-of-file license
block (the package header comment in config.go) and replace the malformed URL so
the Apache license link reads "http://www.apache.org/licenses/LICENSE-2.0".

---

Outside diff comments:
In `@downstreamadapter/sink/redo/sink.go`:
- Around line 118-132: The WriteBlockEvent method in Sink calls
s.metricCollector.observeDDLWrite(...) without checking for nil; update
Sink.WriteBlockEvent to guard the metric call the same way as observeRowWrite
does elsewhere: after successful ddlWriter.WriteEvents, only call
s.metricCollector.observeDDLWrite(time.Since(start)) if s.metricCollector != nil
(and keep s.isNormal.Store(false) behavior and logging unchanged), using the
existing s.ctx, s.ddlWriter.WriteEvents and e variables to locate the spot.

---

Nitpick comments:
In `@pkg/redo/writer/config.go`:
- Around line 93-112: Add a brief clarifying comment at the top of newWriterDir
describing the post-FixLocalScheme branching: explain that UseExternalStorage()
false now only occurs for "blackhole" (so returns cfg.uri.Path), uri.Scheme ==
"file" (including normalized "local"/"nfs") also returns cfg.uri.Path, external
storage with cfg.useFileBackend uses the computed path under
config.GetGlobalServerConfig().DataDir/config.DefaultRedoDir/changefeedID.Keyspace()/changefeedID.Name(),
and external storage without a file backend returns an empty string; reference
the symbols newWriterDir, Config, UseExternalStorage, cfg.uri.Path,
cfg.uri.Scheme, useFileBackend, config.GetGlobalServerConfig, DataDir,
DefaultRedoDir, and changefeedID.Keyspace()/Name() in the comment.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 5ecc832b-39d2-49c0-b18e-61d9bf068d20

📥 Commits

Reviewing files that changed from the base of the PR and between 2e6a223 and d85442b.

📒 Files selected for processing (22)
  • downstreamadapter/dispatchermanager/dispatcher_manager_redo.go
  • downstreamadapter/sink/redo/meta_test.go
  • downstreamadapter/sink/redo/sink.go
  • downstreamadapter/sink/redo/sink_test.go
  • pkg/config/consistent.go
  • pkg/redo/reader/file.go
  • pkg/redo/reader/reader_test.go
  • pkg/redo/writer/config.go
  • pkg/redo/writer/factory/factory.go
  • pkg/redo/writer/file/file.go
  • pkg/redo/writer/file/file_log_writer.go
  • pkg/redo/writer/file/file_log_writer_test.go
  • pkg/redo/writer/file/file_test.go
  • pkg/redo/writer/file/test_helper_test.go
  • pkg/redo/writer/memory/encoding_worker.go
  • pkg/redo/writer/memory/encoding_worker_test.go
  • pkg/redo/writer/memory/file_worker.go
  • pkg/redo/writer/memory/mem_log_writer.go
  • pkg/redo/writer/memory/mem_log_writer_test.go
  • pkg/redo/writer/testutil/config.go
  • pkg/redo/writer/writer.go
  • pkg/redo/writer/writer_test.go
🚧 Files skipped from review as they are similar to previous changes (5)
  • pkg/redo/writer/memory/mem_log_writer.go
  • pkg/redo/writer/factory/factory.go
  • pkg/redo/writer/memory/encoding_worker_test.go
  • pkg/redo/writer/memory/mem_log_writer_test.go
  • pkg/redo/writer/file/file_log_writer_test.go

Comment thread downstreamadapter/sink/redo/meta_test.go Outdated
Comment thread pkg/redo/writer/file/file_test.go Outdated
Comment thread pkg/redo/writer/testutil/config.go Outdated
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@downstreamadapter/sink/redo/sink_test.go`:
- Around line 213-216: The test constructs a config with an unsupported URI
("blackhole-invalid://") which now fails inside New via writer.NewConfig
validation; update the test (TestRedoSinkError) to either use a valid sink URI
(e.g., "blackhole://" or another supported scheme returned by
newTestConsistentConfig) or assert that New(...) returns an error (replace
require.NoError with require.Error and adjust subsequent expectations),
referencing newTestConsistentConfig and New (which calls writer.NewConfig) to
locate the fix.

In `@downstreamadapter/sink/redo/sink.go`:
- Around line 64-67: The call to writer.NewConfig(changefeedID, cfg) returns a
raw library error; wrap it immediately (e.g., return nil, errors.Trace(err) or
return nil, errors.WrapError(err, "creating writer config")) so the stack
context is preserved; apply the same change to the other library-call returns
flagged in this file (the other raw returns) so every third-party/library error
at the call site uses errors.Trace or errors.WrapError instead of returning err
directly.
- Around line 85-93: When NewRedoLogWriter for DML (dmlWriter) fails, the
previously-created ddlWriter must be closed to avoid leaking resources; update
the error path after calling factory.NewRedoLogWriter(ctx, config,
redo.RedoRowLogFileType) so that if err != nil you first close ddlWriter (call
its Close/Close(ctx) method consistent with its type) and optionally log any
close error, then return the original creation error; reference ddlWriter,
dmlWriter and factory.NewRedoLogWriter in your change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 4911f6ce-2ee3-424b-b7a0-dab75e5a13bd

📥 Commits

Reviewing files that changed from the base of the PR and between d85442b and 66993ff.

📒 Files selected for processing (9)
  • downstreamadapter/sink/redo/meta_test.go
  • downstreamadapter/sink/redo/sink.go
  • downstreamadapter/sink/redo/sink_test.go
  • pkg/redo/reader/reader_test.go
  • pkg/redo/testutil/config.go
  • pkg/redo/writer/file/test_helper_test.go
  • pkg/redo/writer/memory/encoding_worker_test.go
  • pkg/redo/writer/memory/mem_log_writer_test.go
  • pkg/redo/writer/writer_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • pkg/redo/writer/memory/encoding_worker_test.go
  • pkg/redo/writer/file/test_helper_test.go

Comment thread downstreamadapter/sink/redo/sink_test.go
Comment thread downstreamadapter/sink/redo/sink.go
Comment thread downstreamadapter/sink/redo/sink.go Outdated
@3AceShowHand
Copy link
Copy Markdown
Collaborator Author

/test all

@3AceShowHand
Copy link
Copy Markdown
Collaborator Author

/test all

@3AceShowHand 3AceShowHand changed the title redo: simplify the code redo: configuration intialization set correctly when start the redo and reduce the complexity Mar 18, 2026
@ti-chi-bot
Copy link
Copy Markdown

ti-chi-bot Bot commented Mar 18, 2026

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: asddongmen, flowbehappy, wk989898

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

approved lgtm release-note-none Denotes a PR that doesn't merit a release note. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

learn the redo sink

4 participants