Skip to content

Commit

Permalink
Optimize and batch merge pipelines list splits queries
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed May 13, 2024
1 parent 7fb3aa5 commit 6d4e9b4
Show file tree
Hide file tree
Showing 72 changed files with 931 additions and 529 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ docker-compose-up:
COMPOSE_PROFILES=$(DOCKER_SERVICES) docker compose -f docker-compose.yml up -d --remove-orphans --wait

docker-compose-down:
docker compose -f docker-compose.yml down --remove-orphans
docker compose -p quickwit down --remove-orphans

docker-compose-logs:
docker compose logs -f -t
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# first if they are already tagged latest and volumes if their content is
# incompatible with the latest version, as in case of postgres.

name: quickwit

networks:
default:
name: quickwit-network
Expand Down
15 changes: 8 additions & 7 deletions quickwit/quickwit-cli/src/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use quickwit_indexing::models::IndexingStatistics;
use quickwit_indexing::IndexingPipeline;
use quickwit_metastore::{IndexMetadata, Split, SplitState};
use quickwit_proto::search::{CountHits, SortField, SortOrder};
use quickwit_proto::types::IndexId;
use quickwit_rest_client::models::IngestSource;
use quickwit_rest_client::rest_client::{CommitType, IngestEvent};
use quickwit_search::SearchResponseRest;
Expand Down Expand Up @@ -200,7 +201,7 @@ pub fn build_index_command() -> Command {
#[derive(Debug, Eq, PartialEq)]
pub struct ClearIndexArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub assume_yes: bool,
}

Expand All @@ -215,13 +216,13 @@ pub struct CreateIndexArgs {
#[derive(Debug, Eq, PartialEq)]
pub struct DescribeIndexArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
}

#[derive(Debug, Eq, PartialEq)]
pub struct IngestDocsArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub input_path_opt: Option<PathBuf>,
pub batch_size_limit_opt: Option<ByteSize>,
pub commit_type: CommitType,
Expand All @@ -230,7 +231,7 @@ pub struct IngestDocsArgs {
#[derive(Debug, Eq, PartialEq)]
pub struct SearchIndexArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub query: String,
pub aggregation: Option<String>,
pub max_hits: usize,
Expand All @@ -245,7 +246,7 @@ pub struct SearchIndexArgs {
#[derive(Debug, Eq, PartialEq)]
pub struct DeleteIndexArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub dry_run: bool,
pub assume_yes: bool,
}
Expand Down Expand Up @@ -528,7 +529,7 @@ where I: IntoIterator<Item = IndexConfig> {
#[derive(Tabled)]
struct IndexRow {
#[tabled(rename = "Index ID")]
index_id: String,
index_id: IndexId,
#[tabled(rename = "Index URI")]
index_uri: Uri,
}
Expand All @@ -548,7 +549,7 @@ pub async fn describe_index_cli(args: DescribeIndexArgs) -> anyhow::Result<()> {
}

pub struct IndexStats {
pub index_id: String,
pub index_id: IndexId,
pub index_uri: Uri,
pub num_published_splits: usize,
pub size_published_splits: ByteSize,
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-cli/src/index/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use anyhow::{bail, Context};
use clap::{arg, ArgMatches, Command};
use colored::Colorize;
use quickwit_config::{RetentionPolicy, SearchSettings};
use quickwit_proto::types::IndexId;
use quickwit_serve::IndexUpdates;
use tracing::debug;

Expand Down Expand Up @@ -65,7 +66,7 @@ pub fn build_index_update_command() -> Command {
#[derive(Debug, Eq, PartialEq)]
pub struct RetentionPolicyArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub disable: bool,
pub period: Option<String>,
pub schedule: Option<String>,
Expand All @@ -74,7 +75,7 @@ pub struct RetentionPolicyArgs {
#[derive(Debug, Eq, PartialEq)]
pub struct SearchSettingsArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub default_search_fields: Vec<String>,
}

Expand Down
23 changes: 12 additions & 11 deletions quickwit/quickwit-cli/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use itertools::Itertools;
use quickwit_common::uri::Uri;
use quickwit_config::{validate_identifier, ConfigFormat, SourceConfig};
use quickwit_metastore::checkpoint::SourceCheckpoint;
use quickwit_proto::types::{IndexId, SourceId};
use quickwit_storage::{load_file, StorageResolver};
use serde_json::Value as JsonValue;
use tabled::{Table, Tabled};
Expand Down Expand Up @@ -142,44 +143,44 @@ pub fn build_source_command() -> Command {
#[derive(Debug, Eq, PartialEq)]
pub struct CreateSourceArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub source_config_uri: Uri,
}

#[derive(Debug, Eq, PartialEq)]
pub struct ToggleSourceArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub source_id: String,
pub index_id: IndexId,
pub source_id: SourceId,
pub enable: bool,
}

#[derive(Debug, Eq, PartialEq)]
pub struct DeleteSourceArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub source_id: String,
pub index_id: IndexId,
pub source_id: SourceId,
pub assume_yes: bool,
}

#[derive(Debug, Eq, PartialEq)]
pub struct DescribeSourceArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub source_id: String,
pub index_id: IndexId,
pub source_id: SourceId,
}

#[derive(Debug, Eq, PartialEq)]
pub struct ListSourcesArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
}

