chore: validate parquet before and after upload to object store #1432
Conversation
After parquet file creation:
1. Validate that the file size is greater than FOOTER_SIZE.
2. Read the file and validate that num_rows > 0.

After parquet file upload to the object store:
1. Perform a head() call to fetch the uploaded object's metadata and validate that its size equals the size of the parquet file in the staging directory.

Fixes: parseablehq#1430
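A minimal sketch of the local-side validation, assuming the parquet crate's SerializedFileReader and FOOTER_SIZE constant; it illustrates what the PR's is_valid_parquet_file check amounts to, not the exact code:

```rust
use std::{fs::File, path::Path};

use parquet::file::{
    reader::{FileReader, SerializedFileReader},
    FOOTER_SIZE, // 8-byte parquet footer
};

/// Returns true if `path` looks like a complete, non-empty parquet file.
fn is_valid_parquet_file(path: &Path, stream_name: &str) -> bool {
    // 1. Cheap size check: anything not larger than the footer cannot be valid parquet.
    let Ok(meta) = std::fs::metadata(path) else {
        eprintln!("cannot stat {path:?} for stream {stream_name}");
        return false;
    };
    if meta.len() <= FOOTER_SIZE as u64 {
        return false;
    }

    // 2. Deep check: parse the footer metadata and require at least one row.
    let file = match File::open(path) {
        Ok(f) => f,
        Err(e) => {
            eprintln!("cannot open {path:?} for stream {stream_name}: {e}");
            return false;
        }
    };
    match SerializedFileReader::new(file) {
        Ok(reader) => reader.metadata().file_metadata().num_rows() > 0,
        Err(e) => {
            eprintln!("unreadable parquet metadata in {path:?} for stream {stream_name}: {e}");
            false
        }
    }
}
```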
Walkthrough
Adds Parquet integrity checks during discovery and finalization using SerializedFileReader and row-count validation; promotes validated .part files to .parquet.
Sequence Diagram(s)
sequenceDiagram
autonumber
participant Ingest as Ingest
participant FS as Filesystem
participant Parq as ParquetReader
Ingest->>FS: List .parquet and .part files
loop each candidate
Ingest->>FS: Open file (stat/size)
Ingest->>Parq: Open with SerializedFileReader, read metadata
alt Valid (size ok, readable metadata, rows >= 0)
Note right of Ingest #DFF2E1: Valid → allow processing
Ingest->>FS: If .part → std::fs::rename to .parquet
else Invalid
Note right of Ingest #FFEFEF: Log with stream name, skip file
end
end
sequenceDiagram
autonumber
participant Uploader as Uploader
participant FS as Filesystem
participant Store as ObjectStorage
Uploader->>Store: Upload parquet object
Uploader->>FS: Stat local file → expected_size
Uploader->>Store: HEAD uploaded object
alt HEAD success & sizes match
Note right of Uploader #DFF2E1: Validation passed → create manifest
else HEAD error or size mismatch
Note right of Uploader #FFEFEF: Validation failed
Uploader->>Store: Delete remote object
Uploader->>Uploader: Return UploadResult (manifest_file=None)
Uploader->>FS: Keep staging file for retry
end
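A rough sketch of the post-upload validation shown above, written against the object_store crate for illustration; Parseable's actual implementation lives behind its own ObjectStorage trait (validate_uploaded_parquet_file), so the function below and its name are assumptions:

```rust
use object_store::{path::Path as StorePath, ObjectStore};

/// Compare the uploaded object's size against the staged file; delete the remote
/// object on mismatch so the staged copy can be re-uploaded on the next sync cycle.
async fn validate_uploaded_size(
    store: &dyn ObjectStore,
    remote: &StorePath,
    local: &std::path::Path,
) -> bool {
    let expected = match tokio::fs::metadata(local).await {
        Ok(m) => m.len(),
        Err(e) => {
            eprintln!("cannot stat staged file {local:?}: {e}");
            return false;
        }
    };

    match store.head(remote).await {
        // Sizes match: safe to create the manifest and drop the staged file.
        Ok(meta) if meta.size as u64 == expected => true,
        // HEAD failed or sizes differ: remove the suspect object and keep the
        // staged file so the next sync cycle retries the upload.
        other => {
            eprintln!("upload validation failed for {remote}: {other:?}");
            let _ = store.delete(remote).await;
            false
        }
    }
}
```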
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks: ❌ Failed checks (1 warning) · ✅ Passed checks (4 passed)
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/parseable/streams.rs (1)
688-696: Open .part with write+truncate; avoid appending to stale partials

Using append(true) risks corrupting or duplicating data if a previous partial exists. Prefer write(true) + truncate(true).

```diff
- let mut part_file = OpenOptions::new()
-     .create(true)
-     .append(true)
-     .open(part_path)
+ let mut part_file = OpenOptions::new()
+     .create(true)
+     .write(true)
+     .truncate(true)
+     .open(part_path)
      .map_err(|_| StagingError::Create)?;
```

Optionally call part_file.sync_all() after writer.close() to ensure durability before rename.
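As a generic illustration of that durability pattern (truncate on open, fsync, then atomic rename), not the exact streams.rs code:

```rust
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::Path;

fn finalize_part(part_path: &Path, parquet_path: &Path, bytes: &[u8]) -> std::io::Result<()> {
    // Truncate rather than append so a stale .part left by a crashed run is overwritten.
    let mut part_file = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(part_path)?;
    part_file.write_all(bytes)?;

    // Flush to disk before the rename so the .parquet is durable once it becomes visible.
    part_file.sync_all()?;
    fs::rename(part_path, parquet_path)
}
```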
🧹 Nitpick comments (3)
src/storage/object_storage.rs (2)
109-114: Avoid blocking metadata call in async path

Use tokio's non-blocking metadata to prevent blocking an async worker thread.

```diff
- let local_file_size = path
-     .metadata()
-     .map_err(|e| ObjectStorageError::Custom(format!("Failed to get local file metadata: {e}")))?
-     .len();
+ let local_file_size = tokio::fs::metadata(&path)
+     .await
+     .map_err(|e| ObjectStorageError::Custom(format!("Failed to get local file metadata: {e}")))?
+     .len();
```
948-953: Downgrade log level to warn for retryable validation failures

This path preserves the file for retry and is expected to self-heal; WARN reduces false-positive error alerting.

```diff
- error!(
+ warn!(
      "Parquet file upload size validation failed for {:?}, preserving in staging for retry",
      upload_result.file_path
  );
```

src/parseable/streams.rs (1)
411-416: Directory scan now opens every parquet: consider cost

Validating every file (open + read metadata) on listing can be expensive in large staging directories. Optionally filter by size here and perform deep validation right before upload (already done when writing .part).
If listing performance regresses in your workloads, we can gate deep validation behind a config flag.
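A sketch of how that gating might look, with a hypothetical deep_validation flag (not an existing Parseable option) and the deep check stubbed out:

```rust
use std::path::Path;

/// Hypothetical listing-time filter: always do the cheap size check,
/// only parse parquet metadata when deep validation is enabled.
fn keep_in_listing(path: &Path, footer_size: u64, deep_validation: bool) -> bool {
    let size_ok = std::fs::metadata(path)
        .map(|m| m.len() > footer_size)
        .unwrap_or(false);
    if !size_ok {
        return false;
    }
    // Defer the expensive SerializedFileReader check unless explicitly enabled.
    !deep_validation || deep_parquet_check(path)
}

// Stub so the sketch compiles standalone; the real check parses parquet metadata.
fn deep_parquet_check(_path: &Path) -> bool {
    true
}
```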
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/parseable/streams.rs (5 hunks)
src/storage/object_storage.rs (4 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1340
File: src/storage/object_storage.rs:832-843
Timestamp: 2025-06-18T06:45:37.070Z
Learning: Stats calculation for parquet files in Parseable is done synchronously during the upload process because files are deleted from staging after upload. This prevents race conditions and ensures stats are calculated while files are still available locally.
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.
Applied to files:
src/storage/object_storage.rs
src/parseable/streams.rs
🧬 Code graph analysis (1)
src/storage/object_storage.rs (2)
src/storage/localfs.rs (1)
from (583-585)
src/storage/s3.rs (2)
from (862-870)
from (874-876)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: coverage
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (3)
src/storage/object_storage.rs (1)
956-956: LGTM on clearer error message

“Error uploading parquet file” precisely scopes the failure.
src/parseable/streams.rs (2)
37-41: LGTM: bring in Parquet footer size and readers for validation

Imports are correct and scoped.
655-660: Direct rename is fine; ensure same-volume move

Given both paths live under the same staging root, rename is atomic on POSIX. No action needed.
Please confirm staging never crosses filesystems; otherwise we should fall back to copy+fsync+rename.
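If staging could ever cross filesystems, the fallback might look like this generic sketch (not code from this PR):

```rust
use std::fs::{self, OpenOptions};
use std::path::Path;

/// Move `src` to `dst`: atomic rename on the same filesystem, otherwise
/// fall back to copy + fsync of the destination + delete of the source.
fn move_file(src: &Path, dst: &Path) -> std::io::Result<()> {
    if fs::rename(src, dst).is_ok() {
        return Ok(());
    }
    // Rename failed (e.g., EXDEV across mount points).
    fs::copy(src, dst)?;
    OpenOptions::new().write(true).open(dst)?.sync_all()?;
    fs::remove_file(src)
}
```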
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/parseable/streams.rs (1)
677-683: OpenOptions should truncate, not append, .part files

append(true) risks corrupting leftovers if a stale .part exists (e.g., after a crash). Use write(true) + truncate(true) for idempotent, deterministic writes.

Apply this diff:

```diff
- let mut part_file = OpenOptions::new()
-     .create(true)
-     .append(true)
-     .open(part_path)
+ let mut part_file = OpenOptions::new()
+     .create(true)
+     .write(true)
+     .truncate(true)
+     .open(part_path)
      .map_err(|_| StagingError::Create)?;
```
♻️ Duplicate comments (1)
src/parseable/streams.rs (1)
700-741: Enforce num_rows > 0 (reject <= 0) and log the count

The current check only rejects exactly 0; it should reject any non-positive count to meet the PR objective. Include num_rows in the log for observability.

Apply this diff:

```diff
- Ok(reader) => {
-     if reader.metadata().file_metadata().num_rows() == 0 {
-         error!("Invalid parquet file {path:?} for stream {stream_name}");
-         false
-     } else {
-         true
-     }
- }
+ Ok(reader) => {
+     let num_rows = reader.metadata().file_metadata().num_rows();
+     if num_rows <= 0 {
+         error!(
+             "Invalid row count ({num_rows}) in parquet file {path:?} for stream {stream_name}"
+         );
+         false
+     } else {
+         true
+     }
+ }
```

Optionally add a unit test to prevent regressions:

```rust
#[test]
fn rejects_empty_parquet_files_in_listing() {
    let temp = TempDir::new().unwrap();
    let options = Arc::new(Options {
        local_staging_path: temp.path().to_path_buf(),
        ..Default::default()
    });
    let stream = Stream::new(options, "s", LogStreamMetadata::default(), None);

    // create an empty parquet (0 rows)
    let schema = Schema::new(vec![Field::new(
        DEFAULT_TIMESTAMP_KEY,
        DataType::Timestamp(TimeUnit::Millisecond, None),
        false,
    )]);
    let parquet_path = stream.data_path.join("empty.parquet");
    std::fs::create_dir_all(&stream.data_path).unwrap();
    {
        let file = File::create(&parquet_path).unwrap();
        let mut w = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
        // no writes
        w.close().unwrap();
    }

    // listing should exclude it
    let files = stream.parquet_files();
    assert!(!files.iter().any(|p| p.ends_with("empty.parquet")));
}
```
🧹 Nitpick comments (2)
src/parseable/streams.rs (2)
411-417: Optional: avoid full metadata parse on every listing

parquet_files() now validates each file via is_valid_parquet_file. Consider a cheap prefilter (e.g., size >= FOOTER_SIZE) before opening/parsing metadata to reduce I/O in large staging dirs. Keep full validation before upload/manifest, but this list operation can be lighter.
688-695: Don't panic on cleanup failure of invalid .part

Using expect() here can crash the ingest path. Log a warning and continue.

Apply this diff:

```diff
- remove_file(part_path).expect("File should be removable if it is invalid");
+ if let Err(e) = remove_file(part_path) {
+     warn!("Failed to remove invalid parquet part file {}: {e}", part_path.display());
+ }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/parseable/streams.rs (5 hunks)
src/storage/object_storage.rs (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/storage/object_storage.rs
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1432
File: src/storage/object_storage.rs:124-132
Timestamp: 2025-09-14T15:17:59.214Z
Learning: In Parseable's upload validation system (src/storage/object_storage.rs), the validate_uploaded_parquet_file function should not include bounded retries for metadata consistency issues. Instead, failed validations rely on the 30-second sync cycle for natural retries, with staging files preserved when manifest_file is set to None.
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1340
File: src/storage/object_storage.rs:832-843
Timestamp: 2025-06-18T06:45:37.070Z
Learning: Stats calculation for parquet files in Parseable is done synchronously during the upload process because files are deleted from staging after upload. This prevents race conditions and ensures stats are calculated while files are still available locally.
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:145-166
Timestamp: 2025-08-18T12:33:16.085Z
Learning: In Parseable's staging and upload process, parquet file names are guaranteed to always contain the date part in the expected format (date=YYYY-MM-DD). The system ensures no deviation from this naming convention, making defensive parsing unnecessary for date extraction from filenames during storage metrics updates.
📚 Learning: 2025-09-14T15:17:59.214Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1432
File: src/storage/object_storage.rs:124-132
Timestamp: 2025-09-14T15:17:59.214Z
Learning: In Parseable's upload validation system (src/storage/object_storage.rs), the validate_uploaded_parquet_file function should not include bounded retries for metadata consistency issues. Instead, failed validations rely on the 30-second sync cycle for natural retries, with staging files preserved when manifest_file is set to None.
Applied to files:
src/parseable/streams.rs
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.
Applied to files:
src/parseable/streams.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: coverage
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (2)
src/parseable/streams.rs (2)
37-40: LGTM: imports enable parquet validation

Importing FOOTER_SIZE and SerializedFileReader is appropriate for the new integrity checks.

655-659: LGTM: atomic finalize of .part → .parquet

Same-dir rename is atomic; good choice for crash-safety.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/parseable/streams.rs (1)
655-659: Risk: appending to an existing .part can corrupt/balloon files on retry

OpenOptions::append(true) will append when a stale .part exists (e.g., a previous rename failed), potentially producing invalid Parquet or duplicated data. Also, on rename failure the .part remains and the next cycle will append again.

- Open .part with truncate, not append.
- Optionally delete the .part on rename failure to ensure a clean retry.

Apply this diff:

```diff
- let mut part_file = OpenOptions::new()
-     .create(true)
-     .append(true)
-     .open(part_path)
+ let mut part_file = OpenOptions::new()
+     .create(true)
+     .write(true)
+     .truncate(true)
+     .open(part_path)
      .map_err(|_| StagingError::Create)?;
```

```diff
- if let Err(e) = std::fs::rename(&part_path, &parquet_path) {
+ if let Err(e) = std::fs::rename(&part_path, &parquet_path) {
      error!("Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}");
+     // Avoid future appends to a stale part; safe to remove since arrows are still intact.
+     let _ = remove_file(&part_path);
  } else {
      self.cleanup_arrow_files_and_dir(&arrow_files);
  }
```

Also applies to: 677-684
🧹 Nitpick comments (3)
src/parseable/streams.rs (3)
415-416: Filtering parquet_files() by full metadata validation may be expensive

Opening every file to read metadata on each listing can add latency when staging holds many files. Consider caching validation results or using a lighter check here (size-only) and reserving deep validation for finalization if this shows up in profiles.
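The caching idea could look roughly like this, keyed by size and mtime so changed files are re-validated; ValidationCache and its fields are hypothetical, not part of the PR:

```rust
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::SystemTime;

/// Hypothetical cache of files that already passed deep parquet validation.
/// An entry is invalidated when the file's size or mtime changes.
#[derive(Default)]
struct ValidationCache {
    validated: HashMap<PathBuf, (u64, SystemTime)>,
}

impl ValidationCache {
    fn is_valid(&mut self, path: &Path, deep_check: impl Fn(&Path) -> bool) -> bool {
        let Ok(meta) = std::fs::metadata(path) else {
            return false;
        };
        let key = (meta.len(), meta.modified().unwrap_or(SystemTime::UNIX_EPOCH));

        if self.validated.get(path) == Some(&key) {
            return true; // unchanged since it last passed deep validation
        }
        if deep_check(path) {
            self.validated.insert(path.to_path_buf(), key);
            return true;
        }
        false
    }
}
```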
700-740: Tighten row-count check and improve log context

- Treat negative num_rows as invalid too (defense-in-depth).
- Include the value in logs for faster debugging.

```diff
- Ok(reader) => {
-     if reader.metadata().file_metadata().num_rows() == 0 {
-         error!("Invalid parquet file {path:?} for stream {stream_name}");
-         false
-     } else {
-         true
-     }
- }
+ Ok(reader) => {
+     let num_rows = reader.metadata().file_metadata().num_rows();
+     if num_rows <= 0 {
+         error!("Invalid row count ({num_rows}) in parquet file {path:?} for stream {stream_name}");
+         false
+     } else {
+         true
+     }
+ }
```
994-998: Comment nit: wording doesn't match behavior

This flush drops disk writers to force Arrow-to-disk flush; it doesn't "convert .part files to .arrows".

```diff
- // Force flush for init or shutdown signals to convert all .part files to .arrows
- // For regular cycles, use false to only flush non-current writers
+ // Force flush on init/shutdown to drop writers and persist all pending Arrow files.
+ // For regular cycles, keep current writer open to avoid disrupting in-flight minute.
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/parseable/streams.rs (6 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1432
File: src/storage/object_storage.rs:124-132
Timestamp: 2025-09-14T15:17:59.214Z
Learning: In Parseable's upload validation system (src/storage/object_storage.rs), the validate_uploaded_parquet_file function should not include bounded retries for metadata consistency issues. Instead, failed validations rely on the 30-second sync cycle for natural retries, with staging files preserved when manifest_file is set to None.
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1340
File: src/storage/object_storage.rs:832-843
Timestamp: 2025-06-18T06:45:37.070Z
Learning: Stats calculation for parquet files in Parseable is done synchronously during the upload process because files are deleted from staging after upload. This prevents race conditions and ensures stats are calculated while files are still available locally.
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:145-166
Timestamp: 2025-08-18T12:33:16.085Z
Learning: In Parseable's staging and upload process, parquet file names are guaranteed to always contain the date part in the expected format (date=YYYY-MM-DD). The system ensures no deviation from this naming convention, making defensive parsing unnecessary for date extraction from filenames during storage metrics updates.
📚 Learning: 2025-09-14T15:17:59.214Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1432
File: src/storage/object_storage.rs:124-132
Timestamp: 2025-09-14T15:17:59.214Z
Learning: In Parseable's upload validation system (src/storage/object_storage.rs), the validate_uploaded_parquet_file function should not include bounded retries for metadata consistency issues. Instead, failed validations rely on the 30-second sync cycle for natural retries, with staging files preserved when manifest_file is set to None.
Applied to files:
src/parseable/streams.rs
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.
Applied to files:
src/parseable/streams.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
🔇 Additional comments (2)
src/parseable/streams.rs (2)
37-40: Imports LGTM; FileReader trait in scope is required for .metadata()

Good addition; necessary for SerializedFileReader::metadata() calls.

688-695: Good: validate .part before promote; remove invalid artifacts

Early rejection + cleanup aligns with PR objectives.