feat(log-ingestor): Allow an SQS listener job to spawn multiple concurrent tasks to consume notifications from the same queue (resolves #1977). #1989
Conversation
Walkthrough

Adds validated SQS listener config with concurrency and wait-time fields and validation, implements multi-task SQS listener orchestration (Task/TaskHandle per-task lifecycle), tightens the AwsClientManager trait to require Clone, updates job creation to validate configs and map validation errors to API responses, and adapts tests.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    actor User
    participant JobMgr as IngestionJobManager
    participant SqsListener
    participant TaskHandle
    participant Task as Task<SqsClientManager>
    participant SqsClient as SqsClientWrapper
    User->>JobMgr: create_sqs_listener_job(raw_config)
    JobMgr->>JobMgr: ValidatedSqsListenerConfig::validate_and_create(raw_config)
    rect rgba(76, 175, 80, 0.5)
        JobMgr->>SqsListener: spawn(job_id, &sqs_client_manager, &config, &sender)
        Note over SqsListener: For each concurrent task (1..N)
        loop Create N TaskHandles
            SqsListener->>Task: instantiate Task{id, client_manager, config, sender}
            SqsListener->>TaskHandle: TaskHandle::spawn(Task, job_id)
            TaskHandle->>Task: tokio::spawn(task.run())
            Task-->>TaskHandle: JoinHandle<Result<()>>
            TaskHandle->>SqsListener: push to task_handles
        end
    end
    rect rgba(33, 150, 243, 0.5)
        loop Each Task
            Task->>SqsClient: ReceiveMessage(wait_time_seconds)
            alt Messages
                Task->>Task: process messages (log job_id, task_id)
            else No messages / error
                Task->>Task: log outcome with job_id, task_id
            end
        end
    end
    rect rgba(244, 67, 54, 0.5)
        User->>SqsListener: shutdown_and_join()
        loop For each TaskHandle
            SqsListener->>TaskHandle: cancel_token.cancel()
            TaskHandle->>Task: cancellation observed
            TaskHandle->>TaskHandle: await join_handle
            Note over TaskHandle: Log cancellation/exit for task_id
        end
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related issues
🚥 Pre-merge checks: ✅ 3 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@components/clp-rust-utils/src/job_config/ingestion.rs`:
- Around line 53-76: The schema/docs conflict: decide whether wait_time_sec
(field wait_time_sec, default default_sqs_wait_time_sec) should be clamped to 20
or rejected; either remove or raise the #[schema(maximum = 20)] and update the
doc to say values >20 will be truncated to 20 (and implement clamping where
config is loaded), or keep the #[schema(maximum = 20)] and change the doc to say
values >20 will be rejected. Also eliminate duplicated magic numbers for
num_concurrent_listener_tasks (field num_concurrent_listener_tasks, default
default_num_concurrent_listener_tasks) by introducing shared constants (e.g.,
MIN_CONCURRENT_LISTENER_TASKS and MAX_CONCURRENT_LISTENER_TASKS) and use those
constants in the schema annotations, documentation text, and the runtime
validation in ingestion_job_manager.rs so the bounds stay in sync.
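A minimal sketch of the shared-constant approach the comment suggests, assuming the constant and function names (`MIN_CONCURRENT_LISTENER_TASKS`, `MAX_CONCURRENT_LISTENER_TASKS`, `validate_num_tasks`) — the real code would reference these same symbols from the schema annotations and from `ingestion_job_manager.rs`:

```rust
// Shared bounds for `num_concurrent_listener_tasks` so the schema annotations,
// documentation, and runtime validation stay in sync. Names are assumptions.
pub const MIN_CONCURRENT_LISTENER_TASKS: u16 = 1;
pub const MAX_CONCURRENT_LISTENER_TASKS: u16 = 32;

/// Validates the configured task count against the shared bounds.
fn validate_num_tasks(n: u16) -> Result<u16, String> {
    if (MIN_CONCURRENT_LISTENER_TASKS..=MAX_CONCURRENT_LISTENER_TASKS).contains(&n) {
        Ok(n)
    } else {
        Err(format!(
            "num_concurrent_listener_tasks must be in \
             {MIN_CONCURRENT_LISTENER_TASKS}..={MAX_CONCURRENT_LISTENER_TASKS}, got {n}"
        ))
    }
}
```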
In `@components/log-ingestor/src/ingestion_job_manager.rs`:
- Around line 40-41: The error string for the InvalidNumConcurrentListenerTasks
variant contains a stray trailing backtick; update the #[error(...)] attribute
on the InvalidNumConcurrentListenerTasks enum variant to remove the extra
backtick so the format becomes a clean message (e.g., change `"Invalid
`num_concurrent_listener_tasks`: {0}`"` to `"Invalid
`num_concurrent_listener_tasks`: {0}"` or remove all backticks), ensuring the
error macro and variant name InvalidNumConcurrentListenerTasks remain unchanged.
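The corrected message can be illustrated std-only (the real code uses `thiserror`'s `#[error(...)]` attribute; this hand-written `Display` impl is an equivalent sketch):

```rust
use std::fmt;

#[derive(Debug)]
enum Error {
    InvalidNumConcurrentListenerTasks(u16),
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Equivalent to `#[error("Invalid `num_concurrent_listener_tasks`: {0}")]`
            // — note there is no stray trailing backtick after `{0}`.
            Error::InvalidNumConcurrentListenerTasks(n) => {
                write!(f, "Invalid `num_concurrent_listener_tasks`: {n}")
            }
        }
    }
}
```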
In `@components/log-ingestor/src/ingestion_job/sqs_listener.rs`:
- Around line 235-251: Move the assert that config.num_concurrent_listener_tasks
!= 0 to before allocating task_handles, and replace the lossless integer casts
with explicit from conversions: use
Vec::with_capacity(usize::from(config.num_concurrent_listener_tasks)) instead of
with_capacity(config.num_concurrent_listener_tasks as usize), and construct
Task.id with TaskId::from(task_id) (and similarly use usize::from(...) for any
other counts) so the code uses usize::from(...) and TaskId::from(...) instead of
as casts; keep the loop and TaskHandle::spawn usage unchanged.
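A sketch of the lossless-conversion pattern the comment asks for, with `TaskId` as a hypothetical newtype standing in for the real type in `sqs_listener.rs`:

```rust
/// Hypothetical stand-in for the task-id type used by the listener.
#[derive(Debug, PartialEq)]
struct TaskId(u32);

impl From<u16> for TaskId {
    fn from(v: u16) -> Self {
        TaskId(u32::from(v))
    }
}

/// Allocates per-task state, asserting the validated invariant up front and
/// using `usize::from` / `TaskId::from` instead of `as` casts.
fn allocate_handles(num_tasks: u16) -> Vec<TaskId> {
    assert!(num_tasks != 0, "validated config guarantees at least one task");
    // `usize::from(u16)` is always lossless, unlike an unchecked `as` cast.
    let mut handles = Vec::with_capacity(usize::from(num_tasks));
    for task_id in 0..num_tasks {
        handles.push(TaskId::from(task_id));
    }
    handles
}
```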
In `@components/log-ingestor/tests/test_ingestion_job.rs`:
- Around line 222-253: The test uses num_tasks values [1,4,16,64] but 64 exceeds
the validated API limit (32) enforced by the manager; change the 64 to a value ≤
32 (or add an explicit comment that 64 is an intentional internal stress test)
and make the test deterministic by isolating SQS state between iterations—either
purge the test queue or use unique job_id/prefix per iteration so deterministic
object keys ({prefix}/{idx:05}.log) from a prior run cannot satisfy later runs;
update the call sites in run_sqs_listener_test and the SqsListenerConfig loop
accordingly and document the intent if you keep >32 so reviewers know it
bypasses ingestion_job_manager.rs validation.
```rust
/// AWS SQS enforces a maximum wait time of 20 seconds. Any configured value greater than
/// 20 seconds will be truncated to 20 seconds.
```
CodeRabbit is probably correct? We reject values > 20 rather than truncate them.
```diff
-/// AWS SQS enforces a maximum wait time of 20 seconds. Any configured value greater than
-/// 20 seconds will be truncated to 20 seconds.
+/// AWS SQS enforces a maximum wait time of 20 seconds.
```
This is tricky. With our current setup, there's no way to enforce that the config holds valid values all the way down to the actual listener job spawn, so we need to perform validations manually. However, this case is different from the other two validations we perform at present:
- The other two validations are top-level, and an "invalid" value won't affect the actual ingestion job execution:
  - A custom endpoint won't propagate into the SQS job execution, since it's handled by the client manager.
  - A number larger than 32 given to the job also won't affect execution. The SQS job can handle an arbitrary number of coroutines; it's our top-level decision to not allow a number > 32.
- The wait time is different: a wait time larger than 20 will cause the operation to fail. Since there's no way to enforce config validation all the way down to the SQS job execution, we need to truncate inside the actual task to make sure it's under 20 anyway. That said, as long as we clearly document this truncation in the OpenAPI schema, it should be fair.
I can add the logic to reject >20 sec config on the top-level, but I will keep the truncation logic inside the SQS job just in case.
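The defense-in-depth truncation described above can be sketched as a one-line clamp (the constant name `MAX_WAIT_TIME_SEC` matches the one mentioned later in this thread; `effective_wait_time` is a hypothetical helper name):

```rust
/// AWS SQS rejects `WaitTimeSeconds` values above 20, so the task clamps the
/// configured value even if an out-of-range config slips past top-level validation.
const MAX_WAIT_TIME_SEC: u8 = 20;

fn effective_wait_time(configured: u8) -> u8 {
    configured.min(MAX_WAIT_TIME_SEC)
}
```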
Maybe we should just adopt the C philosophy: if the input isn't valid, then the behavior is undefined. You cannot validate everything in every function or via types.
So we should just reject >20 in routes.rs, and add a debug assert when receiving messages.
I don't agree with the C philosophy in general, lol. UB is the root of evil.
Actually, I have a better idea for handling this. We can wrap the validated config with a special type and pass this type all the way down to the ingestion job.
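The "wrap the validated config in a special type" idea is the newtype pattern: the raw config can only become a `ValidatedSqsListenerConfig` through `validate`, so any code receiving the wrapper knows the bounds already hold. A minimal sketch with assumed field names and bounds (20-second wait cap, 1..=32 tasks, per the discussion above):

```rust
pub struct SqsListenerConfig {
    pub wait_time_sec: u8,
    pub num_concurrent_listener_tasks: u16,
}

/// Can only be constructed via `validate`, so downstream code (all the way down
/// to the ingestion job) can rely on the invariants without re-checking.
pub struct ValidatedSqsListenerConfig(SqsListenerConfig);

impl ValidatedSqsListenerConfig {
    pub fn validate(raw: SqsListenerConfig) -> Result<Self, String> {
        if raw.wait_time_sec > 20 {
            return Err(format!("wait_time_sec must be <= 20, got {}", raw.wait_time_sec));
        }
        if raw.num_concurrent_listener_tasks == 0 || raw.num_concurrent_listener_tasks > 32 {
            return Err(format!(
                "num_concurrent_listener_tasks must be in 1..=32, got {}",
                raw.num_concurrent_listener_tasks
            ));
        }
        Ok(Self(raw))
    }

    pub fn wait_time_sec(&self) -> u8 {
        self.0.wait_time_sec
    }
}
```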
Types are not the solution to everything. Even tokio's implementation has undefined behaviors documented.
Co-authored-by: hoophalab <200652805+hoophalab@users.noreply.github.com>
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
298-304: 🧹 Nitpick | 🔵 Trivial
Consider returning `&Uuid` or `Uuid` instead of `String` from `get_id`. Callers that need the UUID as a string can call `.to_string()` themselves, while callers that need the `Uuid` type benefit from avoiding an allocation. Also, the `get_` prefix is non-idiomatic in Rust — the convention is just `id()`.
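A sketch of the suggested accessor, using a hypothetical `JobId` newtype in place of `Uuid` to stay std-only:

```rust
/// Hypothetical stand-in for `Uuid`; `Copy` makes returning by value free.
#[derive(Clone, Copy, Debug, PartialEq)]
struct JobId(u128);

struct SqsListener {
    id: JobId,
}

impl SqsListener {
    /// Idiomatic accessor: named `id()` (no `get_` prefix) and returns the id
    /// type itself, so no `String` allocation; callers can stringify if needed.
    fn id(&self) -> JobId {
        self.id
    }
}
```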
🤖 Fix all issues with AI agents
In `@components/log-ingestor/src/ingestion_job_manager.rs`:
- Around line 162-167: The hard-coded upper bound 32 used when validating
config.num_concurrent_listener_tasks should be extracted into a shared constant
to avoid divergence with the config definition; add a public constant (e.g.,
MAX_CONCURRENT_LISTENER_TASKS) next to SqsListenerConfig in the config module,
replace the literal 32 in ingestion_job_manager.rs with that constant, and
update any other places (and tests) that rely on the value so both the
validation here and the config definition reference the same symbol instead of a
magic number; ensure Error::InvalidNumConcurrentListenerTasks still reports the
provided value unchanged.
In `@components/log-ingestor/src/ingestion_job/sqs_listener.rs`:
- Around line 262-294: There is a duplicated nested match awaiting the same
JoinHandle (use-after-move) in the loop over self.task_handles; remove the outer
match and replace with a single match on task_handle.join_handle.await that
handles the three arms (Ok(Ok(())) success logging, Ok(Err(_)) warn, and
Err(err) panic warn). Locate the loop iterating over self.task_handles in
sqs_listener.rs and update the handling around task_handle.join_handle.await so
the JoinHandle is awaited exactly once and the three cases are handled directly.
- Around line 256-296: The shutdown_and_join method contains a duplicated nested
match over task_handle.join_handle.await causing a compile error; remove the
outer match that binds Ok(task_result) and keep the inner match that handles
Ok(Ok(())), Ok(Err(_)), and Err(err) for the JoinHandle<Result<()>> result.
Iterate over self.task_handles (cancel tokens loop stays) and then consume
task_handles in the second loop, matching directly on
task_handle.join_handle.await inside shutdown_and_join so task_id and self.id
logging remain unchanged.
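The single-await, three-arm match this comment asks for can be sketched with a plain nested `Result` standing in for tokio's `JoinHandle<Result<()>>` output; `report` and the `JoinError` alias are hypothetical names used only for illustration:

```rust
// In the real code, `joined` would be the value of `task_handle.join_handle.await`,
// awaited exactly once (tokio's real error type is `tokio::task::JoinError`).
type JoinError = String;

fn report(task_id: u32, joined: Result<Result<(), String>, JoinError>) -> String {
    match joined {
        // Task ran to completion successfully.
        Ok(Ok(())) => format!("task {task_id} exited cleanly"),
        // Task ran but returned an application error.
        Ok(Err(err)) => format!("task {task_id} returned error: {err}"),
        // Task panicked or was aborted before completing.
        Err(join_err) => format!("task {task_id} panicked: {join_err}"),
    }
}
```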
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
47-69: 🧹 Nitpick | 🔵 Trivial
`MAX_WAIT_TIME_SEC` duplicated as defense-in-depth is acceptable but could reference the shared constant. The local `MAX_WAIT_TIME_SEC` on line 49 duplicates the 20-second bound already enforced by `ValidatedSqsListenerConfig`. As defense-in-depth this is reasonable, but if you extract the validation bounds into constants (as suggested in the config file review), this could reference the same constant to stay in sync.
🤖 Fix all issues with AI agents
In `@components/log-ingestor/src/ingestion_job_manager.rs`:
- Around line 156-159: Update the doc comment for the function that currently
mentions Error::InvalidNumConcurrentListenerTasks to instead reference the
actual error variants returned by validation: Error::InvalidConfig(ConfigError)
and Error::InvalidSqsWaitTime; ensure the sentence that lists forwarded failures
from Self::create_s3_ingestion_job includes both of these validation errors and
clearly states they come from config/validation checks so readers can locate the
real variants (e.g., mention Error::InvalidConfig(ConfigError) and
Error::InvalidSqsWaitTime alongside the forwarded create_s3_ingestion_job
errors).
In `@components/log-ingestor/src/routes.rs`:
- Around line 211-212: The API description string for the SQS listener error
currently only references invalid concurrent listener tasks but omits the
wait_time_sec validation; update the description used in routes.rs (the
description string that references ConfigError) to either mention both
ConfigError::InvalidNumConcurrentListenerTasks and
ConfigError::InvalidSqsWaitTime explicitly or replace it with a more general
phrase such as "invalid configuration (e.g., invalid number of concurrent
listener tasks or invalid wait_time_sec)". Ensure the new text clearly signals
both possible validation failures so the documented error matches the
ConfigError variants.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/log-ingestor/src/ingestion_job_manager.rs (1)
146-186: ⚠️ Potential issue | 🟡 Minor
Doc comment for `create_sqs_listener_job` is incomplete — `CustomEndpointUrlNotSupported` is not listed. The `# Errors` section (lines 154–157) documents forwarded errors from `create_s3_ingestion_job` and `validate_and_create`, but omits `Error::CustomEndpointUrlNotSupported`, which is explicitly returned at line 165. While the omission may predate this PR, the doc was partially rewritten here (line 157), so it's a good time to complete it.

📝 Suggested doc fix

```diff
 /// # Errors
 ///
 /// Returns an error if:
 ///
+/// * [`Error::CustomEndpointUrlNotSupported`] if a custom endpoint URL is given.
 /// * Forwards [`Self::create_s3_ingestion_job`]'s return values on failure.
 /// * Forwards [`ValidatedSqsListenerConfig::validate_and_create`]'s return values on failure.
```
Description
This PR adds support for spawning multiple coroutines to process a single SQS queue. This enables higher message-processing throughput.
To support the multi-listener design, this PR also exposes the maximum wait time as a configurable option so that coroutines can avoid frequent empty responses.
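The throughput idea behind the PR — N concurrent consumers draining one shared queue — can be illustrated with a std-thread analogue (the real code uses tokio tasks and an SQS client; all names here are illustrative):

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Spawns `num_workers` consumers over one shared queue and returns the total
/// number of messages processed across all workers.
fn consume_with_workers(messages: Vec<String>, num_workers: usize) -> usize {
    let (tx, rx) = mpsc::channel();
    for msg in messages {
        tx.send(msg).unwrap();
    }
    // Dropping the sender lets workers observe a disconnected, drained channel.
    drop(tx);

    let rx = Arc::new(Mutex::new(rx));
    let mut handles = Vec::with_capacity(num_workers);
    for _ in 0..num_workers {
        let rx = Arc::clone(&rx);
        handles.push(thread::spawn(move || {
            let mut processed = 0;
            loop {
                // Lock only for the receive, mirroring per-task ReceiveMessage calls.
                match rx.lock().unwrap().try_recv() {
                    Ok(_msg) => processed += 1,
                    Err(_) => break, // queue drained and sender dropped
                }
            }
            processed
        }));
    }
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```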
Checklist
breaking change.
Validation performed
Summary by CodeRabbit
New Features
Bug Fixes / Behaviour
Tests
Chores