Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve super cluster #3428

Merged
merged 2 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
540 changes: 328 additions & 212 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ codegen-units = 4

[dependencies]
actix-cors = "0.6"
actix-http = "3.6"
actix-multipart = { version = "0.6", features = ["derive"] }
actix-web.workspace = true
actix-web-httpauth = "0.8"
Expand Down Expand Up @@ -247,7 +248,7 @@ rand = "0.8"
rayon = "1.7.0"
regex = "1.7"
regex-syntax = "0.8"
reqwest = { version = "0.11", default-features = false, features = [
reqwest = { version = "0.12", default-features = false, features = [
"rustls-tls",
"stream",
] }
Expand Down
2 changes: 2 additions & 0 deletions src/cli/data/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ impl Context for Export {
uses_zo_fn: false,
query_fn: None,
skip_wal: false,
is_partial: false,
};

let req = search::Request {
query,
aggs: HashMap::new(),
encoding: search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout: 0,
};
Expand Down
5 changes: 3 additions & 2 deletions src/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use lettre::{
AsyncSmtpTransport, Tokio1Executor,
};
use once_cell::sync::Lazy;
use reqwest::Client;
use sysinfo::{DiskExt, SystemExt};

use crate::{
Expand Down Expand Up @@ -165,7 +164,7 @@ pub static INSTANCE_ID: Lazy<RwHashMap<String, String>> = Lazy::new(Default::def

pub static TELEMETRY_CLIENT: Lazy<segment::HttpClient> = Lazy::new(|| {
segment::HttpClient::new(
Client::builder()
reqwest::Client::builder()
.connect_timeout(Duration::new(10, 0))
.build()
.unwrap(),
Expand Down Expand Up @@ -420,6 +419,8 @@ pub struct Grpc {
help = "Max grpc message size in MB, default is 16 MB"
)]
pub max_message_size: usize,
#[env_config(name = "ZO_GRPC_CONNECT_TIMEOUT", default = 5)] // in seconds
pub connect_timeout: u64,
}

#[derive(EnvConfig)]
Expand Down
14 changes: 14 additions & 0 deletions src/config/src/meta/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub struct Request {
#[serde(default)]
pub encoding: RequestEncoding,
#[serde(default)]
pub regions: Vec<String>, // default query all regions, local: only query local region clusters
#[serde(default)]
pub clusters: Vec<String>, // default query all clusters, local: only query local cluster
#[serde(default)]
pub timeout: i64,
Expand Down Expand Up @@ -117,6 +119,8 @@ pub struct Query {
pub query_fn: Option<String>,
#[serde(default)]
pub skip_wal: bool,
#[serde(default)]
pub is_partial: bool,
}

fn default_size() -> usize {
Expand All @@ -140,6 +144,7 @@ impl Default for Query {
uses_zo_fn: false,
query_fn: None,
skip_wal: false,
is_partial: false,
}
}
}
Expand Down Expand Up @@ -203,6 +208,8 @@ pub struct Response {
pub trace_id: String,
#[serde(skip_serializing_if = "String::is_empty")]
pub function_error: String,
#[serde(default)]
pub is_partial: bool,
}

#[derive(Clone, Debug, Serialize, Deserialize, Default, ToSchema)]
Expand Down Expand Up @@ -240,6 +247,7 @@ impl Response {
response_type: "".to_string(),
trace_id: "".to_string(),
function_error: "".to_string(),
is_partial: false,
}
}

Expand Down Expand Up @@ -296,6 +304,10 @@ impl Response {
pub fn set_trace_id(&mut self, trace_id: String) {
self.trace_id = trace_id;
}

pub fn set_partial(&mut self, is_partial: bool) {
self.is_partial = is_partial;
}
}

#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)]
Expand Down Expand Up @@ -538,9 +550,11 @@ mod tests {
uses_zo_fn: false,
query_fn: None,
skip_wal: false,
is_partial: false,
},
aggs: HashMap::new(),
encoding: "base64".into(),
regions: vec![],
clusters: vec![],
timeout: 0,
};
Expand Down
8 changes: 8 additions & 0 deletions src/handler/http/request/search/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,9 +487,11 @@ pub async fn around(
uses_zo_fn: uses_fn,
query_fn: query_fn.clone(),
skip_wal: false,
is_partial: false,
},
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout,
};
Expand Down Expand Up @@ -565,9 +567,11 @@ pub async fn around(
uses_zo_fn: uses_fn,
query_fn: query_fn.clone(),
skip_wal: false,
is_partial: false,
},
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout,
};
Expand Down Expand Up @@ -937,9 +941,11 @@ async fn values_v1(
uses_zo_fn: uses_fn,
query_fn: query_fn.clone(),
skip_wal: false,
is_partial: false,
},
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout,
};
Expand Down Expand Up @@ -1165,9 +1171,11 @@ async fn values_v2(
uses_zo_fn: false,
query_fn: None,
skip_wal: false,
is_partial: false,
},
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout,
};
Expand Down
2 changes: 2 additions & 0 deletions src/handler/http/request/traces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,11 @@ pub async fn get_latest_traces(
uses_zo_fn: false,
query_fn: None,
skip_wal: false,
is_partial: false,
},
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout,
};
Expand Down
6 changes: 3 additions & 3 deletions src/handler/http/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ async fn proxy(
path: web::Path<PathParamProxyURL>,
req: HttpRequest,
) -> actix_web::Result<HttpResponse> {
let client = reqwest::Client::new();
let client = awc::Client::new();
let method = req.method().clone();
let forwarded_resp = client
let mut forwarded_resp = client
.request(method, &path.target_url)
.send()
.await
Expand All @@ -187,7 +187,7 @@ async fn proxy(
})?;

