Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ QUESTIONS.txt
# These are backup files generated by rustfmt
**/*.rs.bk

.env
.idea
.vscode
.vscode-license
deps
qwdata
elastic-search-artifacts
.env
qwdata

# Generated by prost/tonic build
*_descriptor.bin
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ tokio-tungstenite,https://github.com/snapview/tokio-tungstenite,MIT,"Daniel Abra
toml,https://github.com/toml-rs/toml,MIT OR Apache-2.0,Alex Crichton <alex@alexcrichton.com>
toml_edit,https://github.com/toml-rs/toml,MIT OR Apache-2.0,"Andronik Ordian <write@reusable.software>, Ed Page <eopage@gmail.com>"
tonic,https://github.com/hyperium/tonic,MIT,Lucio Franco <luciofranco14@gmail.com>
tonic-reflection,https://github.com/hyperium/tonic,MIT,"James Nugent <james@jen20.com>, Samani G. Gikandi <samani@gojulas.com>"
tower,https://github.com/tower-rs/tower,MIT,Tower Maintainers <team@tower-rs.com>
tower-http,https://github.com/tower-rs/tower-http,MIT,Tower Maintainers <team@tower-rs.com>
tracing,https://github.com/tokio-rs/tracing,MIT,"Eliza Weisman <eliza@buoyant.io>, Tokio Contributors <team@tokio.rs>"
Expand Down
14 changes: 14 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ tokio-util = { version = "0.7", features = ["full"] }
toml = "0.7.6"
tonic = { version = "0.9.0", features = ["gzip"] }
tonic-build = "0.9.0"
tonic-reflection = "0.9"
tower = { version = "0.4.13", features = [
"balance",
"buffer",
Expand Down
23 changes: 18 additions & 5 deletions quickwit/quickwit-proto/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// services.
//
// Cluster service.
let mut prost_config = prost_build::Config::default();
prost_config.file_descriptor_set_path("src/codegen/quickwit/cluster_descriptor.bin");

Codegen::builder()
.with_prost_config(prost_config)
.with_protos(&["protos/quickwit/cluster.proto"])
.with_output_dir("src/codegen/quickwit")
.with_result_type_path("crate::cluster::ClusterResult")
Expand All @@ -33,6 +37,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// Control plane.
let mut prost_config = prost_build::Config::default();
prost_config.file_descriptor_set_path("src/codegen/quickwit/control_plane_descriptor.bin");

prost_config
.extern_path(
".quickwit.common.DocMappingUid",
Expand All @@ -52,7 +58,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// Developer service.
let mut prost_config = prost_build::Config::default();
prost_config.bytes(["GetDebugInfoResponse.debug_info_json"]);
prost_config
.bytes(["GetDebugInfoResponse.debug_info_json"])
.file_descriptor_set_path("src/codegen/quickwit/developer_descriptor.bin");

Codegen::builder()
.with_prost_config(prost_config)
Expand All @@ -72,7 +80,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"crate::types::PipelineUid",
)
.extern_path(".quickwit.common.IndexUid", "crate::types::IndexUid")
.extern_path(".quickwit.ingest.ShardId", "crate::types::ShardId");
.extern_path(".quickwit.ingest.ShardId", "crate::types::ShardId")
.file_descriptor_set_path("src/codegen/quickwit/indexing_descriptor.bin");

Codegen::builder()
.with_prost_config(prost_config)
Expand Down Expand Up @@ -107,7 +116,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.field_attribute(
"DeleteQuery.end_timestamp",
"#[serde(skip_serializing_if = \"Option::is_none\")]",
);
)
.file_descriptor_set_path("src/codegen/quickwit/metastore_descriptor.bin");

Codegen::builder()
.with_prost_config(prost_config)
Expand Down Expand Up @@ -157,7 +167,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.field_attribute(
"Shard.update_timestamp",
"#[serde(default = \"super::compatibility_shard_update_timestamp\")]",
);
)
.file_descriptor_set_path("src/codegen/quickwit/ingest_descriptor.bin");

Codegen::builder()
.with_prost_config(prost_config)
Expand All @@ -175,7 +186,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// Search service.
let mut prost_config = prost_build::Config::default();
prost_config.protoc_arg("--experimental_allow_proto3_optional");
prost_config
.file_descriptor_set_path("src/codegen/quickwit/search_descriptor.bin")
.protoc_arg("--experimental_allow_proto3_optional");

tonic_build::configure()
.enum_attribute(".", "#[serde(rename_all=\"snake_case\")]")
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-proto/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ use crate::GrpcServiceError;

include!("../codegen/quickwit/quickwit.cluster.rs");

pub const CLUSTER_PLANE_FILE_DESCRIPTOR_SET: &[u8] =
include_bytes!("../codegen/quickwit/cluster_descriptor.bin");

pub type ClusterResult<T> = std::result::Result<T, ClusterError>;

#[derive(Debug, thiserror::Error, Eq, PartialEq, Serialize, Deserialize)]
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-proto/src/control_plane/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ use crate::{GrpcServiceError, ServiceError, ServiceErrorCode};

include!("../codegen/quickwit/quickwit.control_plane.rs");

pub const CONTROL_PLANE_FILE_DESCRIPTOR_SET: &[u8] =
include_bytes!("../codegen/quickwit/control_plane_descriptor.bin");

pub type ControlPlaneResult<T> = std::result::Result<T, ControlPlaneError>;

#[derive(Debug, thiserror::Error, Eq, PartialEq, Serialize, Deserialize)]
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-proto/src/developer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ use crate::{GrpcServiceError, ServiceError, ServiceErrorCode};

include!("../codegen/quickwit/quickwit.developer.rs");

pub const DEVELOPER_FILE_DESCRIPTOR_SET: &[u8] =
include_bytes!("../codegen/quickwit/developer_descriptor.bin");

pub type DeveloperResult<T> = std::result::Result<T, DeveloperError>;

#[derive(Debug, thiserror::Error, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-proto/src/indexing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ use crate::{GrpcServiceError, ServiceError, ServiceErrorCode};

include!("../codegen/quickwit/quickwit.indexing.rs");

pub const INDEXING_FILE_DESCRIPTOR_SET: &[u8] =
include_bytes!("../codegen/quickwit/indexing_descriptor.bin");

pub type IndexingResult<T> = std::result::Result<T, IndexingError>;

#[derive(Debug, thiserror::Error, Eq, PartialEq, Serialize, Deserialize)]
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-proto/src/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ pub mod ingester;
pub mod router;

include!("../codegen/quickwit/quickwit.ingest.rs");

pub const INGEST_FILE_DESCRIPTOR_SET: &[u8] =
include_bytes!("../codegen/quickwit/ingest_descriptor.bin");

pub type IngestV2Result<T> = std::result::Result<T, IngestV2Error>;

#[derive(Debug, Copy, Clone, thiserror::Error, Eq, PartialEq, Serialize, Deserialize)]
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-proto/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub mod events;

include!("../codegen/quickwit/quickwit.metastore.rs");

pub const METASTORE_FILE_DESCRIPTOR_SET: &[u8] =
include_bytes!("../codegen/quickwit/metastore_descriptor.bin");

pub type MetastoreResult<T> = Result<T, MetastoreError>;

/// Lists the object types stored and managed by the metastore.
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-proto/src/search/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ pub use sort_by_value::SortValue;

include!("../codegen/quickwit/quickwit.search.rs");

pub const SEARCH_FILE_DESCRIPTOR_SET: &[u8] =
include_bytes!("../codegen/quickwit/search_descriptor.bin");

impl SearchRequest {
pub fn time_range(&self) -> impl std::ops::RangeBounds<i64> {
use std::ops::Bound;
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-serve/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ tokio = { workspace = true }
tokio-rustls = { workspace = true }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tonic-reflection = { workspace = true }
tower = { workspace = true }
tower-http = { workspace = true }
tracing = { workspace = true }
Expand Down
31 changes: 31 additions & 0 deletions quickwit/quickwit-serve/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::BTreeSet;
use std::error::Error;
use std::sync::Arc;

use anyhow::Context;
use quickwit_cluster::cluster_grpc_server;
use quickwit_common::tower::BoxFutureInfaillible;
use quickwit_config::service::QuickwitService;
Expand All @@ -30,6 +31,7 @@ use quickwit_proto::tonic::codegen::CompressionEncoding;
use quickwit_proto::tonic::transport::server::TcpIncoming;
use quickwit_proto::tonic::transport::{Certificate, Identity, Server, ServerTlsConfig};
use tokio::net::TcpListener;
use tonic_reflection::server::{ServerReflection, ServerReflectionServer};
use tracing::*;

use crate::developer_api::DeveloperApiServer;
Expand All @@ -45,7 +47,9 @@ pub(crate) async fn start_grpc_server(
shutdown_signal: BoxFutureInfaillible<()>,
) -> anyhow::Result<()> {
let mut enabled_grpc_services = BTreeSet::new();
let mut file_descriptor_sets = Vec::new();
let mut server = Server::builder();

if let Some(tls_config) = grpc_config.tls {
let cert = std::fs::read_to_string(tls_config.cert_path)?;
let key = std::fs::read_to_string(tls_config.key_path)?;
Expand All @@ -66,10 +70,13 @@ pub(crate) async fn start_grpc_server(
}

let cluster_grpc_service = cluster_grpc_server(services.cluster.clone());
file_descriptor_sets.push(quickwit_proto::cluster::CLUSTER_PLANE_FILE_DESCRIPTOR_SET);

// Mount gRPC metastore service if `QuickwitService::Metastore` is enabled on node.
let metastore_grpc_service = if let Some(metastore_server) = &services.metastore_server_opt {
enabled_grpc_services.insert("metastore");
file_descriptor_sets.push(quickwit_proto::metastore::METASTORE_FILE_DESCRIPTOR_SET);

Some(metastore_server.as_grpc_service(grpc_config.max_message_size))
} else {
None
Expand All @@ -81,6 +88,8 @@ pub(crate) async fn start_grpc_server(
{
if let Some(indexing_service) = services.indexing_service_opt.clone() {
enabled_grpc_services.insert("indexing");
file_descriptor_sets.push(quickwit_proto::indexing::INDEXING_FILE_DESCRIPTOR_SET);

let indexing_service = IndexingServiceClient::tower()
.stack_layer(INDEXING_GRPC_SERVER_METRICS_LAYER.clone())
.build_from_mailbox(indexing_service);
Expand Down Expand Up @@ -121,6 +130,7 @@ pub(crate) async fn start_grpc_server(

let ingester_grpc_service = if let Some(ingester_service) = services.ingester_service() {
enabled_grpc_services.insert("ingester");
file_descriptor_sets.push(quickwit_proto::ingest::INGEST_FILE_DESCRIPTOR_SET);
Some(ingester_service.as_grpc_service(grpc_config.max_message_size))
} else {
None
Expand All @@ -132,6 +142,8 @@ pub(crate) async fn start_grpc_server(
.is_service_enabled(QuickwitService::ControlPlane)
{
enabled_grpc_services.insert("control-plane");
file_descriptor_sets.push(quickwit_proto::control_plane::CONTROL_PLANE_FILE_DESCRIPTOR_SET);

Some(
services
.control_plane_client
Expand Down Expand Up @@ -165,6 +177,8 @@ pub(crate) async fn start_grpc_server(
.is_service_enabled(QuickwitService::Searcher)
{
enabled_grpc_services.insert("search");
file_descriptor_sets.push(quickwit_proto::search::SEARCH_FILE_DESCRIPTOR_SET);

let search_service = services.search_service.clone();
let grpc_search_service = GrpcSearchAdapter::from(search_service);
Some(
Expand All @@ -185,15 +199,19 @@ pub(crate) async fn start_grpc_server(
};
let developer_grpc_service = {
enabled_grpc_services.insert("developer");
file_descriptor_sets.push(quickwit_proto::developer::DEVELOPER_FILE_DESCRIPTOR_SET);

let developer_service = DeveloperApiServer::from_services(&services);

DeveloperServiceClient::new(developer_service)
.as_grpc_service(DeveloperApiServer::MAX_GRPC_MESSAGE_SIZE)
};
let reflection_service = build_reflection_service(&file_descriptor_sets)?;

let server_router = server
.add_service(cluster_grpc_service)
.add_service(developer_grpc_service)
.add_service(reflection_service)
.add_optional_service(control_plane_grpc_service)
.add_optional_service(indexing_grpc_service)
.add_optional_service(ingest_api_grpc_service)
Expand All @@ -219,3 +237,16 @@ pub(crate) async fn start_grpc_server(
serve_res?;
Ok(())
}

fn build_reflection_service(
file_descriptor_sets: &[&[u8]],
) -> anyhow::Result<ServerReflectionServer<impl ServerReflection>> {
let mut builder = tonic_reflection::server::Builder::configure();

for file_descriptor_set in file_descriptor_sets {
builder = builder.register_encoded_file_descriptor_set(file_descriptor_set)
}
builder
.build()
.context("failed to build reflection service")
}