Skip to content

Commit

Permalink
feat: improve super cluster (#3428)
Browse files Browse the repository at this point in the history
- [x] add timeout and partical success for super cluster
- [x] add region filter for super cluster
  • Loading branch information
hengfeiyang committed May 8, 2024
1 parent 5cf8a0c commit 32b7ce4
Show file tree
Hide file tree
Showing 26 changed files with 436 additions and 240 deletions.
540 changes: 328 additions & 212 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ codegen-units = 4

[dependencies]
actix-cors = "0.6"
actix-http = "3.6"
actix-multipart = { version = "0.6", features = ["derive"] }
actix-web.workspace = true
actix-web-httpauth = "0.8"
Expand Down Expand Up @@ -247,7 +248,7 @@ rand = "0.8"
rayon = "1.7.0"
regex = "1.7"
regex-syntax = "0.8"
reqwest = { version = "0.11", default-features = false, features = [
reqwest = { version = "0.12", default-features = false, features = [
"rustls-tls",
"stream",
] }
Expand Down
2 changes: 2 additions & 0 deletions src/cli/data/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ impl Context for Export {
uses_zo_fn: false,
query_fn: None,
skip_wal: false,
is_partial: false,
};

let req = search::Request {
query,
aggs: HashMap::new(),
encoding: search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout: 0,
};
Expand Down
5 changes: 3 additions & 2 deletions src/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use lettre::{
AsyncSmtpTransport, Tokio1Executor,
};
use once_cell::sync::Lazy;
use reqwest::Client;
use sysinfo::{DiskExt, SystemExt};

use crate::{
Expand Down Expand Up @@ -165,7 +164,7 @@ pub static INSTANCE_ID: Lazy<RwHashMap<String, String>> = Lazy::new(Default::def

pub static TELEMETRY_CLIENT: Lazy<segment::HttpClient> = Lazy::new(|| {
segment::HttpClient::new(
Client::builder()
reqwest::Client::builder()
.connect_timeout(Duration::new(10, 0))
.build()
.unwrap(),
Expand Down Expand Up @@ -420,6 +419,8 @@ pub struct Grpc {
help = "Max grpc message size in MB, default is 16 MB"
)]
pub max_message_size: usize,
#[env_config(name = "ZO_GRPC_CONNECT_TIMEOUT", default = 5)] // in seconds
pub connect_timeout: u64,
}

#[derive(EnvConfig)]
Expand Down
14 changes: 14 additions & 0 deletions src/config/src/meta/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub struct Request {
#[serde(default)]
pub encoding: RequestEncoding,
#[serde(default)]
pub regions: Vec<String>, // default query all regions, local: only query local region clusters
#[serde(default)]
pub clusters: Vec<String>, // default query all clusters, local: only query local cluster
#[serde(default)]
pub timeout: i64,
Expand Down Expand Up @@ -117,6 +119,8 @@ pub struct Query {
pub query_fn: Option<String>,
#[serde(default)]
pub skip_wal: bool,
#[serde(default)]
pub is_partial: bool,
}

fn default_size() -> usize {
Expand All @@ -140,6 +144,7 @@ impl Default for Query {
uses_zo_fn: false,
query_fn: None,
skip_wal: false,
is_partial: false,
}
}
}
Expand Down Expand Up @@ -203,6 +208,8 @@ pub struct Response {
pub trace_id: String,
#[serde(skip_serializing_if = "String::is_empty")]
pub function_error: String,
#[serde(default)]
pub is_partial: bool,
}

#[derive(Clone, Debug, Serialize, Deserialize, Default, ToSchema)]
Expand Down Expand Up @@ -240,6 +247,7 @@ impl Response {
response_type: "".to_string(),
trace_id: "".to_string(),
function_error: "".to_string(),
is_partial: false,
}
}

Expand Down Expand Up @@ -296,6 +304,10 @@ impl Response {
pub fn set_trace_id(&mut self, trace_id: String) {
self.trace_id = trace_id;
}

pub fn set_partial(&mut self, is_partial: bool) {
self.is_partial = is_partial;
}
}

#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)]
Expand Down Expand Up @@ -538,9 +550,11 @@ mod tests {
uses_zo_fn: false,
query_fn: None,
skip_wal: false,
is_partial: false,
},
aggs: HashMap::new(),
encoding: "base64".into(),
regions: vec![],
clusters: vec![],
timeout: 0,
};
Expand Down
8 changes: 8 additions & 0 deletions src/handler/http/request/search/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,9 +487,11 @@ pub async fn around(
uses_zo_fn: uses_fn,
query_fn: query_fn.clone(),
skip_wal: false,
is_partial: false,
},
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout,
};
Expand Down Expand Up @@ -565,9 +567,11 @@ pub async fn around(
uses_zo_fn: uses_fn,
query_fn: query_fn.clone(),
skip_wal: false,
is_partial: false,
},
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout,
};
Expand Down Expand Up @@ -937,9 +941,11 @@ async fn values_v1(
uses_zo_fn: uses_fn,
query_fn: query_fn.clone(),
skip_wal: false,
is_partial: false,
},
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout,
};
Expand Down Expand Up @@ -1165,9 +1171,11 @@ async fn values_v2(
uses_zo_fn: false,
query_fn: None,
skip_wal: false,
is_partial: false,
},
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout,
};
Expand Down
2 changes: 2 additions & 0 deletions src/handler/http/request/traces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,11 @@ pub async fn get_latest_traces(
uses_zo_fn: false,
query_fn: None,
skip_wal: false,
is_partial: false,
},
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout,
};
Expand Down
6 changes: 3 additions & 3 deletions src/handler/http/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ async fn proxy(
path: web::Path<PathParamProxyURL>,
req: HttpRequest,
) -> actix_web::Result<HttpResponse> {
let client = reqwest::Client::new();
let client = awc::Client::new();
let method = req.method().clone();
let forwarded_resp = client
let mut forwarded_resp = client
.request(method, &path.target_url)
.send()
.await
Expand All @@ -187,7 +187,7 @@ async fn proxy(
})?;

