Skip to content

Commit

Permalink
concurrency limit + load shed + timeout at tower level
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed May 11, 2024
1 parent de266a8 commit 671f5ce
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 2 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ tower = { version = "0.4.13", features = [
"load",
"retry",
"util",
"load-shed",
] }
tower-http = { version = "0.4.0", features = ["compression-gzip", "cors"] }
tracing = "0.1.37"
Expand Down
6 changes: 4 additions & 2 deletions quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::time::Instant;
use std::time::{Duration, Instant};

use hyper::StatusCode;
use quickwit_config::INGEST_V2_SOURCE_ID;
Expand Down Expand Up @@ -130,7 +130,9 @@ pub(crate) async fn elastic_bulk_ingest_v2(
let Some(ingest_request) = ingest_request_opt else {
return Ok(ElasticBulkResponse::default());
};
let ingest_response_v2 = ingest_router.ingest(ingest_request).await?;
let ingest_response_v2 =
tokio::time::timeout(Duration::from_millis(500), ingest_router.ingest(ingest_request)).await
.map_err(|err| ElasticsearchError::new(StatusCode::REQUEST_TIMEOUT, "router timeout".to_string(), None))??;
let errors = !ingest_response_v2.failures.is_empty();
let mut items = Vec::new();

Expand Down
7 changes: 7 additions & 0 deletions quickwit/quickwit-serve/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use hyper::http::HeaderValue;
use hyper::{http, Method, StatusCode};
Expand Down Expand Up @@ -132,6 +133,12 @@ pub(crate) async fn start_rest_server(
let cors = build_cors(&quickwit_services.node_config.rest_config.cors_allow_origins);

let service = ServiceBuilder::new()
.timeout(Duration::from_millis(500)) // TO NOT MERGE THIS, THIS IS JUST FOR A TEST.
.concurrency_limit(quickwit_common::get_from_env("QW_REST_CONCURRENCY_LIMIT", 5))
.load_shed()
.concurrency_limit(quickwit_common::get_from_env("QW_REST_LOAD_SHED_LIMIT", 30))
.load_shed()
.timeout(Duration::from_secs(1))
.layer(
CompressionLayer::new()
.gzip(true)
Expand Down

0 comments on commit 671f5ce

Please sign in to comment.