let status = forwarded_resp.status().as_u16();
let body = forwarded_resp.bytes().await.map_err(|e| {
let body = forwarded_resp.body().await.map_err(|e| {
actix_web::error::ErrorInternalServerError(format!("Failed to read the response: {}", e))
})?;

Expand Down
1 change: 1 addition & 0 deletions src/proto/proto/cluster/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ message SearchResponse {
bytes hits = 6;
repeated SearchAggResponse aggs = 7;
ScanStats scan_stats = 8;
bool is_partial = 9;
}

message SearchAggRequest {
Expand Down
1 change: 1 addition & 0 deletions src/router/grpc/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub(crate) async fn get_ingester_channel() -> Result<Channel, tonic::Status> {
// cache miss, connect to ingester
let channel = Channel::from_shared(grpc_addr.clone())
.unwrap()
.connect_timeout(std::time::Duration::from_secs(CONFIG.grpc.connect_timeout))
.connect()
.await
.map_err(|err| {
Expand Down
2 changes: 2 additions & 0 deletions src/service/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,9 +458,11 @@ impl QueryCondition {
query_context: None,
query_fn: None,
skip_wal: false,
is_partial: false,
},
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout: 0,
};
Expand Down
1 change: 1 addition & 0 deletions src/service/db/enrichment_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub async fn get(org_id: &str, name: &str) -> Result<Vec<vrl::value::Value>, any
query,
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout: 0,
};
Expand Down
1 change: 1 addition & 0 deletions src/service/db/file_list/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ async fn send_to_node(
.expect("parse internal grpc token faile");
let channel = match Channel::from_shared(node.grpc_addr.clone())
.unwrap()
.connect_timeout(std::time::Duration::from_secs(CONFIG.grpc.connect_timeout))
.connect()
.await
{
Expand Down
2 changes: 2 additions & 0 deletions src/service/file_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub async fn query(
.map_err(|_| Error::Message("invalid token".to_string()))?;
let channel = Channel::from_shared(node.grpc_addr.clone())
.unwrap()
.connect_timeout(std::time::Duration::from_secs(CONFIG.grpc.connect_timeout))
.connect()
.await
.map_err(|err| {
Expand Down Expand Up @@ -198,6 +199,7 @@ pub async fn query(
.map_err(|_| Error::Message("invalid token".to_string()))?;
let channel = Channel::from_shared(node.grpc_addr.clone())
.unwrap()
.connect_timeout(std::time::Duration::from_secs(CONFIG.grpc.connect_timeout))
.connect()
.await
.map_err(|err| {
Expand Down
2 changes: 2 additions & 0 deletions src/service/metrics/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,7 @@ pub(crate) async fn get_series(
},
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout: 0,
};
Expand Down Expand Up @@ -759,6 +760,7 @@ pub(crate) async fn get_label_values(
},
aggs: HashMap::new(),
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout: 0,
};
Expand Down
1 change: 1 addition & 0 deletions src/service/promql/search/grpc/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ async fn get_file_list(
.map_err(|_| DataFusionError::Execution("invalid token".to_string()))?;
let channel = Channel::from_shared(node_addr)
.unwrap()
.connect_timeout(std::time::Duration::from_secs(CONFIG.grpc.connect_timeout))
.connect()
.await
.map_err(|_| {
Expand Down
1 change: 1 addition & 0 deletions src/service/promql/search/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ async fn search_in_cluster(
.map_err(|_| Error::Message("invalid token".to_string()))?;
let channel = Channel::from_shared(node_addr)
.unwrap()
.connect_timeout(std::time::Duration::from_secs(CONFIG.grpc.connect_timeout))
.connect()
.await
.map_err(|err| {
Expand Down
3 changes: 2 additions & 1 deletion src/service/search/cluster/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub async fn search(mut req: cluster_rpc::SearchRequest) -> Result<cluster_rpc::
);

// handle query function
let (merge_results, scan_stats, inverted_index_count, _) =
let (merge_results, scan_stats, inverted_index_count, _, is_partial) =
super::search(&trace_id, sql.clone(), req).await?;

// final result
Expand Down Expand Up @@ -115,6 +115,7 @@ pub async fn search(mut req: cluster_rpc::SearchRequest) -> Result<cluster_rpc::
hits: hits_buf,
aggs: aggs_buf,
scan_stats: Some(cluster_rpc::ScanStats::from(&scan_stats)),
is_partial,
};

Ok(result)
Expand Down
3 changes: 2 additions & 1 deletion src/service/search/cluster/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub async fn search(mut req: cluster_rpc::SearchRequest) -> Result<search::Respo
req.query.as_mut().unwrap().query_fn = "".to_string();

// handle query function
let (merge_batches, scan_stats, inverted_index_count, took_wait) =
let (merge_batches, scan_stats, inverted_index_count, took_wait, is_partial) =
super::search(&trace_id, sql.clone(), req).await?;

// final result
Expand Down Expand Up @@ -185,6 +185,7 @@ pub async fn search(mut req: cluster_rpc::SearchRequest) -> Result<search::Respo
} else {
result.set_total(total);
}
result.set_partial(is_partial);
result.set_cluster_took(start.elapsed().as_millis() as usize, took_wait);
result.set_file_count(scan_stats.files as usize);
result.set_scan_size(scan_stats.original_size as usize);
Expand Down
Loading
Loading