fix: commit schema bug#1666
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Repository UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (2)
WalkthroughEvent processing obtains the Stream up-front, conditionally stages/merges the schema via a new Stream::stage_schema_file (which writes atomically and respects ingestor id), and reuses the Stream for pushes. Disk staging gains configurable row and time thresholds; JSON parse errors are surfaced via a StagingError::Json variant. ChangesSchema staging and writer updates
Sequence DiagramsequenceDiagram
participant Event as Event::process
participant PARSEABLE
participant Stream
participant StagingWriter
participant Filesystem
Event->>PARSEABLE: get_or_create_stream(stream_name)
PARSEABLE-->>Event: Stream
Event->>Event: is_first_event && !stream.get_static_schema_flag()
Event->>Stream: stage_schema_file(schema)
Stream->>StagingWriter: open staging dir / read existing *.schema
StagingWriter->>Filesystem: read/parse existing schema files
Stream->>Stream: merge schemas (Schema::try_merge)
Stream->>Filesystem: write merged schema to *.schema.tmp -> rename to *.schema
Stream-->>Event: Ok(())
Event->>Stream: push(event_data)
Stream->>StagingWriter: push_disk(..., batch_rows=DISK_WRITE_BATCH_ROWS)
StagingWriter->>StagingWriter: buffer, flush when rows>=batch_rows or age>=DISK_WRITE_BATCH_MAX_AGE_SECS
🎯 4 (Complex) | ⏱️ ~45 minutes Possibly Related PRs
Suggested labels
Suggested reviewers
🚥 Pre-merge checks | ✅ 2 | ❌ 3❌ Failed checks (2 warnings, 1 inconclusive)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/event/mod.rs (1)
91-95: ⚡ Quick winAdd a regression test for the new first-event staging path.
This branch is the actual fix for the Quest failure, but I don’t see coverage here asserting that a dynamic-schema stream has a staged
*.schemabefore the firstpush()/parquet cycle. A focused test aroundEvent::processwould make this much harder to regress.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/event/mod.rs` around lines 91 - 95, Add a regression test that exercises the first-event staging path in Event::process: create a dynamic-schema stream (stream.get_static_schema_flag() == false), invoke Event::process (or the flow that sets self.is_first_event) and assert that commit_schema(&self.stream_name, ...) was called and that stream.stage_schema_file received/created the expected "*.schema" before any push()/parquet cycle; use the Event::process entry point, the is_first_event flag, and verify side-effects against commit_schema and stage_schema_file (or inspect the stream's staged schema via get_schema()) to ensure the schema file is staged on the first event.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/parseable/streams.rs`:
- Around line 510-527: stage_schema_file currently does an unsynchronized
read/merge/write that can lose fields or read partial JSON; fix by serializing
updates with a per-stream lock and writing via a temp-file + atomic rename:
add/use a per-stream Mutex/RwLock (e.g., a field on the struct guarding schema
ops) and acquire it at the start of stage_schema_file, call
get_schemas_if_present (or re-read the staging file) while holding the lock,
merge via Schema::try_merge, then write the merged bytes to a temp path (e.g.,
same dir with a .tmp suffix or .ingestor.{id}.schema.tmp) and atomically rename
to the final schema filename (replace the current RelativePathBuf usage so final
name is the intended .ingestor.{id}.schema); also ensure get_schemas_if_present
is only used/read under the same lock or returns errors instead of silently
ignoring deserialization failures so partial writes are not treated as absent.
---
Nitpick comments:
In `@src/event/mod.rs`:
- Around line 91-95: Add a regression test that exercises the first-event
staging path in Event::process: create a dynamic-schema stream
(stream.get_static_schema_flag() == false), invoke Event::process (or the flow
that sets self.is_first_event) and assert that commit_schema(&self.stream_name,
...) was called and that stream.stage_schema_file received/created the expected
"*.schema" before any push()/parquet cycle; use the Event::process entry point,
the is_first_event flag, and verify side-effects against commit_schema and
stage_schema_file (or inspect the stream's staged schema via get_schema()) to
ensure the schema file is staged on the first event.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: d98e25e0-7082-48cd-85ab-0a5c8c5731e7
📒 Files selected for processing (2)
src/event/mod.rssrc/parseable/streams.rs
pending batches by default get pushed to disk near instantly schema writer is behind a mutex to prevent incorrect schemas
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/parseable/streams.rs (1)
451-464:⚠️ Potential issue | 🟠 Major | ⚡ Quick winOnly treat
NotFoundas “no schema.”
stage_schema_file()now depends on this helper for merge correctness. ReturningOk(vec![])for anyread_dirfailure can silently drop previously staged fields from the merged schema, andFile::open(...).expect(...)can still panic on a transient race in the staging dir. Please propagate non-NotFoundI/O errors here instead of treating them as an empty schema set.Suggested fix
pub fn get_schemas_if_present(&self) -> Result<Vec<Schema>, StagingError> { - let Ok(dir) = self.data_path.read_dir() else { - return Ok(vec![]); - }; + let dir = match self.data_path.read_dir() { + Ok(dir) => dir, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(vec![]), + Err(err) => return Err(err.into()), + }; let mut schemas: Vec<Schema> = Vec::new(); for file in dir.flatten() { if let Some(ext) = file.path().extension() && ext.eq("schema") { - let file = File::open(file.path()).expect("Schema File should exist"); + let file = match File::open(file.path()) { + Ok(file) => file, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue, + Err(err) => return Err(err.into()), + }; schemas.push(serde_json::from_reader(file)?); } } Ok(schemas) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/parseable/streams.rs` around lines 451 - 464, In get_schemas_if_present, don’t treat every read_dir error as “no schemas” — only return Ok(vec![]) when the error kind is NotFound; for other I/O errors propagate them as a StagingError (e.g. map_err into StagingError::Io or the crate’s equivalent). Also remove the File::open(...).expect(...) panic: open the schema file using ? (or map_err) so transient races return an error instead of panicking, and propagate serde_json::from_reader errors as currently done; this ensures stage_schema_file and merge logic won’t silently drop staged fields.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@src/parseable/streams.rs`:
- Around line 451-464: In get_schemas_if_present, don’t treat every read_dir
error as “no schemas” — only return Ok(vec![]) when the error kind is NotFound;
for other I/O errors propagate them as a StagingError (e.g. map_err into
StagingError::Io or the crate’s equivalent). Also remove the
File::open(...).expect(...) panic: open the schema file using ? (or map_err) so
transient races return an error instead of panicking, and propagate
serde_json::from_reader errors as currently done; this ensures stage_schema_file
and merge logic won’t silently drop staged fields.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 9d8d1151-be65-44dd-aded-da15ddb78480
📒 Files selected for processing (3)
src/parseable/staging/mod.rssrc/parseable/staging/writer.rssrc/parseable/streams.rs
Parseable enterprise failed Quest test due to schema not being present in metastore as soon as possible
Description
This PR has:
Summary by CodeRabbit
Refactor
Bug Fixes