lightning/external: reduce repeated reads in global sort ingest#67382
lightning/external: reduce repeated reads in global sort ingest#67382joechenrh wants to merge 5 commits intopingcap:masterfrom
Conversation
…eadRangeFromProps - Replace seekPropsOffsets with getReadRangeFromProps that returns [2][]uint64 (start+end offsets) per key per file - Remove getFilesReadConcurrency from engine.go, move concurrency logic into cachedReader.open - Replace readOneFile with cachedReader pattern (sequentialReader + concurrentReader) for better file read reuse - Add rangeProperty.totalSize() helper Signed-off-by: Ruihao Chen <joechenrh@gmail.com> Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
|
Review Complete Findings: 5 issues ℹ️ Learn more details on Pantheon AI. |
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
Hi @joechenrh. Thanks for your PR. PRs from untrusted users cannot be marked as trusted with I understand the commands that are listed here. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
📝 WalkthroughWalkthroughIntroduce a cachedReader abstraction (sequential vs concurrent), change read-range representation to per-file Start/End pairs ( Changes
Sequence Diagram(s)sequenceDiagram
participant Caller as Caller (LoadIngestData / Merge)
participant Cached as cachedReader
participant Seq as sequentialReader
participant Conc as concurrentReader
participant Store as objstore
Caller->>Cached: open(startOffset,endOffset)
Caller->>Cached: read(startKey,endKey)
alt mode == sequential
Cached->>Seq: reuse or open sequential reader
Seq->>Store: ReadDataInRange(startOffset,endOffset)
Store-->>Seq: bytes stream
Seq-->>Cached: KVs (filtered by keys)
else mode == concurrent
Cached->>Conc: spawn concurrent reads (subranges)
par subrange reads
Conc->>Store: ReadDataInRange(subrange)
Store-->>Conc: bytes
end
Conc->>Conc: decode & merge KVs
Conc-->>Cached: merged KVs
end
Cached-->>Caller: deliver KVs
Caller->>Cached: close
Cached-->>Store: release underlying resources
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 golangci-lint (2.11.4)Command failed Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (2)
pkg/lightning/backend/external/bench_test.go (2)
706-714: MissingcloseCachedReaderscall afterreadAllData.Unlike other call sites (
testutil.go,merge_v2.go,engine.go), this benchmark doesn't close the cached readers afterreadAllData. While this won't cause test failures (Go will clean up on process exit), it leaks readers until GC and deviates from the established pattern across this PR.♻️ Suggested fix
err = readAllData( ctx, store, dataFiles, statFiles, make([]cachedReader, len(dataFiles)), startKey, endKey, readRanges[0][0], readRanges[1][1], smallBlockBufPool, largeBlockBufPool, output) t.Logf("read all data cost: %s", time.Since(now)) intest.AssertNoError(err) + // Note: cachedReaders not closed here since this is a benchmark; + // in production code, closeCachedReaders should be called.Or if strict consistency is preferred:
+ cachedReaders := make([]cachedReader, len(dataFiles)) err = readAllData( ctx, store, dataFiles, statFiles, - make([]cachedReader, len(dataFiles)), + cachedReaders, startKey, endKey, readRanges[0][0], readRanges[1][1], smallBlockBufPool, largeBlockBufPool, output) + _ = closeCachedReaders(cachedReaders)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/lightning/backend/external/bench_test.go` around lines 706 - 714, The benchmark invokes readAllData with a freshly created slice of cachedReader but never calls closeCachedReaders afterward; update the benchmark to store the returned error in err as it already does, then call closeCachedReaders on the same cachedReader slice (the one passed into readAllData) immediately after readAllData returns (before logging or asserting), ensuring resources are released; reference the cachedReader slice passed to readAllData and the helper function closeCachedReaders to locate where to insert the call.
863-870: Same missingcloseCachedReaderspattern as above.For consistency with other call sites, consider closing the cached readers after the test.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/lightning/backend/external/bench_test.go` around lines 863 - 870, The call to readAllData is creating the cachedReader slice inline and never closing them; create a named variable (e.g., crs := make([]cachedReader, len(dataFiles))) then pass crs to readAllData and ensure you call closeCachedReaders(crs) after the call (or defer closeCachedReaders(crs) immediately after creating crs) so the cached readers are properly closed; reference the readAllData invocation and the closeCachedReaders helper when making this change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@pkg/lightning/backend/external/bench_test.go`:
- Around line 706-714: The benchmark invokes readAllData with a freshly created
slice of cachedReader but never calls closeCachedReaders afterward; update the
benchmark to store the returned error in err as it already does, then call
closeCachedReaders on the same cachedReader slice (the one passed into
readAllData) immediately after readAllData returns (before logging or
asserting), ensuring resources are released; reference the cachedReader slice
passed to readAllData and the helper function closeCachedReaders to locate where
to insert the call.
- Around line 863-870: The call to readAllData is creating the cachedReader
slice inline and never closing them; create a named variable (e.g., crs :=
make([]cachedReader, len(dataFiles))) then pass crs to readAllData and ensure
you call closeCachedReaders(crs) after the call (or defer
closeCachedReaders(crs) immediately after creating crs) so the cached readers
are properly closed; reference the readAllData invocation and the
closeCachedReaders helper when making this change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 2c49493f-b467-4650-8db1-a5764c8c94f2
📒 Files selected for processing (8)
pkg/lightning/backend/external/bench_test.gopkg/lightning/backend/external/engine.gopkg/lightning/backend/external/merge_v2.gopkg/lightning/backend/external/reader.gopkg/lightning/backend/external/reader_test.gopkg/lightning/backend/external/testutil.gopkg/lightning/backend/external/util.gopkg/lightning/backend/external/util_test.go
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #67382 +/- ##
================================================
+ Coverage 77.8173% 78.2608% +0.4435%
================================================
Files 2023 1952 -71
Lines 556183 544544 -11639
================================================
- Hits 432807 426165 -6642
+ Misses 121632 117947 -3685
+ Partials 1744 432 -1312
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
|
🔍 Starting code review for this PR... |
ingress-bot
left a comment
There was a problem hiding this comment.
This review was generated by AI and should be verified by a human reviewer.
Manual follow-up is recommended before merge.
Summary
- Total findings: 12
- Inline comments: 12
- Summary-only findings (no inline anchor): 0
Findings (highest risk first)
🚨 [Blocker] (2)
- Reserved key/value returned from freed pool memory on sequential reader reuse (pkg/lightning/backend/external/reader.go:228, pkg/lightning/backend/external/engine.go:860)
- Reserved KV in reused sequential reader references pool-recycled buffer memory across batches (pkg/lightning/backend/external/reader.go:513, pkg/lightning/backend/external/reader.go:226, pkg/lightning/backend/external/engine.go:540)
⚠️ [Major] (1)
- MergeOverlappingFilesV2 leaks cachedReaders when readAllData fails (pkg/lightning/backend/external/merge_v2.go:137, pkg/lightning/backend/external/merge_v2.go:153)
🟡 [Minor] (8)
- Dead
statsFilesparameter kept inreadAllDataafter refactoring (pkg/lightning/backend/external/reader.go:41, pkg/lightning/backend/external/engine.go:289, pkg/lightning/backend/external/merge_v2.go:143, pkg/lightning/backend/external/testutil.go:84, pkg/lightning/backend/external/bench_test.go:708) - readConn hard-coded to 32 (from 1000) limits I/O parallelism for high-file-count workloads (pkg/lightning/backend/external/reader.go:81, pkg/lightning/backend/external/merge_v2.go:139)
- closeCachedReaders stops at first close error, leaking remaining readers (pkg/lightning/backend/external/reader.go:138)
fileReader.reservedoc promises caching thatconcurrentReadersilently ignores (pkg/lightning/backend/external/reader.go:195, pkg/lightning/backend/external/reader.go:402)cachedReadertype lacks a doc comment explaining the reader-reuse invariant (pkg/lightning/backend/external/reader.go:404, pkg/lightning/backend/external/reader.go:409, pkg/lightning/backend/external/reader.go:413)- Log message references deleted function
readOneFile(pkg/lightning/backend/external/reader.go:429) - Reader-reuse test lacks data-correctness assertions across batch boundaries (pkg/lightning/backend/external/reader_test.go:195)
- Positional
[2][]uint64return type creates a fragile parallel contract across all callers (pkg/lightning/backend/external/util.go:72, pkg/lightning/backend/external/engine.go:538, pkg/lightning/backend/external/merge_v2.go:147, pkg/lightning/backend/external/testutil.go:89, pkg/lightning/backend/external/bench_test.go:710)
🧹 [Nit] (1)
getReadRangeFromPropsfirst-paragraph doc still describes only start-offset computation (pkg/lightning/backend/external/util.go:57)
| continue | ||
| } | ||
| if bytes.Compare(k, endKey) >= 0 { | ||
| cr.r.reserve(k, v) |
There was a problem hiding this comment.
🚨 [Blocker] Reserved KV in reused sequential reader references pool-recycled buffer memory across batches
Impact
When a sequential cachedReader is reused across ingest batches in LoadIngestData, the reserved key/value slices point to memory from the previous batch's smallBlockBuf. That buffer is transferred to MemoryIngestData and destroyed when the consumer calls DecRef(), returning pool blocks to the channel cache. The next batch may acquire those same pool blocks for new buffers, silently overwriting the reserved data.
The resulting MemoryIngestData for the later batch contains KV pairs with corrupted key/value bytes, leading to silent data integrity loss during TiKV ingest.
Scope
pkg/lightning/backend/external/reader.go:513—cachedReader.readpkg/lightning/backend/external/reader.go:226—sequentialReader.nextKVpkg/lightning/backend/external/engine.go:540—Engine.LoadIngestData
Evidence
In cachedReader.read(), when k >= endKey, cr.r.reserve(k, v) stores slices allocated from smallBlockBuf.AddBytes(). After readAllData returns, buildIngestData takes ownership of output.memKVBuffers (including smallBlockBuf). When MemoryIngestData.DecRef() reaches 0, Buffer.Destroy() returns pool blocks via Pool.release() into blockCache. The next batch calls readAllData which creates new buffers via smallBlockBufPool.NewBuffer(), potentially acquiring the same recycled blocks. The reused sequentialReader.nextKV() then returns r.reservedKey/Val that point into overwritten pool memory. The production channel is unbuffered (make(chan engineapi.DataAndRanges)), so async ingest workers can call DecRef() on batch N while the producer is building batch N+1.
Change request
After returning the reserved KV in sequentialReader.nextKV(), re-copy the key and value into the caller's smallBlockBuf so that the returned slices are owned by the current batch's buffer, not the previous batch's. Alternatively, copy the reserved KV into new allocations at reserve time that are not pool-managed.
| logutil.Logger(ctx).Info("estimated file size of this range group", | ||
| zap.String("totalSize", units.BytesSize(float64(totalFileSize)))) | ||
|
|
||
| eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx) |
There was a problem hiding this comment.
[P1] Cached readers opened with errgroup context fail on reuse
Why: readAllData creates egCtx via errgroup.WithContext, passes it to cachedReader.open(), and the context is canceled when eg.Wait() returns. When sequential readers are reused across batches (intended behavior), the underlying store.Open(egCtx, ...) reader becomes poisoned with a canceled context, causing subsequent reads to fail with context.Canceled on real object storage backends.
Evidence: pkg/lightning/backend/external/reader.go:80 creates egCtx; pkg/lightning/backend/external/reader.go:100 passes egCtx into cachedReaders[fileIdx].open; pkg/util/wait_group_wrapper.go:247 shows errgroup.WithContext is used; pkg/lightning/backend/external/reader.go:409 enables reuse via canReuse; object storage readers store the Open ctx internally.
Fix: Use a long-lived context (e.g., LoadIngestData ctx) for opening cached readers, not the per-batch egCtx.
There was a problem hiding this comment.
Fixed in 7e372c5. cachedReader.open() now uses the outer LoadIngestData context instead of the per-batch errgroup context, so reused sequential readers are no longer tied to a canceled batch context.
There was a problem hiding this comment.
⏳ @joechenrh I've received your follow-up on this review comment and will continue working on it. I'll update this comment when I have something to share.
ℹ️ Learn more details on Pantheon AI.
|
🔍 Processing follow-up on review comments... |
|
🔍 Processing follow-up on review comments... |
|
[FORMAT CHECKER NOTIFICATION] Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide. Notice: To remove the For example:
📖 For more info, you can check the "Contribute Code" section in the development guide. |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
pkg/lightning/backend/external/util.go (1)
57-60: Add field-level docs onReadRange.StartandReadRange.End.This new exported type carries non-obvious semantics: per-file offsets,
[start,end)usage, and boundary behavior. Field comments would make the API self-contained for callers that never readgetReadRangeFromProps.As per coding guidelines "Keep exported-symbol doc comments, and prefer semantic constraints over name restatement."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/lightning/backend/external/util.go` around lines 57 - 60, Update the exported ReadRange struct by adding field-level doc comments for ReadRange.Start and ReadRange.End that explain these slices represent per-file offsets derived from range properties (one entry per file), that offsets are byte indices within each file, that the interval is half-open [start,end) (i.e. start inclusive, end exclusive), that Start and End must have the same length and corresponding entries form the ranges for each file, and note expected boundary behavior (e.g., end == start means an empty range for that file; values are offsets relative to the file start).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/lightning/backend/external/util.go`:
- Around line 63-70: The End value is being derived from the same property used
to compute Start in getReadRangeFromProps, which causes over-read when the
requested upper bound equals the next property's firstKey; fix by computing
Start and End independently: for each requested key compute Start as the
startOffset of the largest property with property.firstKey <= key, but compute
End as the startOffset of the first property with property.firstKey >= key (and
if none, use the final property's startOffset + encodedSize); update the logic
that sets result[].Start and result[].End accordingly (or alternatively carry
the previous property's end alongside the current property's start) so that
result[B].End is the exact boundary (strict < keyB) rather than the current
property's end.
---
Nitpick comments:
In `@pkg/lightning/backend/external/util.go`:
- Around line 57-60: Update the exported ReadRange struct by adding field-level
doc comments for ReadRange.Start and ReadRange.End that explain these slices
represent per-file offsets derived from range properties (one entry per file),
that offsets are byte indices within each file, that the interval is half-open
[start,end) (i.e. start inclusive, end exclusive), that Start and End must have
the same length and corresponding entries form the ranges for each file, and
note expected boundary behavior (e.g., end == start means an empty range for
that file; values are offsets relative to the file start).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 8fa8cdfe-c6f7-4729-87ff-7b9660ca3d2f
📒 Files selected for processing (8)
pkg/lightning/backend/external/bench_test.gopkg/lightning/backend/external/engine.gopkg/lightning/backend/external/merge_v2.gopkg/lightning/backend/external/reader.gopkg/lightning/backend/external/reader_test.gopkg/lightning/backend/external/testutil.gopkg/lightning/backend/external/util.gopkg/lightning/backend/external/util_test.go
🚧 Files skipped from review as they are similar to previous changes (7)
- pkg/lightning/backend/external/testutil.go
- pkg/lightning/backend/external/bench_test.go
- pkg/lightning/backend/external/reader_test.go
- pkg/lightning/backend/external/merge_v2.go
- pkg/lightning/backend/external/engine.go
- pkg/lightning/backend/external/reader.go
- pkg/lightning/backend/external/util_test.go
| // getReadRangeFromProps reads the statistic files to find both start and end | ||
| // offsets for each requested key. The start offset is the largest offset such | ||
| // that the key at the offset is less than or equal to the requested key; the | ||
| // end offset is derived from that property's total encoded size. | ||
| // | ||
| // Caller can specify multiple ascending keys and getReadRangeFromProps will return | ||
| // the offsets per file for each key. For a range [keyA, keyB), the caller can use | ||
| // result[A] as startOffsets and result[B] as estimatedEndOffsets. | ||
| // the start/end offsets per file for each key. For a range [keyA, keyB), the caller can use | ||
| // result[A].Start as startOffsets and result[B].End as endOffsets. |
There was a problem hiding this comment.
End still over-reads when the upper bound lands exactly on a property boundary.
Line 138 always derives End from the same property chosen for Start. For the documented [keyA, keyB) usage, result[B].End needs strict < keyB semantics. Example: if one property starts at offset 100 for key 10 and the next starts at 150 for key 20, asking for keyB == 20 currently yields End == 200 instead of 150, so adjacent batches still overlap. The current “single tuple per key” approach can’t represent the exact-boundary case cleanly; Start and End need to be accumulated separately (or at least with the previous property’s end carried alongside the current property’s start). A regression test where a jobKey equals the next property’s firstKey would pin this down.
Also applies to: 130-138
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/lightning/backend/external/util.go` around lines 63 - 70, The End value
is being derived from the same property used to compute Start in
getReadRangeFromProps, which causes over-read when the requested upper bound
equals the next property's firstKey; fix by computing Start and End
independently: for each requested key compute Start as the startOffset of the
largest property with property.firstKey <= key, but compute End as the
startOffset of the first property with property.firstKey >= key (and if none,
use the final property's startOffset + encodedSize); update the logic that sets
result[].Start and result[].End accordingly (or alternatively carry the previous
property's end alongside the current property's start) so that result[B].End is
the exact boundary (strict < keyB) rather than the current property's end.
|
@joechenrh: The following test failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
Before this commit the v1 ingest path read the entire data file whenever any part of it was requested, because cachedReader.open (from PR pingcap#67382) only knew how to build v0 sequentialReader / concurrentReader. The streaming v1 fallback worked but dropped most of the bytes via the in-loop key-range filter — O(filesize) wasted work per batch. This commit teaches cachedReader to open v1 files at their exact [startOffset, endOffset) physical byte range: - getReadRangeFromProps: return per-file fileVersions alongside ReadRanges. Extracted from the statsReader.version() that the function already reads — no extra opens. For v1 props the end offset is now p.offset + p.compressedSize (physical frame boundary), not p.offset + p.totalSize() (uncompressed record bytes), because rangeProperty.offset is physical in v1. - cachedReader: new setFormat helper that callers invoke after getReadRangeFromProps to record the v0/v1 format. open() uses the cached format to dispatch between sequentialReader/concurrentReader (v0) and the new rangedV1Reader (v1). Callers that don't setFormat fall back to a detectDataFileFormat probe — correct but costs one extra store.Open on the data file. All callers in this tree (engine.go, merge_v2.go, testutil.go, TestReadAllData_V1Files, TestReadAllDataReuseSequentialReaderAcrossBatches, TestReadLargeFile) now call setFormat. - rangedV1Reader: new fileReader implementation that opens [startOffset, endOffset) on the data file and wraps the bytes in a streaming zstd.NewReader. Clamps startOffset to at least fileHeaderLen so getReadRangeFromProps's zero-initialized "start key not in this file" offset doesn't land inside the 6-byte file header. An empty range returns an EOF-only reader without touching storage. Implements reserve() for carry-over across batches; v1 readers are still not reused across batches (cachedReader.canReuse returns false for v1) because each batch is bound to a specific zstd-frame range. - TestReadAllData_V1Files now uses readRanges[0].Start / readRanges[1].End and calls setFormat on the cachedReaders, the same way production callers do. All 4 v1 tests pass (merge, mixed merge, readAllData ingest, ReadKVFilesAsync) and the PR pingcap#67382 reuse test passes under isolated/targeted runs. Known pre-existing test flakiness (NOT caused by this commit): TestReadAllDataReuseSequentialReaderAcrossBatches fails under the whole-package short run with failpoints enabled because byte_reader_test.go:251 mutates the global ConcurrentReaderBufferSizePerConc without restoring it. Verified by checking out the pristine PR pingcap#67382 branch (mine/fix-global-sort-read-reuse) and observing the same failure. The PR owner should fix the byte_reader_test.go leak separately. Validation: targeted runs pass for TestNewMergeKVIter_V1Files, TestNewMergeKVIter_MixedV0V1, TestReadAllData_V1Files, TestReadKVFilesAsync_V1Files, TestReadAllDataReuseSequentialReaderAcrossBatches, TestReadLargeFile, TestGetReadRangeFromProps, TestReadAllDataBasic.
Two hardening fixes from an adversarial review pass: 1. getReadRangeFromProps (util.go): once the stat file is detected as v1, require p.compressedSize > 0 for every prop. Previously the code used compressedSize == 0 as a dual-purpose sentinel: "no compressed frame, fall back to v0 totalSize() math". That is fine for legitimate v0 files but fails-open on a truncated or otherwise corrupt v1 prop — the downstream reader would open an arbitrary byte range inside compressed frames and surface as a cryptic zstd decode error later, after possibly skipping rows. Now the v1 branch errors out immediately with an explicit "zero compressedSize" corruption message while the v0 branch keeps the existing totalSize() math. 2. NewMergeKVIter (iter.go): reject non-zero pathsStartOffset on v1 files with an explicit error instead of silently ignoring the hint. v0 offsets are uncompressed-byte offsets which have no meaning inside a zstd stream, and the merge step's task meta does not carry segment metadata today, so a v1 reader cannot honor a mid-stream resume. Today every production caller (MergeOverlappingFiles, ddl backfill merge) passes zeroOffsets, but the API surface allowed a non-zero value through which would produce duplicate or out-of-range KVs. Fail fast on that shape instead. Tests: - TestGetReadRangeFromProps_V1ZeroCompressedSizeRejected: hand-crafts a v1 stat file with a prop whose compressedSize is zero and asserts getReadRangeFromProps rejects it. The search key is chosen to land on the corrupt prop so the validation branch fires. - TestNewMergeKVIter_V1RejectsNonZeroStartOffset: writes a v1 file via Writer with SetCompression(CompressionZstd), then calls NewMergeKVIter with offsets[0]=128 and asserts the returned error mentions "non-zero pathsStartOffset". The test intentionally does NOT call iter.Close() on the error path: NewMergeKVIter returns a non-nil *MergeKVIter whose internal mergeIter is nil when a reader-opener errors, and Close() would panic on the nil — that is a separate pre-existing quirk of NewMergeKVIter's constructor, not in scope for this change. Validation: all 10 targeted tests pass (4 v1 tests + 2 hardening tests + 4 PR pingcap#67382/getReadRangeFromProps tests).
|
🔍 Starting code review for this PR... |
ingress-bot
left a comment
There was a problem hiding this comment.
This review was generated by AI and should be verified by a human reviewer.
Manual follow-up is recommended before merge.
Summary
- Total findings: 6
- Inline comments: 6
- Summary-only findings (no inline anchor): 0
Findings (highest risk first)
⚠️ [Major] (2)
- Batch-level cancellation is bypassed during reader open for each file (pkg/lightning/backend/external/reader.go:100, pkg/lightning/backend/external/reader.go:458, pkg/lightning/backend/external/reader.go:313)
- Concurrent read path drops KV length sanity checks and can panic on malformed blocks (pkg/lightning/backend/external/reader.go:390, pkg/lightning/backend/external/reader.go:336)
🟡 [Minor] (4)
readAllDatanow exposes reader-cache ownership and cleanup as duplicated caller policy (pkg/lightning/backend/external/reader.go:42, pkg/lightning/backend/external/engine.go:526, pkg/lightning/backend/external/merge_v2.go:137, pkg/lightning/backend/external/testutil.go:78, pkg/lightning/backend/external/bench_test.go:708)reserveis too generic for the new boundary-carry invariant (pkg/lightning/backend/external/reader.go:200, pkg/lightning/backend/external/reader.go:524)- KV decode contract is now duplicated in two reader implementations (pkg/lightning/backend/external/reader.go:390, pkg/lightning/backend/external/kv_reader.go:71)
- New exported
ReadRangetype adds public naming surface without an external consumer (pkg/lightning/backend/external/util.go:58)
| err2 := readOneFile( | ||
| egCtx, | ||
| if err := cachedReaders[fileIdx].open( | ||
| ctx, |
There was a problem hiding this comment.
⚠️ [Major] Batch-level cancellation is bypassed during reader open for each file
Impact
When one file read fails, sibling workers continue opening and prefetching other files until the outer request context ends.
This extends failure latency and keeps object-storage traffic running after the batch has already failed, which weakens overload and failure containment for large imports.
Scope
pkg/lightning/backend/external/reader.go:100—readAllDatapkg/lightning/backend/external/reader.go:458—cachedReader.openpkg/lightning/backend/external/reader.go:313—concurrentReader.startOnce
Evidence
readAllData creates egCtx for per-batch cancellation, but cachedReaders[fileIdx].open is invoked with ctx instead of egCtx.
cachedReader.open passes that context into newConcurrentReader, and startOnce runs all objstore.ReadDataInRange calls under that non-group context.
After one worker returns an error and cancels egCtx, peers already inside open do not observe that cancellation path.
Change request
Call cachedReaders[fileIdx].open with egCtx so file-open and range-prefetch work is tied to the same batch failure scope as read.
Keep newConcurrentReader and all ReadDataInRange calls under that group context, and add a cancellation regression test that forces one file read error while another file is in open.
| return nil, nil, noEOF(err) | ||
| } | ||
| valLen := int(binary.BigEndian.Uint64(lenBytes)) | ||
| keyAndValue, err := r.readNBytes(blockBuf, keyLen+valLen) |
There was a problem hiding this comment.
⚠️ [Major] Concurrent read path drops KV length sanity checks and can panic on malformed blocks
Impact
Malformed or partially corrupted KV blocks can crash the import worker instead of returning a structured read error.
When keyLen or valLen overflows int or becomes non-positive after conversion, slice math in the new concurrent path can panic and abort range loading.
Scope
pkg/lightning/backend/external/reader.go:390—(*concurrentReader).nextKVpkg/lightning/backend/external/reader.go:336—(*concurrentReader).readNBytes
Evidence
KVReader.NextKV previously delegated to byteReader.readNBytes, which rejects non-positive lengths and reads larger than 1 GiB before slicing.
The new concurrentReader.nextKV converts raw length headers to int and calls readNBytes(keyLen+valLen) without equivalent guards, and readNBytes itself has no n <= 0 or max-size check.
A corrupted length header such as 0xffffffffffffffff can drive invalid slice bounds instead of a deterministic decode error.
Change request
Apply the same length-validation contract used by byteReader.readNBytes: reject non-positive lengths and oversized reads before allocation or slicing.
Validate keyLen, valLen, and keyLen+valLen for overflow in concurrentReader.nextKV, then add malformed-length tests that exercise the concurrent-reader branch.
| store storeapi.Storage, | ||
| dataFiles, statsFiles []string, | ||
| dataFiles []string, | ||
| cachedReaders []cachedReader, |
There was a problem hiding this comment.
🟡 [Minor] readAllData now exposes reader-cache ownership and cleanup as duplicated caller policy
Impact
The patch moves cachedReader lifecycle out of readAllData, so every caller now has to hand-roll allocation, reuse policy, and cleanup. The diff already shows inconsistent ownership patterns across call sites, which makes future changes to reader reuse semantics harder to apply consistently and review.
Scope
pkg/lightning/backend/external/reader.go:42—readAllDatapkg/lightning/backend/external/engine.go:526—(*Engine).LoadIngestDatapkg/lightning/backend/external/merge_v2.go:137—MergeOverlappingFilesV2pkg/lightning/backend/external/testutil.go:78—testReadAndComparepkg/lightning/backend/external/bench_test.go:708
Evidence
readAllData now requires an external cachedReaders []cachedReader argument and indexes it in lockstep with dataFiles and offsets. engine.go keeps one cache for the full batch loop with deferred close, while merge_v2.go and testutil.go rebuild and close per group, and one-shot callers in bench_test.go pass a temporary slice directly. This is one behavioral policy expressed in several places instead of one owned abstraction.
Change request
Introduce a local owner abstraction for this contract (for example, a batchReaders helper with readRange(...) and close()), so caller code passes range intent instead of cache internals. Keep one-shot behavior and reusable behavior as explicit constructors or modes on that helper, and remove per-caller lifecycle duplication.
| // reserve stores the first key-value pair outside the current range so a | ||
| // reused sequential reader can continue from it in the next batch. Readers | ||
| // that are never reused may implement this as a no-op. | ||
| reserve(key, value []byte) |
There was a problem hiding this comment.
🟡 [Minor] reserve is too generic for the new boundary-carry invariant
Impact
The new reader invariant depends on carrying the first out-of-range KV into the next batch, but the method name reserve does not signal that behavior. This makes the interface contract harder to infer and increases maintenance risk when new fileReader implementations are added.
Scope
pkg/lightning/backend/external/reader.go:200—fileReader.reservepkg/lightning/backend/external/reader.go:524—cachedReader.read
Evidence
cachedReader.read calls cr.r.reserve(k, v) exactly at the [startKey, endKey) boundary handoff, where the method semantically means next-batch KV stashing rather than capacity reservation. The interface name is generic enough to be misread, and correct behavior currently depends on reading the long comment to recover intent.
Change request
Rename reserve to an intent-revealing verb such as stashNextBatchKV or saveBoundaryKV, then update implementations and callsites accordingly. Keep the existing invariant comment, but align the identifier with that contract so readers can understand behavior from the API shape alone.
| return result, nil | ||
| } | ||
|
|
||
| func (r *concurrentReader) nextKV(blockBuf *membuf.Buffer) ([]byte, []byte, error) { |
There was a problem hiding this comment.
🟡 [Minor] KV decode contract is now duplicated in two reader implementations
Impact
The same wire-format parsing is maintained in both the sequential and concurrent read paths. This increases maintenance drift risk because decoder changes now require synchronized edits across multiple locations.
Scope
pkg/lightning/backend/external/reader.go:390—(*concurrentReader).nextKVpkg/lightning/backend/external/kv_reader.go:71—(*KVReader).NextKV
Evidence
(*concurrentReader).nextKV reimplements the same <key-len><value-len><key><value> decoding steps already present in (*KVReader).NextKV, including repeated binary.BigEndian length parsing and noEOF handling. The patch introduces this second decoder path instead of reusing one shared decoding primitive.
Change request
Factor the frame decoding into a shared helper used by both paths, or route concurrent reads through an adapter that reuses the existing KVReader decoding contract. Keep buffering strategy separate, but keep format parsing single-sourced.
| // file reader, read, parse and skip few smaller keys, and then locate the needed | ||
| // data. | ||
| // ReadRange stores per-file offsets derived from range properties for one key. | ||
| type ReadRange struct { |
There was a problem hiding this comment.
🟡 [Minor] New exported ReadRange type adds public naming surface without an external consumer
Impact
The patch introduces a new exported identifier in external that becomes part of the package's visible naming surface. Keeping this type exported without a real cross-package consumer makes future refactors harder because downstream callers can start depending on it.
Scope
pkg/lightning/backend/external/util.go:58—ReadRange
Evidence
type ReadRange struct is exported, but its producer getReadRangeFromProps is unexported and all current uses stay inside pkg/lightning/backend/external. The current call chain (getReadRangeFromProps -> LoadIngestData/MergeOverlappingFilesV2/test helpers) does not require an exported contract type.
Change request
Make the type package-private (readRange) and keep it internal to the current implementation. If you intentionally want a public contract, add an exported producer/consumer boundary and document that external usage explicitly.
Summary
This PR reduces repeated object-storage reads during global sort ingest.
Test
Summary by CodeRabbit