Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Queue source with SQS implementation #5148

Open
wants to merge 35 commits into
base: main
Choose a base branch
from
Open

Add Queue source with SQS implementation #5148

wants to merge 35 commits into from

Conversation

rdettai
Copy link
Contributor

@rdettai rdettai commented Jun 21, 2024

Description

This PR proposes the generic implementation of a "queue" source. For now, only an implementation for AWS SQS with its data backed by AWS S3 is exposed to the users. Google Pubsub as the queue implementation or inlined data (i.e messages containing the data itself and not the link to the object store) will come next.

We use the shard API to provide deduplication of messages. For the current implementation where the source data is stored on S3, the deduplication is made on the object URI.

High level summary of the abstractions that are part of the generic implementation:

  • Processor exposes the exact same methods as the Source trait but does not implement it directly. Instead, the concrete queue sources (e.g. SqsSource) wrap the Processor.
  • A pipeline of message states:
    • RawMessage: the message as received from the Queue
    • PreProcessedPayload: the message went through the minimal transformation to discover its partition id
    • CheckpointedMessage: the message was checked against the shared state (shard API), it is now ready to be processed
    • InProgressMessage: the message that is actively being read
  • QueueSharedState is an abstraction over shard API. By calling open_shard upon reception of the messages we avoid costly redundant processing when receiving a duplicate message.
  • QueueLocalState represents the state machine of the messages as they are processed by the indexing pipeline
  • VisibilityTaskHandle a task that extends the message visibility when required (needs to be reworked)

TODO:

  • Generic implementation of the queue source based on the Processor abstraction
  • SQS implementation of the Queue trait
  • Adapt the shard open_shard API to accept publish_token as a field. This gives upsert semantics to the API which makes it possible to acquire the shard upon creation.
  • Change the switching logic in the metastore implementation to allow any source to use the shard API for checkpointing (adding SourceConfig.use_shard_api())
  • Unit tests the Processor abstraction
  • Unit test the ShardState abstraction
  • LocalStack tests for the SqsSource (with some small refactoring to reuse the setup_index helper from the KafkaSource)
  • Implement the observable state updates
  • Change the source config to be a file source with SQS notification instead of an SQS source backed by a file.
  • Error handling (transient can be retried, bad messages should be discarded(?)...) (see comment)
  • Deduplicate within a received message batch
  • Resume the indexing of an unfinished file after a failure
  • Use 2x commit timeout as last visibility extension after InProgress status
  • End to end test the validate the queue behaviour after commiting the shard progress in the Publisher actor

TODO in subsequent PRs:

  • GCP Pubsub (small)
  • data within the queue payload (small)
  • shard garbage collection (medium)
  • improve the visibility extension logic (medium)

How was this PR tested?

This PR contains unit tests and higher level tests that use LocalStack.

@rdettai rdettai self-assigned this Jun 21, 2024
@rdettai rdettai changed the base branch from main to uri-file-src-params June 21, 2024 13:45
Copy link

github-actions bot commented Jun 21, 2024

On SSD:

Average search latency is 1.01x that of the reference (lower is better).
Ref run id: 2337, ref commit: 4ade7b5
Link

On GCS:

Average search latency is 0.981x that of the reference (lower is better).
Ref run id: 2339, ref commit: 4ade7b5
Link

@rdettai rdettai requested a review from guilload June 21, 2024 15:19
@rdettai rdettai force-pushed the sqs branch 4 times, most recently from 93cce59 to 056f785 Compare June 24, 2024 14:01
@fulmicoton
Copy link
Contributor

we need a different handling of transient vs non-transient error.
e.g.
in the message parsing -> non-transient
disconnection while streaming file -> transient...
gzip corruption -> non-transient.

) -> anyhow::Result<BatchBuilder> {
let mut batch_builder = BatchBuilder::new(source_type);
while batch_builder.num_bytes < BATCH_NUM_BYTES_LIMIT {
let mut buf = String::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why allocate at every single loop iteration?

Copy link
Contributor Author

@rdettai rdettai Jul 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copied from main

let mut doc_line = String::new();

I guess we could allocate one big memory region of size BATCH_NUM_BYTES_LIMIT, load data into it and slice it by row. I think it's a bit far from this PR's concern though. Do you think these allocations are costly enough for opening an issue?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

FileSourceParams validation mess
2 participants