Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions config/tutorials/gh-archive/kafka-source.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
version: 0.7
version: 0.8
source_id: kafka-source
source_type: kafka
max_num_pipelines_per_indexer: 1
desired_num_pipelines: 2
num_pipelines: 2
params:
topic: gh-archive
client_params:
Expand Down
40 changes: 7 additions & 33 deletions docs/configuration/source-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,10 @@ Short max poll interval durations may cause a source to crash when back pressure

```bash
cat << EOF > source-config.yaml
version: 0.7
version: 0.8
source_id: my-kafka-source
source_type: kafka
max_num_pipelines_per_indexer: 1
desired_num_pipelines: 2
num_pipelines: 2
params:
topic: my-topic
client_params:
Expand Down Expand Up @@ -164,38 +163,13 @@ EOF
./quickwit source create --index my-index --source-config source-config.yaml
```

## Maximum number of pipelines per indexer

The `max_num_pipelines_per_indexer` parameter is only available for sources that can be distributed: Kafka, GCP PubSub and Pulsar (coming soon).

The maximum number of indexing pipelines defines the limit of pipelines spawned for the source on a given indexer.
This maximum can be reached only if there are enough `desired_num_pipelines` to run.

:::note

With the following parameters, only one pipeline will run on one indexer.

- `max_num_pipelines_per_indexer=2`
- `desired_num_pipelines=1`

:::

## Desired number of pipelines

`desired_num_pipelines` parameter is only available for sources that can be distributed: Kafka, GCP PubSub and Pulsar (coming soon).

The desired number of indexing pipelines defines the number of pipelines to run on a cluster for the source. It is a "desired"
number as it cannot be reached if there are not enough indexers in
the cluster.

:::note

With the following parameters, only one pipeline will start on the sole indexer.
## Number of pipelines

- `max_num_pipelines_per_indexer=1`
- `desired_num_pipelines=2`
The `num_pipelines` parameter is only available for sources that can be distributed: Kafka, GCP PubSub and Pulsar (coming soon).

:::
It defines the number of pipelines to run on a cluster for the source. The actual placement of these pipelines on the different indexers
will be decided by the control plane. Note that the distribution of a source like Kafka is done by assigning a set of partitions to different pipelines.
As a result, it is recommended to make sure the number of partitions is a multiple of `num_pipelines`.

## Transform parameters

Expand Down
5 changes: 2 additions & 3 deletions docs/ingest-data/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,10 @@ This tutorial assumes that the Kafka cluster is available locally on the default
#
# Kafka source config file.
#
version: 0.7
version: 0.8
source_id: kafka-source
source_type: kafka
max_num_pipelines_per_indexer: 1
desired_num_pipelines: 2
num_pipelines: 2
params:
topic: gh-archive
client_params:
Expand Down
9 changes: 3 additions & 6 deletions quickwit/quickwit-cli/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,8 +741,7 @@ mod tests {
.collect();
let sources = vec![SourceConfig {
source_id: "foo-source".to_string(),
desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
enabled: true,
source_params: SourceParams::file("path/to/file"),
transform_config: None,
Expand Down Expand Up @@ -802,17 +801,15 @@ mod tests {
let sources = [
SourceConfig {
source_id: "foo-source".to_string(),
desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
enabled: true,
source_params: SourceParams::stdin(),
transform_config: None,
input_format: SourceInputFormat::Json,
},
SourceConfig {
source_id: "bar-source".to_string(),
desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
enabled: true,
source_params: SourceParams::stdin(),
transform_config: None,
Expand Down
6 changes: 2 additions & 4 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
.map(|vrl_script| TransformConfig::new(vrl_script, None));
let source_config = SourceConfig {
source_id: CLI_SOURCE_ID.to_string(),
max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 is always non-zero."),
desired_num_pipelines: NonZeroUsize::new(1).expect("1 is always non-zero."),
num_pipelines: NonZeroUsize::new(1).expect("1 is always non-zero."),
enabled: true,
source_params,
transform_config,
Expand Down Expand Up @@ -608,8 +607,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
index_id: args.index_id,
source_config: SourceConfig {
source_id: args.source_id,
max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
enabled: true,
source_params: SourceParams::Vec(VecSourceParams::default()),
transform_config: None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
"version": "0.7",
"source_id": "hdfs-logs-kafka-source",
"desired_num_pipelines": 2,
"max_num_pipelines_per_indexer": 2,
"source_type": "kafka",
"params": {
"topic": "cloudera-cluster-logs",
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub use crate::node_config::{
enable_ingest_v2, IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig,
SplitCacheLimits, DEFAULT_QW_CONFIG_PATH,
};
use crate::source_config::serialize::{SourceConfigV0_7, VersionedSourceConfig};
use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig};
pub use crate::storage_config::{
AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig,
S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs,
Expand All @@ -88,6 +88,7 @@ pub use crate::storage_config::{
DocMapping,
VersionedSourceConfig,
SourceConfigV0_7,
SourceConfigV0_8,
VersionedIndexConfig,
IndexConfigV0_7,
VersionedIndexTemplate,
Expand Down
Loading