let status = forwarded_resp.status().as_u16();
let body = forwarded_resp.bytes().await.map_err(|e| {
let body = forwarded_resp.body().await.map_err(|e| {
actix_web::error::ErrorInternalServerError(format!("Failed to read the response: {}", e))
})?;

Expand Down
1 change: 1 addition & 0 deletions src/proto/proto/cluster/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ message SearchResponse {
bytes hits = 6;
repeated SearchAggResponse aggs = 7;
ScanStats scan_stats = 8;
bool is_partial = 9;
}

message SearchAggRequest {
Expand Down
1 change: 1 addition & 0 deletions src/router/grpc/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub(crate) async fn get_ingester_channel() -> Result<Channel, tonic::Status> {
// cache miss, connect to ingester
let channel = Channel::from_shared(grpc_addr.clone())
.unwrap()
.connect_timeout(std::time::Duration::from_secs(CONFIG.grpc.connect_timeout))
.connect()
.await
.map_err(|err| {
Expand Down
2 changes: 2 additions & 0 deletions src/service/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,9 +458,11 @@ impl QueryCondition {
query_context: None,
query_fn: None,
skip_wal: false,
is_partial: false,
},
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout: 0,
};
Expand Down
1 change: 1 addition & 0 deletions src/service/db/enrichment_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub async fn get(org_id: &str, name: &str) -> Result<Vec<vrl::value::Value>, any
query,
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout: 0,
};
Expand Down
1 change: 1 addition & 0 deletions src/service/db/file_list/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ async fn send_to_node(
.expect("parse internal grpc token faile");
let channel = match Channel::from_shared(node.grpc_addr.clone())
.unwrap()
.connect_timeout(std::time::Duration::from_secs(CONFIG.grpc.connect_timeout))
.connect()
.await
{
Expand Down
2 changes: 2 additions & 0 deletions src/service/file_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub async fn query(
.map_err(|_| Error::Message("invalid token".to_string()))?;
let channel = Channel::from_shared(node.grpc_addr.clone())
.unwrap()
.connect_timeout(std::time::Duration::from_secs(CONFIG.grpc.connect_timeout))
.connect()
.await
.map_err(|err| {
Expand Down Expand Up @@ -198,6 +199,7 @@ pub async fn query(
.map_err(|_| Error::Message("invalid token".to_string()))?;
let channel = Channel::from_shared(node.grpc_addr.clone())
.unwrap()
.connect_timeout(std::time::Duration::from_secs(CONFIG.grpc.connect_timeout))
.connect()
.await
.map_err(|err| {
Expand Down
2 changes: 2 additions & 0 deletions src/service/metrics/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,7 @@ pub(crate) async fn get_series(
},
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout: 0,
};
Expand Down Expand Up @@ -759,6 +760,7 @@ pub(crate) async fn get_label_values(
},
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout: 0,
};
Expand Down
1 change: 1 addition & 0 deletions src/service/promql/search/grpc/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ async fn get_file_list(
.map_err(|_| DataFusionError::Execution("invalid token".to_string()))?;
let channel = Channel::from_shared(node_addr)
.unwrap()
.connect_timeout(std::time::Duration::from_secs(CONFIG.grpc.connect_timeout))
.connect()
.await
.map_err(|_| {
Expand Down
1 change: 1 addition & 0 deletions src/service/promql/search/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ async fn search_in_cluster(
.map_err(|_| Error::Message("invalid token".to_string()))?;
let channel = Channel::from_shared(node_addr)
.unwrap()
.connect_timeout(std::time::Duration::from_secs(CONFIG.grpc.connect_timeout))
.connect()
.await
.map_err(|err| {
Expand Down
3 changes: 2 additions & 1 deletion src/service/search/cluster/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub async fn search(mut req: cluster_rpc::SearchRequest) -> Result<cluster_rpc::
);

// handle query function
let (merge_results, scan_stats, inverted_index_count, _) =
let (merge_results, scan_stats, inverted_index_count, _, is_partial) =
super::search(&trace_id, sql.clone(), req).await?;

// final result
Expand Down Expand Up @@ -115,6 +115,7 @@ pub async fn search(mut req: cluster_rpc::SearchRequest) -> Result<cluster_rpc::
hits: hits_buf,
aggs: aggs_buf,
scan_stats: Some(cluster_rpc::ScanStats::from(&scan_stats)),
is_partial,
};

Ok(result)
Expand Down
3 changes: 2 additions & 1 deletion src/service/search/cluster/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub async fn search(mut req: cluster_rpc::SearchRequest) -> Result<search::Respo
req.query.as_mut().unwrap().query_fn = "".to_string();

// handle query function
let (merge_batches, scan_stats, inverted_index_count, took_wait) =
let (merge_batches, scan_stats, inverted_index_count, took_wait, is_partial) =
super::search(&trace_id, sql.clone(), req).await?;

// final result
Expand Down Expand Up @@ -185,6 +185,7 @@ pub async fn search(mut req: cluster_rpc::SearchRequest) -> Result<search::Respo
} else {
result.set_total(total);
}
result.set_partial(is_partial);
result.set_cluster_took(start.elapsed().as_millis() as usize, took_wait);
result.set_file_count(scan_stats.files as usize);
result.set_scan_size(scan_stats.original_size as usize);
Expand Down
Loading

0 comments on commit 32b7ce4

Please sign in to comment.