feat(log-ingestor): Update CLP ingestion utils for CLP-DB-backed fault-tolerance integration (#2038)
Conversation
Note: Reviews paused. This branch is under active development, so CodeRabbit has automatically paused this review to avoid an influx of comments from new commits.
Walkthrough

Adds a const accessor to …

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Caller as Caller
    participant Connector as ClpDbIngestionConnector
    participant DB as MySQLDB
    participant State as ClpIngestionState
    participant Submitter as CompressionJobSubmitter
    participant Listener as Listener
    Caller->>Connector: create_ingestion_job(&S3IngestionJobConfig)
    Connector->>DB: INSERT ingestion job (status, status_msg, timestamps)
    DB-->>Connector: job_id
    Connector->>State: init(job_id, db_pool, config...)
    Connector->>Submitter: create submitter + Listener(channel)
    Connector-->>Caller: (ClpIngestionState, Listener)
    Caller->>State: get_job_status()
    State->>DB: SELECT status, status_msg, last_update_ts
    DB-->>State: status
    State->>State: is_valid_transition(old, new)
    alt valid
        State->>DB: UPDATE status, status_msg, last_update_ts
        DB-->>State: ok
        opt submit compression
            State->>Submitter: submit_compression_job(metadata_id)
            Submitter->>DB: INSERT/UPDATE compression + object metadata (Buffered->Submitted)
            DB-->>Submitter: ok
            Submitter->>Listener: notify submission/completion
        end
    else invalid
        State-->>Caller: error
    end
    Caller->>State: fail("message")
    State->>DB: UPDATE status = Failed, status_msg, last_update_ts
    DB-->>State: ok
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues
🚥 Pre-merge checks: ✅ 3 passed
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs (1)
888-908: ⚠️ Potential issue | 🟡 Minor

Remove the unused `error_msg` column from the ingestion job table schema.

The `error_msg` column (line 898) is defined in the table schema but has no usages anywhere in the codebase. The `status_msg` column (line 896) is actively used throughout the code for storing failure messages when job status is updated via `update_job_status` and retrieved during polling. Remove `error_msg` to eliminate schema bloat and avoid confusion about message storage.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs` around lines 888 - 908, The CREATE TABLE SQL in ingestion_job_table_creation_query currently defines an unused `error_msg` column; remove the "`error_msg` TEXT NULL DEFAULT NULL," line from the formatted schema string returned by ingestion_job_table_creation_query (which uses INGESTION_JOB_TABLE_NAME and ClpIngestionJobStatus::format_as_sql_enum()) and ensure no other code paths reference `error_msg` (the code uses `status_msg` and update_job_status), so simply drop that column from the schema string to eliminate the unused column.
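For illustration, here is a minimal stdlib-only sketch of what the table-creation query might look like with `error_msg` dropped. The column definitions follow those quoted in the review, but the function shape is an assumption: the real code builds the string at compile time with `formatcp!` and `ClpIngestionJobStatus::format_as_sql_enum()` rather than a runtime `format!`.

```rust
// Hypothetical sketch: the ingestion-job schema with only `status_msg` as the
// message column, as the review suggests.
const INGESTION_JOB_TABLE_NAME: &str = "ingestion_job";

fn ingestion_job_table_creation_query(status_enum_sql: &str) -> String {
    format!(
        "CREATE TABLE IF NOT EXISTS `{INGESTION_JOB_TABLE_NAME}` (\
         `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, \
         `status` {status_enum_sql} NOT NULL, \
         `status_msg` TEXT NULL DEFAULT NULL, \
         `creation_ts` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), \
         `last_update_ts` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), \
         PRIMARY KEY (`id`))"
    )
}

fn main() {
    let query = ingestion_job_table_creation_query("ENUM('Pending', 'Running', 'Failed', 'Finished')");
    // A single message column remains: `status_msg`.
    assert!(query.contains("`status_msg` TEXT"));
    assert!(!query.contains("error_msg"));
    println!("ok");
}
```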
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 220-272: The idempotency branch in update_job_status currently
commits without touching status_msg which preserves the original message; decide
and implement the intended behavior: if you want to preserve the first failure
message, add a short inline comment above the early-return in update_job_status
explaining that preserving status_msg on idempotent calls is intentional
(referencing ClpIngestionJobStatus and the fail doc comment), otherwise modify
update_job_status to perform an UPDATE to set status_msg when status matches and
a new status_msg is provided (i.e., run the same SQL UPDATE with
.bind(status).bind(status_msg).bind(self.job_id) before committing).
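The "preserve the first message" behavior under discussion can be shown with a small, self-contained sketch. The enum variants, transition table, and function names below are assumptions for illustration, not the crate's real API:

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum JobStatus {
    Pending,
    Running,
    Failed,
    Finished,
}

// Illustrative transition rule: re-applying the current status is an
// idempotent no-op; otherwise only forward transitions are allowed.
fn is_valid_transition(old: JobStatus, new: JobStatus) -> bool {
    use JobStatus::*;
    match (old, new) {
        (a, b) if a == b => true,
        (Pending, Running) | (Running, Failed) | (Running, Finished) => true,
        _ => false,
    }
}

fn apply_update(
    curr_status: JobStatus,
    curr_msg: Option<String>,
    new_status: JobStatus,
    new_msg: Option<String>,
) -> Result<(JobStatus, Option<String>), String> {
    if !is_valid_transition(curr_status, new_status) {
        return Err(format!("invalid transition {curr_status:?} -> {new_status:?}"));
    }
    if curr_status == new_status {
        // Idempotent call: keep the original message, which is the behavior
        // the review asks to either document or change.
        return Ok((curr_status, curr_msg));
    }
    Ok((new_status, new_msg))
}

fn main() {
    let (status, msg) = apply_update(
        JobStatus::Failed,
        Some("first failure".into()),
        JobStatus::Failed,
        Some("second failure".into()),
    )
    .unwrap();
    assert_eq!(status, JobStatus::Failed);
    assert_eq!(msg.as_deref(), Some("first failure"));
    assert!(apply_update(JobStatus::Finished, None, JobStatus::Running, None).is_err());
    println!("ok");
}
```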
- Around line 775-808: The polling loop in wait_for_compression_result currently
can run forever; add an overall timeout safeguard (e.g. a constant
MAX_POLL_TIMEOUT or configurable parameter) and enforce it by either wrapping
the loop in tokio::time::timeout or tracking Instant::now() and checking elapsed
on each iteration; when the timeout is hit, return an appropriate anyhow::Error
(e.g. "compression job polling timed out") instead of looping indefinitely.
Update wait_for_compression_result to reference
CompressionJobId/CompressionJobStatus and the existing sleep_duration_sec
backoff logic, and ensure the new timeout is applied before sleeping so we don't
overshoot the maximum allowed wait. Ensure tests/consumers handle the new
timeout error path.
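A stdlib-only sketch of the bounded polling loop the review asks for is below. The `poll` closure stands in for the `SELECT status, status_msg` query against the compression job table, and the names plus the 16-second backoff cap are illustrative assumptions rather than the real constants:

```rust
use std::time::{Duration, Instant};

const MAX_SLEEP_DURATION_SEC: u32 = 16; // assumed backoff cap

fn wait_with_timeout<F>(mut poll: F, max_wait: Duration) -> Result<String, String>
where
    F: FnMut() -> Option<String>,
{
    let started_at = Instant::now();
    let mut sleep_duration_sec: u32 = 1;
    loop {
        // Terminal state reached: return it.
        if let Some(status) = poll() {
            return Ok(status);
        }
        // Check the timeout before sleeping so the loop never overshoots the
        // maximum allowed wait.
        if started_at.elapsed() + Duration::from_secs(sleep_duration_sec.into()) >= max_wait {
            return Err("compression job polling timed out".to_string());
        }
        std::thread::sleep(Duration::from_secs(sleep_duration_sec.into()));
        // Exponential backoff, capped at MAX_SLEEP_DURATION_SEC.
        sleep_duration_sec =
            std::cmp::min(sleep_duration_sec.saturating_mul(2), MAX_SLEEP_DURATION_SEC);
    }
}

fn main() {
    // A job that finishes immediately returns its terminal status.
    let done = wait_with_timeout(|| Some("Succeeded".to_string()), Duration::from_secs(60));
    assert_eq!(done.as_deref(), Ok("Succeeded"));
    // A job that never finishes hits the timeout instead of looping forever.
    let stuck = wait_with_timeout(|| None, Duration::from_millis(10));
    assert!(stuck.is_err());
    println!("ok");
}
```

In the async codebase the same structure would use `tokio::time::Instant` and `tokio::time::sleep`, or wrap the whole loop in `tokio::time::timeout`.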
---
Outside diff comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 888-908: The CREATE TABLE SQL in
ingestion_job_table_creation_query currently defines an unused `error_msg`
column; remove the "`error_msg` TEXT NULL DEFAULT NULL," line from the formatted
schema string returned by ingestion_job_table_creation_query (which uses
INGESTION_JOB_TABLE_NAME and ClpIngestionJobStatus::format_as_sql_enum()) and
ensure no other code paths reference `error_msg` (the code uses `status_msg` and
update_job_status), so simply drop that column from the schema string to
eliminate the unused column.
ℹ️ Review info
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (3)
- components/clp-rust-utils/src/job_config/ingestion.rs
- components/log-ingestor/src/ingestion_job/state.rs
- components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs (outdated; resolved)
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs (2)
361-374: ⚠️ Potential issue | 🟠 Major

Lock ingestion-job status row during running-state validation.

`check_job_status_and_commit` performs a non-locking status read, so a concurrent state transition can race with this transaction and still allow metadata commit after the check.

Suggested fix

```diff
-    let curr_status: ClpIngestionJobStatus = sqlx::query_scalar(formatcp!(
-        r"SELECT `status` FROM `{table}` WHERE `id` = ?",
+    let curr_status: ClpIngestionJobStatus = sqlx::query_scalar(formatcp!(
+        r"SELECT `status` FROM `{table}` WHERE `id` = ? FOR UPDATE",
         table = INGESTION_JOB_TABLE_NAME,
     ))
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs` around lines 361 - 374, The current non-locking read using sqlx::query_scalar to fetch `status` allows a race with concurrent transitions; modify the status lookup in check_job_status_and_commit to perform a locking read (SELECT `status` FROM `{table}` WHERE `id` = ? FOR UPDATE) against the same transaction `tx` so the row is locked until tx.commit(), then compare to ClpIngestionJobStatus::Running and only commit if it matches; keep using INGESTION_JOB_TABLE_NAME, self.job_id, and tx.commit() as before.
57-75: ⚠️ Potential issue | 🔴 Critical

Ensure schema migration for newly written columns on existing deployments.

The runtime now reads and writes columns (`ingestion_job.status_msg`, plus `creation_ts` and `last_update_ts` on both `ingestion_job` and `ingested_s3_object_metadata`) that are missing from existing deployed tables. Startup only executes `CREATE TABLE IF NOT EXISTS`, which leaves existing tables unaltered, causing SQL errors when UPDATE/INSERT statements target these missing columns at runtime (e.g., lines 260, 302–304, 782–787).

Provide an explicit schema migration (ALTER TABLE statements or equivalent deployment mechanism) for existing installations before merging.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs` around lines 57 - 75, In new() (clp_ingestion.rs) run explicit schema migrations after the CREATE TABLE IF NOT EXISTS calls: execute ALTER TABLE statements for ingestion_job to ADD COLUMN status_msg, creation_ts, last_update_ts if they do not exist, and for ingested_s3_object_metadata to ADD COLUMN creation_ts and last_update_ts if missing; ensure these ALTERs are executed against the provided MySqlPool (same place as ingestion_job_table_creation_query/ingested_s3_object_metadata_table_creation_query calls), handle and log any ALTER errors, and make the migration idempotent so startup can safely run on existing deployments.
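The idempotent migration the prompt describes can be sketched in isolation. In the real service the set of existing columns would come from `information_schema.COLUMNS` via the `MySqlPool`; here a `HashSet` stands in for that lookup, and the column definitions are the ones suggested in the review:

```rust
use std::collections::HashSet;

// Build only the ALTER statements for columns that are actually missing, so
// the migration can safely run on every startup.
fn migration_statements(
    table: &str,
    existing_columns: &HashSet<&str>,
    wanted: &[(&str, &str)], // (column name, column definition)
) -> Vec<String> {
    wanted
        .iter()
        .filter(|(name, _)| !existing_columns.contains(name))
        .map(|(name, definition)| {
            format!("ALTER TABLE `{table}` ADD COLUMN `{name}` {definition}")
        })
        .collect()
}

fn main() {
    let wanted = [
        ("status_msg", "TEXT NULL DEFAULT NULL"),
        ("creation_ts", "DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3)"),
        ("last_update_ts", "DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3)"),
    ];
    // An old deployment that already has `creation_ts` needs only two ALTERs.
    let existing: HashSet<&str> = ["id", "status", "creation_ts"].into_iter().collect();
    let stmts = migration_statements("ingestion_job", &existing, &wanted);
    assert_eq!(stmts.len(), 2);
    // Re-running after the columns exist produces no statements (idempotent).
    let migrated: HashSet<&str> =
        ["id", "status", "status_msg", "creation_ts", "last_update_ts"]
            .into_iter()
            .collect();
    assert!(migration_statements("ingestion_job", &migrated, &wanted).is_empty());
    println!("ok");
}
```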
♻️ Duplicate comments (1)
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs (1)
775-807: ⚠️ Potential issue | 🟠 Major

Add a bounded timeout for compression polling.

The polling loop can run forever if the compression job never reaches a terminal state. Please add an overall timeout and return a clear error when exceeded.

Suggested fix

```diff
 async fn wait_for_compression_result(
     &self,
     id: CompressionJobId,
 ) -> anyhow::Result<(CompressionJobStatus, Option<String>)> {
+    const MAX_POLL_WAIT: Duration = Duration::from_secs(30 * 60);
     let mut sleep_duration_sec: u32 = 1;
+    let started_at = tokio::time::Instant::now();
     loop {
+        if started_at.elapsed() >= MAX_POLL_WAIT {
+            return Err(anyhow::anyhow!(
+                "Timed out while waiting for compression job {id} to finish."
+            ));
+        }
         const POLLING_QUERY: &str = formatcp!(
             r"SELECT `status`, `status_msg` FROM `{table}` WHERE `id` = ?",
             table = CLP_COMPRESSION_JOB_TABLE_NAME
         );
@@
-        tokio::time::sleep(Duration::from_secs(sleep_duration_sec.into())).await;
+        tokio::time::sleep(Duration::from_secs(sleep_duration_sec.into())).await;
         sleep_duration_sec =
             std::cmp::min(sleep_duration_sec.saturating_mul(2), MAX_SLEEP_DURATION_SEC);
     }
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs` around lines 775 - 807, The polling loop in wait_for_compression_result can run forever; add a bounded overall timeout (e.g., const TOTAL_TIMEOUT_SEC) and return an error if exceeded. Implement this by recording an Instant::now() at the start of wait_for_compression_result (or wrap the entire loop in tokio::time::timeout), check elapsed() on each iteration and before awaiting sleep, and when elapsed >= TOTAL_TIMEOUT_SEC return Err(anyhow::anyhow!("timeout waiting for compression job {id}")). Keep existing exponential backoff logic (sleep_duration_sec, MAX_SLEEP_DURATION_SEC) and preserve returning Ok((status, status_msg)) for terminal states.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 361-374: The current non-locking read using sqlx::query_scalar to
fetch `status` allows a race with concurrent transitions; modify the status
lookup in check_job_status_and_commit to perform a locking read (SELECT `status`
FROM `{table}` WHERE `id` = ? FOR UPDATE) against the same transaction `tx` so
the row is locked until tx.commit(), then compare to
ClpIngestionJobStatus::Running and only commit if it matches; keep using
INGESTION_JOB_TABLE_NAME, self.job_id, and tx.commit() as before.
- Around line 57-75: In new() (clp_ingestion.rs) run explicit schema migrations
after the CREATE TABLE IF NOT EXISTS calls: execute ALTER TABLE statements for
ingestion_job to ADD COLUMN status_msg, creation_ts, last_update_ts if they do
not exist, and for ingested_s3_object_metadata to ADD COLUMN creation_ts and
last_update_ts if missing; ensure these ALTERs are executed against the provided
MySqlPool (same place as
ingestion_job_table_creation_query/ingested_s3_object_metadata_table_creation_query
calls), handle and log any ALTER errors, and make the migration idempotent so
startup can safely run on existing deployments.
---
Duplicate comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 775-807: The polling loop in wait_for_compression_result can run
forever; add a bounded overall timeout (e.g., const TOTAL_TIMEOUT_SEC) and
return an error if exceeded. Implement this by recording an Instant::now() at
the start of wait_for_compression_result (or wrap the entire loop in
tokio::time::timeout), check elapsed() on each iteration and before awaiting
sleep, and when elapsed >= TOTAL_TIMEOUT_SEC return Err(anyhow::anyhow!("timeout
waiting for compression job {id}")). Keep existing exponential backoff logic
(sleep_duration_sec, MAX_SLEEP_DURATION_SEC) and preserve returning Ok((status,
status_msg)) for terminal states.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs (1)
897-903: ⚠️ Potential issue | 🟠 Major

Ensure new columns (`status_msg`, `creation_ts`, `last_update_ts`) are migrated for existing deployments.

The runtime code unconditionally reads and writes `status_msg` (lines 260, 783–788), but these columns are defined only in `CREATE TABLE IF NOT EXISTS` blocks. If a table exists without these columns, the UPDATE and SELECT queries will fail at runtime.

No ALTER TABLE migration statements exist in the codebase. Either add explicit migrations for pre-existing tables, document that log-ingestor requires a fresh database schema, or implement a schema versioning and upgrade mechanism.

Also applies to: lines 940–942 (`ingested_s3_object_metadata` table).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs` around lines 897 - 903, The code reads/writes new columns (status_msg, creation_ts, last_update_ts) but only adds them in CREATE TABLE statements, so add an explicit migration path: implement a startup schema-migration step in clp_ingestion.rs (or a dedicated migration module) that checks for the existing tables (clp_ingestion and ingested_s3_object_metadata) and runs ALTER TABLE ... ADD COLUMN for status_msg TEXT NULL, creation_ts DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), last_update_ts DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) (and num_files_compressed/error_msg where applicable) if they are missing; alternatively implement simple schema versioning (e.g., a meta table + upgrade routines) and call it before any code that uses these fields so functions that SELECT/UPDATE status_msg (the ingestion read/write paths) will never run against an out-of-date schema. Ensure the migration is idempotent and logged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 641-657: The UPDATE query in submit_for_compression (operating on
INGESTED_S3_OBJECT_METADATA_TABLE_NAME and setting compression_job_id and status
to IngestedS3ObjectMetadataStatus::Submitted) is missing a restriction to the
owning ingestion job; add "AND `ingestion_job_id` = " with a bound value
(push_bind(ingestion_job_id)) to the WHERE clause that currently has "`id` IN
(...)" so rows are only updated for the specific ingestion_job_id, and apply the
same change to the analogous UPDATE block around lines 659-666.
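For clarity, a stdlib-only sketch of the scoped UPDATE this comment asks for: the WHERE clause matches the owning `ingestion_job_id` in addition to the object ids, so one job cannot flip rows that belong to another. A plain string builder stands in for sqlx's `QueryBuilder`/`push_bind`, and the table and column names mirror those quoted in the review rather than the real code:

```rust
// Build the UPDATE text with one `?` per object id plus a trailing bind for
// the owning ingestion job.
fn build_submit_update(table: &str, num_object_ids: usize) -> String {
    let placeholders = vec!["?"; num_object_ids].join(", ");
    format!(
        "UPDATE `{table}` \
         SET `compression_job_id` = ?, `status` = 'Submitted' \
         WHERE `id` IN ({placeholders}) AND `ingestion_job_id` = ?"
    )
}

fn main() {
    let sql = build_submit_update("ingested_s3_object_metadata", 3);
    // Rows are restricted to the owning ingestion job.
    assert!(sql.ends_with("AND `ingestion_job_id` = ?"));
    assert!(sql.contains("`id` IN (?, ?, ?)"));
    println!("{sql}");
}
```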
- Around line 361-367: The SELECT of `status` in check_job_status_and_commit is
not locking the row so a concurrent fail() can race and allow metadata to commit
after a terminal transition; change the sqlx::query_scalar that reads `status`
from INGESTION_JOB_TABLE_NAME (producing a ClpIngestionJobStatus) to include a
FOR UPDATE clause so the read is done with a row lock and thus serialized with
update_job_status before calling tx.commit(); make the same change to the other
status SELECT used in this function (the second query around the later commit
check) so both reads acquire FOR UPDATE locks.
---
Outside diff comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 897-903: The code reads/writes new columns (status_msg,
creation_ts, last_update_ts) but only adds them in CREATE TABLE statements, so
add an explicit migration path: implement a startup schema-migration step in
clp_ingestion.rs (or a dedicated migration module) that checks for the existing
tables (clp_ingestion and ingested_s3_object_metadata) and runs ALTER TABLE ...
ADD COLUMN for status_msg TEXT NULL, creation_ts DATETIME(3) NOT NULL DEFAULT
CURRENT_TIMESTAMP(3), last_update_ts DATETIME(3) NOT NULL DEFAULT
CURRENT_TIMESTAMP(3) (and num_files_compressed/error_msg where applicable) if
they are missing; alternatively implement simple schema versioning (e.g., a meta
table + upgrade routines) and call it before any code that uses these fields so
functions that SELECT/UPDATE status_msg (the ingestion read/write paths) will
never run against an out-of-date schema. Ensure the migration is idempotent and
logged.
```rust
let curr_status: ClpIngestionJobStatus = sqlx::query_scalar(formatcp!(
    r"SELECT `status` FROM `{table}` WHERE `id` = ?",
    table = INGESTION_JOB_TABLE_NAME,
))
.bind(self.job_id)
.fetch_one(&mut *tx)
.await?;

let result = sqlx::query(BASE_JOB_UPDATE_QUERY)
    .bind(
        u64::try_from(num_files_ingested)
            .expect("number of objects should not exceed u64::MAX"),
    )
    .bind(self.job_id)
    .bind(ClpIngestionJobStatus::Running)
    .execute(&mut *tx)
    .await?;

if result.rows_affected() == 0 {
```
Lock the ingestion row before committing buffered metadata.

`check_job_status_and_commit` reads `status` without a lock, so a concurrent `fail()` can race between the read and `tx.commit()`. That allows metadata to commit after a terminal transition. Use `FOR UPDATE` here to serialize this check with `update_job_status`.
🔧 Suggested fix

```diff
-    let curr_status: ClpIngestionJobStatus = sqlx::query_scalar(formatcp!(
-        r"SELECT `status` FROM `{table}` WHERE `id` = ?",
+    let curr_status: ClpIngestionJobStatus = sqlx::query_scalar(formatcp!(
+        r"SELECT `status` FROM `{table}` WHERE `id` = ? FOR UPDATE",
         table = INGESTION_JOB_TABLE_NAME,
     ))
```

Also applies to: 374-375
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs` around
lines 361 - 367, The SELECT of `status` in check_job_status_and_commit is not
locking the row so a concurrent fail() can race and allow metadata to commit
after a terminal transition; change the sqlx::query_scalar that reads `status`
from INGESTION_JOB_TABLE_NAME (producing a ClpIngestionJobStatus) to include a
FOR UPDATE clause so the read is done with a row lock and thus serialized with
update_job_status before calling tx.commit(); make the same change to the other
status SELECT used in this function (the second query around the later commit
check) so both reads acquire FOR UPDATE locks.
✅ Addressed in commit dd9f3c1
@sitaowang1998 can u help check this? I'm not sure if this is true.
Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!
I don't think this is valid. The `fail` transaction gets an IX lock, which conflicts with the S lock here.
sg. That's also what I understand.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 883-885: The schema defines both status_msg and error_msg but the
codepath update_job_status only writes status_msg, which is confusing; either
consolidate by removing error_msg and migrating any historical data into
status_msg, or clearly document their distinct roles in the schema and update
all relevant functions (notably update_job_status) to write to error_msg when
recording error-specific details. Locate references to status_msg and error_msg
and the update_job_status function and ensure consistent usage: if you keep
both, add a schema comment and modify update_job_status (and any callers) to
populate error_msg for error cases and status_msg for general state messages; if
you remove error_msg, add a DB migration to drop the column and update any
code/comments referring to it.
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs (outdated; resolved)
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs (1)
329-348: ⚠️ Potential issue | 🟠 Major

Add a `FOR UPDATE` lock to `check_job_status_and_commit` for consistency and correctness.

The SELECT query at line 334 reads the job `status` without a `FOR UPDATE` lock, creating a race condition where a concurrent `fail()` call could transition the job to a terminal state between the status check (line 339) and the transaction commit (line 348). This allows metadata to be committed after the job enters a terminal state, violating the intended invariant.

Compare this to `update_job_status` at line 978, which correctly uses `FOR UPDATE` with the comment "Lock the row for update and fetch the current status" (lines 976–977). The same locking pattern should be applied here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs` around lines 329 - 348, In check_job_status_and_commit, the SELECT that reads `status` must lock the row to avoid races with concurrent fail() calls; modify the query built in check_job_status_and_commit (the sqlx::query_scalar call that selects `status` and binds self.job_id) to include a "FOR UPDATE" clause (matching the locking pattern used in update_job_status) so the row is locked for update within the transaction before checking curr_status and committing.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 329-348: In check_job_status_and_commit, the SELECT that reads
`status` must lock the row to avoid races with concurrent fail() calls; modify
the query built in check_job_status_and_commit (the sqlx::query_scalar call that
selects `status` and binds self.job_id) to include a "FOR UPDATE" clause
(matching the locking pattern used in update_job_status) so the row is locked
for update within the transaction before checking curr_status and committing.
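The locking read suggested throughout this thread reduces to one change in the query text (note the maintainers questioned earlier whether the extra lock is actually needed, since `fail`'s own row lock may already serialize the two transactions). A runtime string builder stands in here for the real `formatcp!`-based constant:

```rust
// Build the status lookup, optionally as a MySQL locking read. With
// `FOR UPDATE`, the selected row stays locked until the surrounding
// transaction commits, serializing this read against concurrent updates.
fn status_query(table: &str, lock_for_update: bool) -> String {
    let mut query = format!("SELECT `status` FROM `{table}` WHERE `id` = ?");
    if lock_for_update {
        query.push_str(" FOR UPDATE");
    }
    query
}

fn main() {
    assert_eq!(
        status_query("ingestion_job", true),
        "SELECT `status` FROM `ingestion_job` WHERE `id` = ? FOR UPDATE"
    );
    assert!(!status_query("ingestion_job", false).contains("FOR UPDATE"));
    println!("ok");
}
```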
- `ClpCompressionState` type to encapsulate compression job submission and result retrieval logic.
- `IngestionJobState` trait with a `fail` method to allow failing an ingestion job with an explicit status message.
- `ClpIngestionState::update_job_status` to enforce idempotency and validate state transitions before applying updates.
- `ClpDbIngestionConnector::create_ingestion_job` to return both the ingestion state and a compression listener, enabling proper initialization of ingestion job instances.
- `num_files_ingested` counter from the ingestion job table to avoid transactional deadlocks on the critical path.

Description
This PR is a part of the implementation for #1978.
This PR was originally split from #2035 and applies changes to the CLP ingestion utils for the fault-tolerance feature integration. It includes the following changes:
- `ClpCompressionState` type to encapsulate compression job submission and result retrieval logic.
- `IngestionJobState` trait with a `fail` method to allow failing an ingestion job with an explicit status message.
- `ClpIngestionState::update_job_status` to enforce idempotency and validate state transitions before applying updates.
- `ClpDbIngestionConnector::create_ingestion_job` to return both the ingestion state and a compression listener, enabling proper initialization of ingestion job instances.
- `num_files_ingested` counter from the ingestion job table to avoid transactional deadlocks on the critical path.

Checklist
breaking change.
Validation performed
Summary by CodeRabbit
New Features
Refactor