
Merge #4536
4536: Limit concurrent search requests r=ManyTheFish a=irevoire

# Pull Request

## Related issue
Fixes #4489

## What does this PR do?
- Adds a "search queue" that limits the number of search requests we can process at the same time and stores the ones waiting to be processed
- Processes only one search request per core/thread (we use `available_parallelism`)
- When the search queue is full, new search requests replace old ones **randomly** (see the sketch after this list). The reasoning:
  - If we serve the oldest one first, like Typesense, we give the worst performance to everyone
  - If we serve the latest one, it becomes too easy to DoS us (you just need to fill the queue with as many search requests as we can process simultaneously to ensure no other request will ever be processed)
  - By picking the search request randomly, we give recent search requests a chance to be processed while ensuring that we can't be owned unless attackers fill our queue entirely, at which point we start returning 5xx errors
- Adds an experimental parameter to control the size of the queue
- Adds a bunch of tests to ensure the search queue works correctly
- Makes the health route check that the loop consuming the search queue is running, and crash if it is not
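
A minimal sketch of the random-replacement policy described above — not the PR's actual implementation, which lives in the new `meilisearch/src/search_queue.rs` (not expanded in this view) — assuming the `rand` crate:

```rust
use rand::Rng;

/// Simplified sketch: a bounded buffer where, once full, an incoming request
/// evicts a randomly chosen queued one instead of the oldest or newest.
struct BoundedQueue<T> {
    capacity: usize, // assumed > 0
    requests: Vec<T>,
}

impl<T> BoundedQueue<T> {
    /// Pushes a request, evicting a random one when the queue is full.
    /// Returns the evicted request so the caller can answer it with a 503.
    fn push(&mut self, request: T) -> Option<T> {
        if self.requests.len() < self.capacity {
            self.requests.push(request);
            None
        } else {
            let i = rand::thread_rng().gen_range(0..self.requests.len());
            Some(std::mem::replace(&mut self.requests[i], request))
        }
    }
}
```

Under this policy an attacker has to keep the entire queue full to starve other clients, and even then legitimate requests still have a chance of landing in a slot — and the failure mode is a visible 5xx rather than silent starvation.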

Co-authored-by: Tamo <tamo@meilisearch.com>
meili-bors[bot] and irevoire committed Mar 28, 2024
2 parents 781e2d7 + 06a11b5 commit fa9748c
Showing 12 changed files with 375 additions and 2 deletions.
11 changes: 10 additions & 1 deletion meilisearch-types/src/error.rs
@@ -2,6 +2,7 @@ use std::{fmt, io};
 
 use actix_web::http::StatusCode;
 use actix_web::{self as aweb, HttpResponseBuilder};
+use aweb::http::header;
 use aweb::rt::task::JoinError;
 use convert_case::Casing;
 use milli::heed::{Error as HeedError, MdbError};
@@ -56,7 +57,14 @@ where
 impl aweb::error::ResponseError for ResponseError {
     fn error_response(&self) -> aweb::HttpResponse {
         let json = serde_json::to_vec(self).unwrap();
-        HttpResponseBuilder::new(self.status_code()).content_type("application/json").body(json)
+        let mut builder = HttpResponseBuilder::new(self.status_code());
+        builder.content_type("application/json");
+
+        if self.code == StatusCode::SERVICE_UNAVAILABLE {
+            builder.insert_header((header::RETRY_AFTER, "10"));
+        }
+
+        builder.body(json)
     }
 
     fn status_code(&self) -> StatusCode {
@@ -305,6 +313,7 @@ MissingSwapIndexes , InvalidRequest , BAD_REQUEST ;
 MissingTaskFilters    , InvalidRequest , BAD_REQUEST ;
 NoSpaceLeftOnDevice   , System         , UNPROCESSABLE_ENTITY ;
 PayloadTooLarge       , InvalidRequest , PAYLOAD_TOO_LARGE ;
+TooManySearchRequests , System         , SERVICE_UNAVAILABLE ;
 TaskNotFound          , InvalidRequest , NOT_FOUND ;
 TooManyOpenFiles      , System         , UNPROCESSABLE_ENTITY ;
 TooManyVectors        , InvalidRequest , BAD_REQUEST ;
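Because 503 responses now carry a `Retry-After: 10` header, clients can back off instead of hammering the instance. A minimal client-side sketch — not part of this PR — assuming the `reqwest` crate with its `blocking` feature:

```rust
use std::thread;
use std::time::Duration;

/// Retries a GET search until the server stops answering 503,
/// honoring the Retry-After header set in the diff above.
fn search_with_retry(url: &str) -> reqwest::Result<reqwest::blocking::Response> {
    loop {
        let resp = reqwest::blocking::get(url)?;
        if resp.status() != reqwest::StatusCode::SERVICE_UNAVAILABLE {
            return Ok(resp);
        }
        // Fall back to 10 seconds if the header is missing or malformed.
        let wait = resp
            .headers()
            .get(reqwest::header::RETRY_AFTER)
            .and_then(|v| v.to_str().ok())
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(10);
        thread::sleep(Duration::from_secs(wait));
    }
}
```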
3 changes: 3 additions & 0 deletions meilisearch/src/analytics/segment_analytics.rs
@@ -252,6 +252,7 @@ impl super::Analytics for SegmentAnalytics {
 struct Infos {
     env: String,
     experimental_enable_metrics: bool,
+    experimental_search_queue_size: usize,
     experimental_logs_mode: LogMode,
     experimental_replication_parameters: bool,
     experimental_enable_logs_route: bool,
@@ -293,6 +294,7 @@ impl From<Opt> for Infos {
         let Opt {
             db_path,
             experimental_enable_metrics,
+            experimental_search_queue_size,
             experimental_logs_mode,
             experimental_replication_parameters,
             experimental_enable_logs_route,
@@ -342,6 +344,7 @@ impl From<Opt> for Infos {
         Self {
             env,
             experimental_enable_metrics,
+            experimental_search_queue_size,
             experimental_logs_mode,
             experimental_replication_parameters,
             experimental_enable_logs_route,
6 changes: 6 additions & 0 deletions meilisearch/src/error.rs
@@ -29,6 +29,10 @@ pub enum MeilisearchHttpError {
     InvalidExpression(&'static [&'static str], Value),
     #[error("A {0} payload is missing.")]
     MissingPayload(PayloadType),
+    #[error("Too many search requests running at the same time: {0}. Retry after 10s.")]
+    TooManySearchRequests(usize),
+    #[error("Internal error: Search limiter is down.")]
+    SearchLimiterIsDown,
     #[error("The provided payload reached the size limit. The maximum accepted payload size is {}.", Byte::from_bytes(*.0 as u64).get_appropriate_unit(true))]
     PayloadTooLarge(usize),
     #[error("Two indexes must be given for each swap. The list `[{}]` contains {} indexes.",
@@ -69,6 +73,8 @@ impl ErrorCode for MeilisearchHttpError {
             MeilisearchHttpError::EmptyFilter => Code::InvalidDocumentFilter,
             MeilisearchHttpError::InvalidExpression(_, _) => Code::InvalidSearchFilter,
             MeilisearchHttpError::PayloadTooLarge(_) => Code::PayloadTooLarge,
+            MeilisearchHttpError::TooManySearchRequests(_) => Code::TooManySearchRequests,
+            MeilisearchHttpError::SearchLimiterIsDown => Code::Internal,
             MeilisearchHttpError::SwapIndexPayloadWrongLength(_) => Code::InvalidSwapIndexes,
             MeilisearchHttpError::IndexUid(e) => e.error_code(),
             MeilisearchHttpError::SerdeJson(_) => Code::Internal,
10 changes: 9 additions & 1 deletion meilisearch/src/lib.rs
@@ -9,12 +9,14 @@ pub mod middleware;
 pub mod option;
 pub mod routes;
 pub mod search;
+pub mod search_queue;
 
 use std::fs::File;
 use std::io::{BufReader, BufWriter};
+use std::num::NonZeroUsize;
 use std::path::Path;
 use std::sync::Arc;
-use std::thread;
+use std::thread::{self, available_parallelism};
 use std::time::Duration;
 
 use actix_cors::Cors;
@@ -38,6 +40,7 @@ use meilisearch_types::versioning::{check_version_file, create_version_file};
 use meilisearch_types::{compression, milli, VERSION_FILE_NAME};
 pub use option::Opt;
 use option::ScheduleSnapshot;
+use search_queue::SearchQueue;
 use tracing::{error, info_span};
 use tracing_subscriber::filter::Targets;
@@ -469,10 +472,15 @@ pub fn configure_data(
     (logs_route, logs_stderr): (LogRouteHandle, LogStderrHandle),
     analytics: Arc<dyn Analytics>,
 ) {
+    let search_queue = SearchQueue::new(
+        opt.experimental_search_queue_size,
+        available_parallelism().unwrap_or(NonZeroUsize::new(2).unwrap()),
+    );
     let http_payload_size_limit = opt.http_payload_size_limit.get_bytes() as usize;
     config
         .app_data(index_scheduler)
         .app_data(auth)
+        .app_data(web::Data::new(search_queue))
         .app_data(web::Data::from(analytics))
         .app_data(web::Data::new(logs_route))
         .app_data(web::Data::new(logs_stderr))
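`SearchQueue` comes from the new `search_queue.rs` module, one of the changed files not expanded on this page. A rough sketch of the permit mechanism the handlers below rely on — channel layout and names are assumptions, not the actual implementation:

```rust
use tokio::sync::{mpsc, oneshot};

/// Hypothetical sketch: a permit representing one of the
/// `available_parallelism()` search slots. Dropping it closes the oneshot
/// channel, which the queue's consuming loop observes to free the slot.
pub struct Permit {
    _drop_guard: oneshot::Sender<()>,
}

pub struct SearchQueue {
    /// Handlers send a oneshot sender; the consuming loop answers with a
    /// permit once a slot is free (or never, if the request got evicted).
    sender: mpsc::Sender<oneshot::Sender<Permit>>,
}

impl SearchQueue {
    /// Waits for a search slot. Both calls below fail if the consuming loop
    /// is gone, so a dead loop surfaces as an error, not a hung request.
    pub async fn try_get_search_permit(&self) -> Result<Permit, ()> {
        let (tx, rx) = oneshot::channel();
        self.sender.send(tx).await.map_err(|_| ())?;
        rx.await.map_err(|_| ())
    }
}
```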
15 changes: 15 additions & 0 deletions meilisearch/src/option.rs
@@ -54,6 +54,7 @@ const MEILI_EXPERIMENTAL_LOGS_MODE: &str = "MEILI_EXPERIMENTAL_LOGS_MODE";
 const MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS: &str = "MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS";
 const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE";
 const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS";
+const MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE: &str = "MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE";
 const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str =
     "MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE";
 const MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS: &str =
@@ -344,6 +345,15 @@ pub struct Opt {
     #[serde(default)]
     pub experimental_enable_metrics: bool,
 
+    /// Experimental search queue size. For more information, see: <https://github.com/orgs/meilisearch/discussions/729>
+    ///
+    /// Lets you customize the size of the search queue. Meilisearch processes your search requests as fast as possible,
+    /// but once the queue is full it starts returning HTTP 503, Service Unavailable.
+    /// The default value is 1000.
+    #[clap(long, env = MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE, default_value_t = 1000)]
+    #[serde(default)]
+    pub experimental_search_queue_size: usize,
+
     /// Experimental logs mode feature. For more information, see: <https://github.com/orgs/meilisearch/discussions/723>
     ///
     /// Change the mode of the logs on the console.
@@ -473,6 +483,7 @@ impl Opt {
             #[cfg(feature = "analytics")]
             no_analytics,
             experimental_enable_metrics,
+            experimental_search_queue_size,
             experimental_logs_mode,
             experimental_enable_logs_route,
             experimental_replication_parameters,
@@ -532,6 +543,10 @@ impl Opt {
             MEILI_EXPERIMENTAL_ENABLE_METRICS,
             experimental_enable_metrics.to_string(),
         );
+        export_to_env_if_not_present(
+            MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE,
+            experimental_search_queue_size.to_string(),
+        );
         export_to_env_if_not_present(
             MEILI_EXPERIMENTAL_LOGS_MODE,
             experimental_logs_mode.to_string(),
3 changes: 3 additions & 0 deletions meilisearch/src/routes/indexes/facet_search.rs
@@ -17,6 +17,7 @@ use crate::search::{
     DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG,
     DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, DEFAULT_SEARCH_OFFSET,
 };
+use crate::search_queue::SearchQueue;
 
 pub fn configure(cfg: &mut web::ServiceConfig) {
     cfg.service(web::resource("").route(web::post().to(search)));
@@ -48,6 +49,7 @@ pub struct FacetSearchQuery {
 
 pub async fn search(
     index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>,
+    search_queue: Data<SearchQueue>,
     index_uid: web::Path<String>,
     params: AwebJson<FacetSearchQuery, DeserrJsonError>,
     req: HttpRequest,
@@ -71,6 +73,7 @@ pub async fn search(
 
     let index = index_scheduler.index(&index_uid)?;
     let features = index_scheduler.features();
+    let _permit = search_queue.try_get_search_permit().await?;
     let search_result = tokio::task::spawn_blocking(move || {
         perform_facet_search(&index, search_query, facet_query, facet_name, features)
     })
5 changes: 5 additions & 0 deletions meilisearch/src/routes/indexes/search.rs
@@ -23,6 +23,7 @@ use crate::search::{
     DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG,
     DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, DEFAULT_SEARCH_OFFSET, DEFAULT_SEMANTIC_RATIO,
 };
+use crate::search_queue::SearchQueue;
 
 pub fn configure(cfg: &mut web::ServiceConfig) {
     cfg.service(
@@ -182,6 +183,7 @@ fn fix_sort_query_parameters(sort_query: &str) -> Vec<String> {
 
 pub async fn search_with_url_query(
     index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>,
+    search_queue: web::Data<SearchQueue>,
     index_uid: web::Path<String>,
     params: AwebQueryParameter<SearchQueryGet, DeserrQueryParamError>,
     req: HttpRequest,
@@ -204,6 +206,7 @@ pub async fn search_with_url_query(
 
     let distribution = embed(&mut query, index_scheduler.get_ref(), &index)?;
 
+    let _permit = search_queue.try_get_search_permit().await?;
     let search_result =
         tokio::task::spawn_blocking(move || perform_search(&index, query, features, distribution))
             .await?;
@@ -220,6 +223,7 @@ pub async fn search_with_post(
 
 pub async fn search_with_post(
     index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>,
+    search_queue: web::Data<SearchQueue>,
     index_uid: web::Path<String>,
     params: AwebJson<SearchQuery, DeserrJsonError>,
     req: HttpRequest,
@@ -243,6 +247,7 @@ pub async fn search_with_post(
 
     let distribution = embed(&mut query, index_scheduler.get_ref(), &index)?;
 
+    let _permit = search_queue.try_get_search_permit().await?;
     let search_result =
         tokio::task::spawn_blocking(move || perform_search(&index, query, features, distribution))
             .await?;
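One detail worth noting in these handlers: the permit is bound to `_permit`, not `_`. In Rust, `let _ = x;` drops the value immediately, whereas `let _permit = x;` keeps it alive until the end of the scope, so the queue slot stays occupied for the whole `spawn_blocking` search. A standalone illustration (not from the diff):

```rust
struct Permit;

impl Drop for Permit {
    fn drop(&mut self) {
        println!("slot released");
    }
}

fn main() {
    let _ = Permit;       // "slot released" prints immediately
    let _permit = Permit; // released only when main ends
    println!("searching...");
}
```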
3 changes: 3 additions & 0 deletions meilisearch/src/routes/mod.rs
@@ -15,6 +15,7 @@ use tracing::debug;
 use crate::analytics::Analytics;
 use crate::extractors::authentication::policies::*;
 use crate::extractors::authentication::GuardedData;
+use crate::search_queue::SearchQueue;
 use crate::Opt;
 
 const PAGINATION_DEFAULT_LIMIT: usize = 20;
@@ -385,10 +386,12 @@ pub async fn get_health(
     req: HttpRequest,
     index_scheduler: Data<IndexScheduler>,
     auth_controller: Data<AuthController>,
+    search_queue: Data<SearchQueue>,
     analytics: web::Data<dyn Analytics>,
 ) -> Result<HttpResponse, ResponseError> {
     analytics.health_seen(&req);
 
+    search_queue.health().unwrap();
     index_scheduler.health().unwrap();
     auth_controller.health().unwrap();
 
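As promised in the PR description, `/health` now panics (via `unwrap`) if the search queue's consuming loop has died, so a broken limiter takes the process down instead of silently degrading. The `health()` method itself lives in the unexpanded `search_queue.rs`; a plausible shape, continuing the hypothetical mpsc-based sketch from the `lib.rs` section above (not the actual code):

```rust
impl SearchQueue {
    /// Hypothetical sketch: if the consuming loop has exited, the mpsc
    /// channel is closed and the limiter is reported as down.
    pub fn health(&self) -> Result<(), MeilisearchHttpError> {
        if self.sender.is_closed() {
            Err(MeilisearchHttpError::SearchLimiterIsDown)
        } else {
            Ok(())
        }
    }
}
```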
6 changes: 6 additions & 0 deletions meilisearch/src/routes/multi_search.rs
@@ -17,6 +17,7 @@ use crate::routes::indexes::search::embed;
 use crate::search::{
     add_search_rules, perform_search, SearchQueryWithIndex, SearchResultWithIndex,
 };
+use crate::search_queue::SearchQueue;
 
 pub fn configure(cfg: &mut web::ServiceConfig) {
     cfg.service(web::resource("").route(web::post().to(SeqHandler(multi_search_with_post))));
@@ -35,6 +36,7 @@ pub struct SearchQueries {
 
 pub async fn multi_search_with_post(
     index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>,
+    search_queue: Data<SearchQueue>,
     params: AwebJson<SearchQueries, DeserrJsonError>,
     req: HttpRequest,
     analytics: web::Data<dyn Analytics>,
@@ -44,6 +46,10 @@ pub async fn multi_search_with_post(
     let mut multi_aggregate = MultiSearchAggregator::from_queries(&queries, &req);
     let features = index_scheduler.features();
 
+    // Since we don't want to process half of the search requests and then get a permit refused,
+    // we're going to get one permit for the whole duration of the multi-search request.
+    let _permit = search_queue.try_get_search_permit().await?;
+
     // Explicitly expect a `(ResponseError, usize)` for the error type rather than `ResponseError` only,
     // so that `?` doesn't work if it doesn't use `with_index`, ensuring that it is not forgotten in case of code
     // changes.
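One consequence of taking a single permit up front: a multi-search counts as one queue slot no matter how many queries it bundles. That keeps the accounting simple and avoids failing a request halfway through, at the cost of letting one large multi-search occupy a slot for longer than a single search would.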