#[derive(Debug, Eq, PartialEq)]
pub struct ResetCheckpointArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub source_id: String,
pub index_id: IndexId,
pub source_id: SourceId,
pub assume_yes: bool,
}

Expand Down Expand Up @@ -469,7 +470,7 @@ where I: IntoIterator<Item = SourceConfig> {
#[derive(Tabled)]
struct SourceRow {
#[tabled(rename = "ID")]
source_id: String,
source_id: SourceId,
#[tabled(rename = "Type")]
source_type: String,
#[tabled(rename = "Enabled")]
Expand Down
11 changes: 6 additions & 5 deletions quickwit/quickwit-cli/src/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use clap::{arg, ArgMatches, Command};
use colored::Colorize;
use itertools::Itertools;
use quickwit_metastore::{Split, SplitState};
use quickwit_proto::types::{IndexId, SplitId};
use quickwit_serve::ListSplitsQueryParams;
use tabled::{Table, Tabled};
use time::{format_description, Date, OffsetDateTime, PrimitiveDateTime};
Expand Down Expand Up @@ -133,7 +134,7 @@ impl FromStr for OutputFormat {
#[derive(Debug, PartialEq)]
pub struct ListSplitArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub offset: Option<usize>,
pub limit: Option<usize>,
pub split_states: Option<Vec<SplitState>>,
Expand All @@ -147,16 +148,16 @@ pub struct ListSplitArgs {
#[derive(Debug, Eq, PartialEq)]
pub struct MarkForDeletionArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub split_ids: Vec<String>,
pub assume_yes: bool,
}

#[derive(Debug, Eq, PartialEq)]
pub struct DescribeSplitArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub split_id: String,
pub index_id: IndexId,
pub split_id: SplitId,
pub verbose: bool,
}

Expand Down Expand Up @@ -470,7 +471,7 @@ fn parse_split_state(split_state_arg: &str) -> anyhow::Result<SplitState> {
#[derive(Tabled)]
struct SplitRow {
#[tabled(rename = "ID")]
split_id: String,
split_id: SplitId,
#[tabled(rename = "State")]
split_state: SplitState,
#[tabled(rename = "Num docs")]
Expand Down
27 changes: 12 additions & 15 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ use quickwit_config::{
VecSourceParams, CLI_SOURCE_ID,
};
use quickwit_index_management::{clear_cache_directory, IndexService};
use quickwit_indexing::actors::{
IndexingService, MergePipeline, MergePipelineId, MergeSchedulerService,
};
use quickwit_indexing::actors::{IndexingService, MergePipeline, MergeSchedulerService};
use quickwit_indexing::models::{
DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline,
};
Expand All @@ -52,7 +50,7 @@ use quickwit_metastore::IndexMetadataResponseExt;
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::search::{CountHits, SearchResponse};
use quickwit_proto::types::{NodeId, PipelineUid};
use quickwit_proto::types::{IndexId, PipelineUid, SourceId, SplitId};
use quickwit_search::{single_node_search, SearchResponseRest};
use quickwit_serve::{
search_request_from_api_request, BodyFormat, SearchRequestQueryString, SortBy,
Expand Down Expand Up @@ -174,7 +172,7 @@ pub fn build_tool_command() -> Command {
#[derive(Debug, Eq, PartialEq)]
pub struct LocalIngestDocsArgs {
pub config_uri: Uri,
pub index_id: String,
pub index_id: IndexId,
pub input_path_opt: Option<PathBuf>,
pub input_format: SourceInputFormat,
pub overwrite: bool,
Expand All @@ -185,7 +183,7 @@ pub struct LocalIngestDocsArgs {
#[derive(Debug, Eq, PartialEq)]
pub struct LocalSearchArgs {
pub config_uri: Uri,
pub index_id: String,
pub index_id: IndexId,
pub query: String,
pub aggregation: Option<String>,
pub max_hits: usize,
Expand All @@ -200,23 +198,23 @@ pub struct LocalSearchArgs {
#[derive(Debug, Eq, PartialEq)]
pub struct GarbageCollectIndexArgs {
pub config_uri: Uri,
pub index_id: String,
pub index_id: IndexId,
pub grace_period: Duration,
pub dry_run: bool,
}

#[derive(Debug, Eq, PartialEq)]
pub struct MergeArgs {
pub config_uri: Uri,
pub index_id: String,
pub source_id: String,
pub index_id: IndexId,
pub source_id: SourceId,
}

#[derive(Debug, Eq, PartialEq)]
pub struct ExtractSplitArgs {
pub config_uri: Uri,
pub index_id: String,
pub split_id: String,
pub index_id: IndexId,
pub split_id: SplitId,
pub target_dir: PathBuf,
}

Expand Down Expand Up @@ -479,7 +477,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
.await?;
let merge_pipeline_handle = indexing_server_mailbox
.ask_for_res(DetachMergePipeline {
pipeline_id: MergePipelineId::from(&pipeline_id),
pipeline_id: pipeline_id.merge_pipeline_id(),
})
.await?;
let indexing_pipeline_handle = indexing_server_mailbox
Expand Down Expand Up @@ -618,7 +616,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
.await?;
let pipeline_handle: ActorHandle<MergePipeline> = indexing_service_mailbox
.ask_for_res(DetachMergePipeline {
pipeline_id: MergePipelineId::from(&pipeline_id),
pipeline_id: pipeline_id.merge_pipeline_id(),
})
.await?;

Expand Down Expand Up @@ -931,9 +929,8 @@ impl ThroughputCalculator {
}

async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
let node_id: NodeId = config.node_id.clone().into();
let self_node = ClusterMember {
node_id,
node_id: config.node_id.clone(),
generation_id: quickwit_cluster::GenerationId::now(),
is_ready: false,
enabled_services: HashSet::new(),
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-cli/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use quickwit_common::uri::Uri;
use quickwit_config::service::QuickwitService;
use quickwit_metastore::{IndexMetadata, IndexMetadataResponseExt, MetastoreResolver};
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::types::IndexId;
use quickwit_storage::{Storage, StorageResolver};
use reqwest::Url;
use tempfile::{tempdir, TempDir};
Expand Down Expand Up @@ -120,7 +121,7 @@ pub struct TestEnv {
pub cluster_endpoint: Url,
pub index_config_uri: Uri,
/// The index ID.
pub index_id: String,
pub index_id: IndexId,
pub index_uri: Uri,
pub rest_listen_port: u16,
pub storage_resolver: StorageResolver,
Expand Down Expand Up @@ -177,7 +178,7 @@ pub enum TestStorageType {

/// Creates all necessary artifacts in a test environment.
pub async fn create_test_env(
index_id: String,
index_id: IndexId,
storage_type: TestStorageType,
) -> anyhow::Result<TestEnv> {
let temp_dir = tempdir()?;
Expand Down
3 changes: 1 addition & 2 deletions quickwit/quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use quickwit_common::metrics::IntCounter;
use quickwit_config::service::QuickwitService;
use quickwit_config::NodeConfig;
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::types::NodeId;
use time::OffsetDateTime;

#[cfg(any(test, feature = "testsuite"))]
Expand Down Expand Up @@ -129,7 +128,7 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result<C
let peer_seed_addrs = node_config.peer_seed_addrs().await?;
let indexing_tasks = Vec::new();

let node_id: NodeId = node_config.node_id.clone().into();
let node_id = node_config.node_id.clone();
let generation_id = GenerationId::now();
let is_ready = false;
let indexing_cpu_capacity = if node_config.is_service_enabled(QuickwitService::Indexer) {
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/index_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use anyhow::Context;
use quickwit_common::uri::Uri;
use quickwit_proto::types::IndexId;
use serde::{Deserialize, Serialize};
use tracing::info;

Expand Down Expand Up @@ -127,7 +128,7 @@ impl TryFrom<VersionedIndexConfig> for IndexConfig {
#[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct IndexConfigV0_8 {
pub index_id: String,
pub index_id: IndexId,
#[schema(value_type = String)]
#[serde(default)]
pub index_uri: Option<Uri>,
Expand Down

0 comments on commit 6d4e9b4

Please sign in to comment.