Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub use source_config::{
load_source_config_from_user_config, FileSourceParams, GcpPubSubSourceParams,
KafkaSourceParams, KinesisSourceParams, PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint,
SourceConfig, SourceInputFormat, SourceParams, TransformConfig, VecSourceParams,
VoidSourceParams, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_SOURCE_ID,
VoidSourceParams, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
};
use tracing::warn;

Expand All @@ -64,8 +64,8 @@ pub use crate::metastore_config::{
MetastoreBackend, MetastoreConfig, MetastoreConfigs, PostgresMetastoreConfig,
};
pub use crate::node_config::{
IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, SplitCacheLimits,
DEFAULT_QW_CONFIG_PATH,
enable_ingest_v2, IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig,
SplitCacheLimits, DEFAULT_QW_CONFIG_PATH,
};
use crate::source_config::serialize::{SourceConfigV0_7, VersionedSourceConfig};
pub use crate::storage_config::{
Expand Down
7 changes: 7 additions & 0 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::time::Duration;
use anyhow::{bail, ensure};
use bytesize::ByteSize;
use http::HeaderMap;
use once_cell::sync::Lazy;
use quickwit_common::net::HostAddr;
use quickwit_common::uri::Uri;
use quickwit_proto::indexing::CpuCapacity;
Expand Down Expand Up @@ -212,6 +213,12 @@ impl Default for IngestApiConfig {
}
}

/// Returns true if the ingest API v2 is enabled.
pub fn enable_ingest_v2() -> bool {
static ENABLE_INGEST_V2: Lazy<bool> = Lazy::new(|| env::var("QW_ENABLE_INGEST_V2").is_ok());
*ENABLE_INGEST_V2
}

