Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,17 @@ impl ClusterSandbox {
Self::start_cluster_with_configs(temp_dir, node_configs).await
}

pub async fn start_standalone_with_otlp_service() -> anyhow::Result<Self> {
pub async fn start_cluster_with_otlp_service(
nodes_services: &[HashSet<QuickwitService>],
) -> anyhow::Result<Self> {
let temp_dir = tempfile::tempdir()?;
let services = QuickwitService::supported_services();
let mut node_configs = build_node_configs(temp_dir.path().to_path_buf(), &[services]);
node_configs[0]
.node_config
.indexer_config
.enable_otlp_endpoint = true;
let mut node_configs = build_node_configs(temp_dir.path().to_path_buf(), nodes_services);
// Set OTLP endpoint for indexers.
for node_config in node_configs.iter_mut() {
if node_config.services.contains(&QuickwitService::Indexer) {
node_config.node_config.indexer_config.enable_otlp_endpoint = true;
}
}
Self::start_cluster_with_configs(temp_dir, node_configs).await
}

Expand Down
9 changes: 8 additions & 1 deletion quickwit/quickwit-integration-tests/src/tests/index_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,14 @@ async fn test_shutdown() {
#[tokio::test]
async fn test_ingest_traces_with_otlp_grpc_api() {
quickwit_common::setup_logging_for_tests();
let sandbox = ClusterSandbox::start_standalone_with_otlp_service()
let nodes_services = vec![
HashSet::from_iter([QuickwitService::Searcher]),
HashSet::from_iter([QuickwitService::Metastore]),
HashSet::from_iter([QuickwitService::Indexer]),
HashSet::from_iter([QuickwitService::ControlPlane]),
HashSet::from_iter([QuickwitService::Janitor]),
];
let sandbox = ClusterSandbox::start_cluster_with_otlp_service(&nodes_services)
.await
.unwrap();
// Wait fo the pipelines to start (one for logs and one for traces)
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-serve/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ pub(crate) async fn start_grpc_server(
};
let otlp_log_grpc_service =
if let Some(otlp_logs_service) = services.otlp_logs_service_opt.clone() {
enabled_grpc_services.insert("otlp-log");
let logs_service = LogsServiceServer::new(otlp_logs_service)
.accept_compressed(CompressionEncoding::Gzip);
Some(logs_service)
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,15 +501,15 @@ pub async fn serve_quickwit(
None
};

let otlp_logs_service_opt = if node_config.is_service_enabled(QuickwitService::Searcher)
let otlp_logs_service_opt = if node_config.is_service_enabled(QuickwitService::Indexer)
&& node_config.indexer_config.enable_otlp_endpoint
{
Some(OtlpGrpcLogsService::new(ingest_service.clone()))
} else {
None
};

let otlp_traces_service_opt = if node_config.is_service_enabled(QuickwitService::Searcher)
let otlp_traces_service_opt = if node_config.is_service_enabled(QuickwitService::Indexer)
&& node_config.indexer_config.enable_otlp_endpoint
{
Some(OtlpGrpcTracesService::new(ingest_service.clone(), None))
Expand Down