eventstore: improve compression by reusing buffer and use a new compression level #3751
ti-chi-bot[bot] merged 13 commits into master
Conversation
Summary of Changes
Hello @lidezhu, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request optimizes ZSTD compression and decompression in the event store: by reusing byte buffers for these operations, it reduces memory allocation overhead and improves the speed and efficiency of event processing, both when writing and when reading compressed data.
Code Review
This pull request improves compression and decompression speed by reusing buffers. The changes modify writeEvents to accept a reusable compression buffer and update eventStoreIter to reuse a decompression buffer. The buffer-reuse implementation in both paths appears correct and safe: buffers are resized and reset properly between uses. The tests have been updated accordingly, and a new test verifying the safety of buffer reuse in the iterator is a good addition. Overall, the changes are well implemented and should achieve the intended performance improvement.
Is there a bench result?

Working on it.
Force-pushed from bd474fc to cce8ff5.
📝 Walkthrough
Per-worker compression buffering and buffer reuse were added to the event store write path; the writeEvents signature now accepts a compression-buffer pointer. Decompression reuses a per-iterator buffer. A compression-ratio histogram metric was added. A new wide_table integration test and CI token updates were also introduced.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Worker
    participant WriteEvents
    participant ZstdEncoder
    participant PebbleDB
    Worker->>WriteEvents: call writeEvents(events, encoder, &compressionBuf)
    WriteEvents->>ZstdEncoder: encode value -> dstBuf (reuse/compress)
    ZstdEncoder-->>WriteEvents: compressed bytes
    WriteEvents->>PebbleDB: write key + compressed value
    WriteEvents-->>Worker: callbacks / stats (compression bytes)
```
```mermaid
sequenceDiagram
    autonumber
    participant Iterator
    participant PebbleDB
    participant ZstdDecoder
    participant Consumer
    Consumer->>Iterator: Next()
    Iterator->>PebbleDB: read key, value
    PebbleDB-->>Iterator: value (maybe ZSTD)
    Iterator->>ZstdDecoder: DecodeAll(value, dst=iter.decodeBuf)
    ZstdDecoder-->>Iterator: decompressed bytes stored in iter.decodeBuf
    Iterator-->>Consumer: yields decoded kv
```
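The decode path keeps one decodeBuf per iterator and passes it as the destination on every Next(). A consequence — the hazard the PR's new TestEventStoreCompressionAndIterDecodeBufferReuse guards against — is that each call overwrites the shared buffer, so anything the consumer retains across iterations must be copied first. The sketch below is a stand-in, not the PR's code: decodeAll mimics only the append-to-dst contract of a zstd DecodeAll, and all names are hypothetical.

```go
package main

import (
	"fmt"
	"testing"
)

// decodeAll mimics the contract of a DecodeAll(src, dst) decoder:
// output is appended to dst, growing it only when capacity runs out.
// (Identity "decompression" stands in for the real codec.)
func decodeAll(src, dst []byte) []byte {
	return append(dst, src...)
}

func main() {
	decodeBuf := make([]byte, 0, 64) // per-iterator buffer, reused across Next() calls

	// First Next(): decode, then copy before retaining across iterations.
	decodeBuf = decodeAll([]byte("row-1"), decodeBuf[:0])
	retained := append([]byte(nil), decodeBuf...)

	// Second Next(): reuses the same backing array, overwriting its bytes.
	decodeBuf = decodeAll([]byte("row-2"), decodeBuf[:0])

	fmt.Println(string(retained)) // the copy survives: prints row-1

	// Reuse also removes the per-call allocation a nil dst would incur.
	src := make([]byte, 4096)
	allocs := testing.AllocsPerRun(100, func() {
		decodeBuf = decodeAll(src, decodeBuf[:0])
	})
	fmt.Println(allocs) // 0 once the buffer has grown to fit
}
```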
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 4 passed, ❌ 1 failed (warning)
Actionable comments posted: 2
📒 Files selected for processing (8)
- logservice/eventstore/event_store.go
- logservice/eventstore/event_store_test.go
- pkg/common/kv_entry.go
- pkg/metrics/event_store.go
- tests/integration_tests/run_light_it_in_ci.sh
- tests/integration_tests/wide_table/conf/diff_config.toml
- tests/integration_tests/wide_table/main.go
- tests/integration_tests/wide_table/run.sh
```go
sourceDB, err := util.CreateDB(sourceConfig)
if err != nil {
	log.S().Fatal(err)
}
defer func() {
	if closeErr := util.CloseDB(sourceDB); closeErr != nil {
		log.S().Errorf("failed to close source database: %s\n", closeErr)
	}
```
Switch to structured logging instead of log.S() usage.
The codebase uses structured logging via github.com/pingcap/log with zap fields. Replace log.S() calls with log.Info(), log.Error(), and log.Fatal() using zap fields for context. This applies to all 5 instances: lines 71, 75, 202, 205, and 207.
Add the go.uber.org/zap import and update the logging calls to pass structured fields instead of formatted strings.
♻️ Example conversions

```diff
 import (
 	"database/sql"
 	"flag"
 	"fmt"
 	"strings"
 	"sync"

 	"github.com/pingcap/log"
 	"github.com/pingcap/ticdc/tests/integration_tests/util"
+	"go.uber.org/zap"
 )
```

```diff
 sourceDB, err := util.CreateDB(sourceConfig)
 if err != nil {
-	log.S().Fatal(err)
+	log.Fatal("failed to create source database", zap.Error(err))
 }
 defer func() {
 	if closeErr := util.CloseDB(sourceDB); closeErr != nil {
-		log.S().Errorf("failed to close source database: %s\n", closeErr)
+		log.Error("failed to close source database", zap.Error(closeErr))
 	}
 }()
```

```diff
 if err := row.Scan(&width); err != nil {
-	log.S().Fatalf("failed to scan row width: %v", err)
+	log.Fatal("failed to scan row width", zap.Error(err))
 }
 if width < int64(minRowWidthBytes) {
-	log.S().Fatalf("row %d width %d bytes is smaller than expected %d bytes", id, width, minRowWidthBytes)
+	log.Fatal("row width smaller than expected",
+		zap.Int("rowID", id),
+		zap.Int64("widthBytes", width),
+		zap.Int("minWidthBytes", minRowWidthBytes),
+	)
 }
-log.S().Infof("row %d width %d bytes", id, width)
+log.Info("row width measured", zap.Int("rowID", id), zap.Int64("widthBytes", width))
```
```shell
}

trap 'stop_tidb_cluster; collect_logs $WORK_DIR' EXIT
run $*
```
Use "$@" to preserve argument boundaries when calling run.
The run function doesn't declare parameters, so arguments could be omitted entirely (run instead of run "$@"). However, if arguments are passed, use "$@" instead of $* to preserve argument boundaries correctly.
✅ Suggested fix

```diff
-run $*
+run "$@"
```
🧰 Tools: 🪛 Shellcheck (0.11.0)
[warning] SC2048, line 54: Use "$@" (with quotes) to prevent whitespace problems.
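The behavior SC2048 warns about can be demonstrated in a few lines of POSIX shell (the helper names here are hypothetical): unquoted $* re-splits forwarded arguments on whitespace, while "$@" passes them through exactly.

```shell
#!/bin/sh
# count the arguments a forwarded call actually receives
count_args() { echo "$#"; }

forward_star() { count_args $*; }   # unquoted $*: boundaries lost
forward_at()   { count_args "$@"; } # quoted "$@": boundaries kept

forward_star "a b" c   # prints 3: "a b" was split into two words
forward_at   "a b" c   # prints 2: arguments arrive unchanged
```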
[APPROVALNOTIFIER] This PR is APPROVED. This pull request has been approved by: 3AceShowHand, asddongmen.
/retest
What problem does this PR solve?
Issue Number: close #4041
What is changed and how it works?
This pull request focuses on optimizing the performance of ZSTD compression and decompression within the event store. By implementing a strategy to reuse byte buffers for these operations, the changes aim to significantly reduce memory allocation overhead and improve the speed and efficiency of event processing, both when writing and reading compressed data.
Highlights
- The writeEvents function now uses a reusable byte buffer for ZSTD compression. This reduces memory allocations and improves performance during event writing by avoiding repeated buffer creation.
- eventStoreIter has been enhanced to reuse a byte buffer for ZSTD decompression, minimizing memory allocations during event iteration and reading.
- A new test, TestEventStoreCompressionAndIterDecodeBufferReuse, has been added. It validates the correctness of the buffer-reuse logic for both compression and decompression, ensuring data integrity and non-mutation.

Check List
Tests
Manual test (add detailed scripts or steps below)
Here is a performance test result using the large_row workload.

Here is a performance test using a zstd-heavy workload (the "repetitive structure + changing values" pattern):
Master: When input throughput is about 90MB/s, resolved ts lag keeps increasing.
This Pr: When input throughput is about 150MB/s, resolved ts lag is still steady.
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Release Notes
- New Features
- Performance Improvements
- Tests
- Documentation