impl IngestApiConfig {
pub fn replication_factor(&self) -> anyhow::Result<NonZeroUsize> {
if let Ok(replication_factor_str) = env::var("QW_INGEST_REPLICATION_FACTOR") {
Expand Down
15 changes: 9 additions & 6 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub use serialize::load_source_config_from_user_config;
// For backward compatibility.
use serialize::VersionedSourceConfig;

use crate::TestableForRegression;
use crate::{enable_ingest_v2, TestableForRegression};

/// Reserved source ID for the `quickwit index ingest` CLI command.
pub const CLI_INGEST_SOURCE_ID: &str = "_ingest-cli-source";
Expand All @@ -44,10 +44,13 @@ pub const INGEST_API_SOURCE_ID: &str = "_ingest-api-source";

/// Reserved source ID used for native Quickwit ingest.
/// (this is for ingest v2)
pub const INGEST_SOURCE_ID: &str = "_ingest-source";
pub const INGEST_V2_SOURCE_ID: &str = "_ingest-source";

pub const RESERVED_SOURCE_IDS: &[&str] =
&[CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_SOURCE_ID];
pub const RESERVED_SOURCE_IDS: &[&str] = &[
CLI_INGEST_SOURCE_ID,
INGEST_API_SOURCE_ID,
INGEST_V2_SOURCE_ID,
];

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(into = "VersionedSourceConfig")]
Expand Down Expand Up @@ -125,10 +128,10 @@ impl SourceConfig {
/// Creates an ingest source v2.
pub fn ingest_v2_default() -> Self {
Self {
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"),
desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
enabled: false,
enabled: enable_ingest_v2(),
source_params: SourceParams::Ingest,
transform_config: None,
input_format: SourceInputFormat::Json,
Expand Down
34 changes: 17 additions & 17 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ impl EventSubscriber<ShardPositionsUpdate> for ControlPlaneEventSubscriber {
mod tests {
use mockall::Sequence;
use quickwit_actors::{AskError, Observe, SupervisorMetrics};
use quickwit_config::{IndexConfig, SourceParams, INGEST_SOURCE_ID};
use quickwit_config::{IndexConfig, SourceParams, INGEST_V2_SOURCE_ID};
use quickwit_indexing::IndexingService;
use quickwit_metastore::{
CreateIndexRequestExt, IndexMetadata, ListIndexesMetadataResponseExt,
Expand Down Expand Up @@ -893,14 +893,14 @@ mod tests {

let subrequest = &request.subrequests[0];
assert_eq!(subrequest.index_uid, "test-index:0");
assert_eq!(subrequest.source_id, INGEST_SOURCE_ID);
assert_eq!(subrequest.source_id, INGEST_V2_SOURCE_ID);

let subresponses = vec![ListShardsSubresponse {
index_uid: "test-index:0".to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shards: vec![Shard {
index_uid: "test-index:0".to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shard_id: 1,
shard_state: ShardState::Open as i32,
..Default::default()
Expand All @@ -925,7 +925,7 @@ mod tests {
subrequests: vec![GetOrCreateOpenShardsSubrequest {
subrequest_id: 0,
index_id: "test-index".to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
}],
closed_shards: Vec::new(),
unavailable_leaders: Vec::new(),
Expand All @@ -939,7 +939,7 @@ mod tests {

let subresponse = &get_open_shards_response.successes[0];
assert_eq!(subresponse.index_uid, "test-index:0");
assert_eq!(subresponse.source_id, INGEST_SOURCE_ID);
assert_eq!(subresponse.source_id, INGEST_V2_SOURCE_ID);
assert_eq!(subresponse.open_shards.len(), 1);
assert_eq!(subresponse.open_shards[0].shard_id, 1);

Expand Down Expand Up @@ -1111,15 +1111,15 @@ mod tests {
assert_eq!(delete_shards_request.subrequests.len(), 1);
let subrequest = &delete_shards_request.subrequests[0];
assert_eq!(subrequest.index_uid, index_uid_clone);
assert_eq!(subrequest.source_id, INGEST_SOURCE_ID);
assert_eq!(subrequest.source_id, INGEST_V2_SOURCE_ID);
assert_eq!(&subrequest.shard_ids[..], &[17]);
Ok(DeleteShardsResponse {})
},
);

let mut shard = Shard {
index_uid: index_0.index_uid.to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shard_id: 17,
leader_id: "test_node".to_string(),
..Default::default()
Expand All @@ -1132,7 +1132,7 @@ mod tests {
let list_shards_resp = ListShardsResponse {
subresponses: vec![ListShardsSubresponse {
index_uid: index_uid_clone.to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shards: vec![shard],
next_shard_id: 18,
}],
Expand All @@ -1152,7 +1152,7 @@ mod tests {
);
let source_uid = SourceUid {
index_uid: index_0.index_uid.clone(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
};

// This update should not triggeer anything in the control plane.
Expand Down Expand Up @@ -1246,7 +1246,7 @@ mod tests {
assert_eq!(delete_shards_request.subrequests.len(), 1);
let subrequest = &delete_shards_request.subrequests[0];
assert_eq!(subrequest.index_uid, index_uid_clone);
assert_eq!(subrequest.source_id, INGEST_SOURCE_ID);
assert_eq!(subrequest.source_id, INGEST_V2_SOURCE_ID);
assert_eq!(&subrequest.shard_ids[..], &[17]);
Ok(DeleteShardsResponse {})
},
Expand All @@ -1258,7 +1258,7 @@ mod tests {
let list_shards_resp = ListShardsResponse {
subresponses: vec![ListShardsSubresponse {
index_uid: index_uid_clone.to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shards: vec![],
next_shard_id: 18,
}],
Expand All @@ -1278,7 +1278,7 @@ mod tests {
);
let source_uid = SourceUid {
index_uid: index_0.index_uid.clone(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
};

// This update should not triggeer anything in the control plane.
Expand Down Expand Up @@ -1332,7 +1332,7 @@ mod tests {
let list_shards_resp = ListShardsResponse {
subresponses: vec![ListShardsSubresponse {
index_uid: index_uid_clone.to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shards: vec![Shard {
index_uid: index_uid_clone.to_string(),
source_id: source.source_id.to_string(),
Expand Down Expand Up @@ -1428,7 +1428,7 @@ mod tests {
mock_metastore.expect_delete_source().return_once(
move |delete_source_request: DeleteSourceRequest| {
assert_eq!(delete_source_request.index_uid, index_uid_clone.to_string());
assert_eq!(&delete_source_request.source_id, INGEST_SOURCE_ID);
assert_eq!(&delete_source_request.source_id, INGEST_V2_SOURCE_ID);
Ok(EmptyResponse {})
},
);
Expand All @@ -1454,7 +1454,7 @@ mod tests {
let list_shards_resp = ListShardsResponse {
subresponses: vec![ListShardsSubresponse {
index_uid: index_uid_clone.to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shards: vec![Shard {
index_uid: index_uid_clone.to_string(),
source_id: source.source_id.to_string(),
Expand Down Expand Up @@ -1485,7 +1485,7 @@ mod tests {
control_plane_mailbox
.ask(DeleteSourceRequest {
index_uid: index_0.index_uid.to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
})
.await
.unwrap()
Expand Down
16 changes: 8 additions & 8 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ mod tests {

use std::collections::BTreeSet;

use quickwit_config::{SourceConfig, SourceParams, INGEST_SOURCE_ID};
use quickwit_config::{SourceConfig, SourceParams, INGEST_V2_SOURCE_ID};
use quickwit_ingest::{RateMibPerSec, ShardInfo};
use quickwit_metastore::IndexMetadata;
use quickwit_proto::control_plane::GetOrCreateOpenShardsSubrequest;
Expand Down Expand Up @@ -1344,7 +1344,7 @@ mod tests {
.returning(|request| {
assert_eq!(request.subrequests.len(), 1);
assert_eq!(request.subrequests[0].index_uid, "test-index:0");
assert_eq!(request.subrequests[0].source_id, INGEST_SOURCE_ID);
assert_eq!(request.subrequests[0].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(request.subrequests[0].leader_id, "test-ingester");
assert_eq!(request.subrequests[0].next_shard_id, 1);

Expand All @@ -1355,17 +1355,17 @@ mod tests {
mock_metastore.expect_open_shards().returning(|request| {
assert_eq!(request.subrequests.len(), 1);
assert_eq!(request.subrequests[0].index_uid, "test-index:0");
assert_eq!(request.subrequests[0].source_id, INGEST_SOURCE_ID);
assert_eq!(request.subrequests[0].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(request.subrequests[0].leader_id, "test-ingester");
assert_eq!(request.subrequests[0].next_shard_id, 1);

let subresponses = vec![metastore::OpenShardsSubresponse {
subrequest_id: 0,
index_uid: "test-index:0".into(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
opened_shards: vec![Shard {
index_uid: "test-index:0".into(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shard_id: 1,
leader_id: "test-ingester".to_string(),
shard_state: ShardState::Open as i32,
Expand All @@ -1387,7 +1387,7 @@ mod tests {
);

let index_uid: IndexUid = "test-index:0".into();
let source_id: SourceId = INGEST_SOURCE_ID.to_string();
let source_id: SourceId = INGEST_V2_SOURCE_ID.to_string();

let source_uid = SourceUid {
index_uid: index_uid.clone(),
Expand Down Expand Up @@ -1425,7 +1425,7 @@ mod tests {
.returning(|request| {
assert_eq!(request.shards.len(), 1);
assert_eq!(request.shards[0].index_uid, "test-index:0");
assert_eq!(request.shards[0].source_id, INGEST_SOURCE_ID);
assert_eq!(request.shards[0].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(request.shards[0].shard_id, 1);
assert_eq!(request.shards[0].leader_id, "test-ingester");

Expand All @@ -1434,7 +1434,7 @@ mod tests {
ingester_mock.expect_init_shards().returning(|request| {
assert_eq!(request.shards.len(), 1);
assert_eq!(request.shards[0].index_uid, "test-index:0");
assert_eq!(request.shards[0].source_id, INGEST_SOURCE_ID);
assert_eq!(request.shards[0].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(request.shards[0].shard_id, 1);
assert_eq!(request.shards[0].leader_id, "test-ingester");

Expand Down
16 changes: 8 additions & 8 deletions quickwit/quickwit-control-plane/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl ControlPlaneModel {

#[cfg(test)]
mod tests {
use quickwit_config::{SourceConfig, SourceParams, INGEST_SOURCE_ID};
use quickwit_config::{SourceConfig, SourceParams, INGEST_V2_SOURCE_ID};
use quickwit_metastore::IndexMetadata;
use quickwit_proto::ingest::{Shard, ShardState};
use quickwit_proto::metastore::ListIndexesMetadataResponse;
Expand Down Expand Up @@ -381,14 +381,14 @@ mod tests {
assert_eq!(request.subrequests.len(), 2);

assert_eq!(request.subrequests[0].index_uid, "test-index-0:0");
assert_eq!(request.subrequests[0].source_id, INGEST_SOURCE_ID);
assert_eq!(request.subrequests[0].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(
request.subrequests[0].shard_state(),
ShardState::Unspecified
);

assert_eq!(request.subrequests[1].index_uid, "test-index-1:0");
assert_eq!(request.subrequests[1].source_id, INGEST_SOURCE_ID);
assert_eq!(request.subrequests[1].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(
request.subrequests[1].shard_state(),
ShardState::Unspecified
Expand All @@ -397,11 +397,11 @@ mod tests {
let subresponses = vec![
metastore::ListShardsSubresponse {
index_uid: "test-index-0:0".to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shards: vec![Shard {
shard_id: 42,
index_uid: "test-index-0:0".to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shard_state: ShardState::Open as i32,
leader_id: "node1".to_string(),
..Default::default()
Expand All @@ -410,7 +410,7 @@ mod tests {
},
metastore::ListShardsSubresponse {
index_uid: "test-index-1:0".to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shards: Vec::new(),
next_shard_id: 1,
},
Expand Down Expand Up @@ -443,7 +443,7 @@ mod tests {

let source_uid_0 = SourceUid {
index_uid: "test-index-0:0".into(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
};
let shards: Vec<&ShardEntry> = model
.shard_table
Expand All @@ -458,7 +458,7 @@ mod tests {

let source_uid_1 = SourceUid {
index_uid: "test-index-1:0".into(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
};
let shards: Vec<&ShardEntry> = model
.shard_table
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ bytes = { workspace = true }
bytesize = { workspace = true }
dyn-clone = { workspace = true }
flume = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
hyper = { workspace = true }
Expand Down
Loading