Skip to content

Commit

Permalink
Return 404 on index not found in ES Bulk API (#4425)
Browse files Browse the repository at this point in the history
* Return 404 on index not found in ES Bulk API

* Rename `elastic_search` -> `elasticsearch`

* Rename `ElasticSearch` -> `Elasticsearch`
  • Loading branch information
guilload committed Jan 18, 2024
1 parent 0a40dba commit 6698bf4
Show file tree
Hide file tree
Showing 18 changed files with 143 additions and 107 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-query/src/query_ast/bool_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::InvalidQuery;
/// Edge cases of BooleanQuery are not obvious,
/// and different beahvior could be justified.
///
/// Here we aligne ourselves with ElasticSearch.
/// Here we aligne ourselves with Elasticsearch.
/// A boolean query is to be interpreted like a filtering predicate
/// over the set of documents.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ use quickwit_proto::types::IndexId;
use warp::{Filter, Rejection};

use super::bulk_v2::{elastic_bulk_ingest_v2, ElasticBulkResponse};
use crate::elastic_search_api::filter::{elastic_bulk_filter, elastic_index_bulk_filter};
use crate::elastic_search_api::make_elastic_api_response;
use crate::elastic_search_api::model::{BulkAction, ElasticBulkOptions, ElasticSearchError};
use crate::elasticsearch_api::filter::{elastic_bulk_filter, elastic_index_bulk_filter};
use crate::elasticsearch_api::make_elastic_api_response;
use crate::elasticsearch_api::model::{BulkAction, ElasticBulkOptions, ElasticsearchError};
use crate::format::extract_format_from_qs;
use crate::ingest_api::lines;
use crate::with_arg;
Expand Down Expand Up @@ -82,7 +82,7 @@ async fn elastic_ingest_bulk(
bulk_options: ElasticBulkOptions,
mut ingest_service: IngestServiceClient,
ingest_router: IngestRouterServiceClient,
) -> Result<ElasticBulkResponse, ElasticSearchError> {
) -> Result<ElasticBulkResponse, ElasticsearchError> {
if enable_ingest_v2() {
return elastic_bulk_ingest_v2(default_index_id, body, bulk_options, ingest_router).await;
}
Expand All @@ -92,13 +92,13 @@ async fn elastic_ingest_bulk(

while let Some((line_number, line)) = lines.next() {
let action = serde_json::from_slice::<BulkAction>(line).map_err(|error| {
ElasticSearchError::new(
ElasticsearchError::new(
StatusCode::BAD_REQUEST,
format!("Malformed action/metadata line [#{line_number}]. Details: `{error}`"),
)
})?;
let (_, source) = lines.next().ok_or_else(|| {
ElasticSearchError::new(
ElasticsearchError::new(
StatusCode::BAD_REQUEST,
"expected source for the action".to_string(),
)
Expand All @@ -110,7 +110,7 @@ async fn elastic_ingest_bulk(
.into_index_id()
.or_else(|| default_index_id.clone())
.ok_or_else(|| {
ElasticSearchError::new(
ElasticsearchError::new(
StatusCode::BAD_REQUEST,
format!("missing required field: `_index` in the line [#{line_number}]."),
)
Expand All @@ -131,6 +131,7 @@ async fn elastic_ingest_bulk(
commit: commit_type.into(),
};
ingest_service.ingest(ingest_request).await?;

let took_millis = now.elapsed().as_millis() as u64;
let errors = false;
let bulk_response = ElasticBulkResponse {
Expand All @@ -151,9 +152,9 @@ mod tests {
use quickwit_proto::ingest::router::IngestRouterServiceClient;
use quickwit_search::MockSearchService;

use crate::elastic_search_api::bulk_v2::ElasticBulkResponse;
use crate::elastic_search_api::elastic_api_handlers;
use crate::elastic_search_api::model::ElasticSearchError;
use crate::elasticsearch_api::bulk_v2::ElasticBulkResponse;
use crate::elasticsearch_api::elastic_api_handlers;
use crate::elasticsearch_api::model::ElasticsearchError;
use crate::ingest_api::setup_ingest_service;

#[tokio::test]
Expand Down Expand Up @@ -436,7 +437,7 @@ mod tests {
.reply(&elastic_api_handlers)
.await;
assert_eq!(resp.status(), 400);
let es_error: ElasticSearchError = serde_json::from_slice(resp.body()).unwrap();
let es_error: ElasticsearchError = serde_json::from_slice(resp.body()).unwrap();
assert_eq!(es_error.status, StatusCode::BAD_REQUEST);
assert_eq!(
es_error.error.reason.unwrap(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ use bytes::Bytes;
use hyper::StatusCode;
use quickwit_config::INGEST_V2_SOURCE_ID;
use quickwit_ingest::IngestRequestV2Builder;
use quickwit_proto::ingest::router::{IngestRouterService, IngestRouterServiceClient};
use quickwit_proto::ingest::router::{
IngestFailureReason, IngestRouterService, IngestRouterServiceClient,
};
use quickwit_proto::ingest::CommitTypeV2;
use quickwit_proto::types::IndexId;
use serde::{Deserialize, Serialize};
use tracing::warn;

use crate::elastic_search_api::model::{BulkAction, ElasticBulkOptions, ElasticSearchError};
use crate::elasticsearch_api::model::{BulkAction, ElasticBulkOptions, ElasticsearchError};
use crate::ingest_api::lines;

#[derive(Debug, Default, Serialize, Deserialize)]
Expand All @@ -44,20 +46,20 @@ pub(crate) async fn elastic_bulk_ingest_v2(
body: Bytes,
bulk_options: ElasticBulkOptions,
mut ingest_router: IngestRouterServiceClient,
) -> Result<ElasticBulkResponse, ElasticSearchError> {
) -> Result<ElasticBulkResponse, ElasticsearchError> {
let now = Instant::now();
let mut ingest_request_builder = IngestRequestV2Builder::default();
let mut lines = lines(&body).enumerate();

while let Some((line_no, line)) = lines.next() {
let action = serde_json::from_slice::<BulkAction>(line).map_err(|error| {
ElasticSearchError::new(
ElasticsearchError::new(
StatusCode::BAD_REQUEST,
format!("unsupported or malformed action on line #{line_no}: `{error}`"),
)
})?;
let (_, source) = lines.next().ok_or_else(|| {
ElasticSearchError::new(
ElasticsearchError::new(
StatusCode::BAD_REQUEST,
format!("associated source data with action on line #{line_no} is missing"),
)
Expand All @@ -70,7 +72,7 @@ pub(crate) async fn elastic_bulk_ingest_v2(
.into_index_id()
.or_else(|| default_index_id.clone())
.ok_or_else(|| {
ElasticSearchError::new(
ElasticsearchError::new(
StatusCode::BAD_REQUEST,
format!("`_index` field of action on line #{line_no} is missing"),
)
Expand All @@ -88,6 +90,15 @@ pub(crate) async fn elastic_bulk_ingest_v2(
let ingest_response_v2 = ingest_router.ingest(ingest_request).await?;
let took_millis = now.elapsed().as_millis() as u64;
let errors = !ingest_response_v2.failures.is_empty();

for failure in ingest_response_v2.failures {
// This custom logic for Airmail is temporary.
if failure.reason() == IngestFailureReason::IndexNotFound {
let reason = format!("index `{}` not found", failure.index_id);
let elasticsearch_error = ElasticsearchError::new(StatusCode::NOT_FOUND, reason);
return Err(elasticsearch_error);
}
}
let bulk_response = ElasticBulkResponse {
took_millis,
errors,
Expand All @@ -107,10 +118,10 @@ mod tests {
use warp::{Filter, Rejection, Reply};

use super::*;
use crate::elastic_search_api::bulk_v2::ElasticBulkResponse;
use crate::elastic_search_api::filter::elastic_bulk_filter;
use crate::elastic_search_api::make_elastic_api_response;
use crate::elastic_search_api::model::ElasticSearchError;
use crate::elasticsearch_api::bulk_v2::ElasticBulkResponse;
use crate::elasticsearch_api::filter::elastic_bulk_filter;
use crate::elasticsearch_api::make_elastic_api_response;
use crate::elasticsearch_api::model::ElasticsearchError;
use crate::format::extract_format_from_qs;
use crate::with_arg;

Expand All @@ -133,13 +144,12 @@ mod tests {
.expect_ingest()
.once()
.returning(|ingest_request| {
assert_eq!(ingest_request.subrequests.len(), 3);
assert_eq!(ingest_request.subrequests.len(), 2);
assert_eq!(ingest_request.commit_type(), CommitTypeV2::Auto);

let mut subrequests = ingest_request.subrequests;
assert_eq!(subrequests[0].subrequest_id, 0);
assert_eq!(subrequests[1].subrequest_id, 1);
assert_eq!(subrequests[2].subrequest_id, 2);

subrequests.sort_by(|left, right| left.index_id.cmp(&right.index_id));

Expand All @@ -153,11 +163,6 @@ mod tests {
assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_docs(), 1);
assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_bytes(), 48);

assert_eq!(subrequests[2].index_id, "my-index-3");
assert_eq!(subrequests[2].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(subrequests[2].doc_batch.as_ref().unwrap().num_docs(), 1);
assert_eq!(subrequests[2].doc_batch.as_ref().unwrap().num_bytes(), 48);

Ok(IngestResponseV2 {
successes: vec![
IngestSuccess {
Expand All @@ -175,12 +180,7 @@ mod tests {
replication_position_inclusive: Some(Position::offset(0u64)),
},
],
failures: vec![IngestFailure {
subrequest_id: 2,
index_id: "my-index-3".to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
reason: IngestFailureReason::IndexNotFound as i32,
}],
failures: Vec::new(),
})
});
let ingest_router = IngestRouterServiceClient::from(ingest_router_mock);
Expand All @@ -193,8 +193,6 @@ mod tests {
{"ts": 1, "message": "my-message-1"}
{"create": {"_index": "my-index-1"}}
{"ts": 2, "message": "my-message-2"}
{"create": {"_index": "my-index-3"}}
{"ts": 1, "message": "my-message-1"}
"#;
let response = warp::test::request()
.path("/_elastic/_bulk")
Expand All @@ -205,7 +203,7 @@ mod tests {
assert_eq!(response.status(), 200);

let bulk_response: ElasticBulkResponse = serde_json::from_slice(response.body()).unwrap();
assert!(bulk_response.errors);
assert!(!bulk_response.errors);
}

#[tokio::test]
Expand Down Expand Up @@ -291,7 +289,7 @@ mod tests {
.await;
assert_eq!(response.status(), 400);

let es_error: ElasticSearchError = serde_json::from_slice(response.body()).unwrap();
let es_error: ElasticsearchError = serde_json::from_slice(response.body()).unwrap();
assert_eq!(es_error.status, StatusCode::BAD_REQUEST);

let reason = es_error.error.reason.unwrap();
Expand All @@ -311,7 +309,7 @@ mod tests {
.await;
assert_eq!(response.status(), 400);

let es_error: ElasticSearchError = serde_json::from_slice(response.body()).unwrap();
let es_error: ElasticsearchError = serde_json::from_slice(response.body()).unwrap();
assert_eq!(es_error.status, StatusCode::BAD_REQUEST);

let reason = es_error.error.reason.unwrap();
Expand All @@ -332,10 +330,47 @@ mod tests {
.await;
assert_eq!(response.status(), 400);

let es_error: ElasticSearchError = serde_json::from_slice(response.body()).unwrap();
let es_error: ElasticsearchError = serde_json::from_slice(response.body()).unwrap();
assert_eq!(es_error.status, StatusCode::BAD_REQUEST);

let reason = es_error.error.reason.unwrap();
assert_eq!(reason, "`_index` field of action on line #0 is missing");
}

// Airmail-specific test. It should go away when we straighten out the API response.
#[tokio::test]
async fn test_bulk_api_returns_404_on_index_not_found() {
let mut ingest_router_mock = IngestRouterServiceClient::mock();
ingest_router_mock.expect_ingest().once().returning(|_| {
Ok(IngestResponseV2 {
successes: Vec::new(),
failures: vec![IngestFailure {
subrequest_id: 2,
index_id: "my-index".to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
reason: IngestFailureReason::IndexNotFound as i32,
}],
})
});
let ingest_router = IngestRouterServiceClient::from(ingest_router_mock);
let handler = es_compat_bulk_handler_v2(ingest_router);

let payload = r#"
{"create": {"_index": "my-index", "_id" : "1"}}
{"ts": 1, "message": "my-message"}
"#;
let response = warp::test::request()
.path("/_elastic/_bulk")
.method("POST")
.body(payload)
.reply(&handler)
.await;
assert_eq!(response.status(), 404);

let es_error: ElasticsearchError = serde_json::from_slice(response.body()).unwrap();
assert_eq!(es_error.status, StatusCode::NOT_FOUND);

let reason = es_error.error.reason.unwrap();
assert_eq!(reason, "index `my-index` not found");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use super::model::{
FieldCapabilityQueryParams, FieldCapabilityRequestBody, MultiSearchQueryParams,
SearchQueryParamsCount,
};
use crate::elastic_search_api::model::{
use crate::elasticsearch_api::model::{
ElasticBulkOptions, ScrollQueryParams, SearchBody, SearchQueryParams,
};
use crate::search_api::{extract_index_id_patterns, extract_index_id_patterns_default};
Expand All @@ -50,7 +50,7 @@ pub(crate) fn elastic_cluster_info_filter() -> impl Filter<Extract = (), Error =
}

#[utoipa::path(get, tag = "Search", path = "/_search")]
pub(crate) fn elastic_search_filter(
pub(crate) fn elasticsearch_filter(
) -> impl Filter<Extract = (SearchQueryParams,), Error = Rejection> + Clone {
warp::path!("_elastic" / "_search")
.and(warp::get().or(warp::post()).unify())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use warp::{Filter, Rejection};
use self::rest_handler::{
es_compat_index_count_handler, es_compat_index_field_capabilities_handler,
};
use crate::elastic_search_api::model::ElasticSearchError;
use crate::elasticsearch_api::model::ElasticsearchError;
use crate::json_api_response::JsonApiResponse;
use crate::{BodyFormat, BuildInfo};

Expand Down Expand Up @@ -103,7 +103,7 @@ impl From<i64> for TrackTotalHits {
}

fn make_elastic_api_response<T: serde::Serialize>(
elasticsearch_result: Result<T, ElasticSearchError>,
elasticsearch_result: Result<T, ElasticsearchError>,
format: BodyFormat,
) -> JsonApiResponse {
let status_code = match &elasticsearch_result {
Expand All @@ -128,9 +128,9 @@ mod tests {
use warp::Filter;

use super::elastic_api_handlers;
use super::model::ElasticSearchError;
use crate::elastic_search_api::model::MultiSearchResponse;
use crate::elastic_search_api::rest_handler::es_compat_cluster_info_handler;
use super::model::ElasticsearchError;
use crate::elasticsearch_api::model::MultiSearchResponse;
use crate::elasticsearch_api::rest_handler::es_compat_cluster_info_handler;
use crate::rest::recover_fn;
use crate::BuildInfo;

Expand Down Expand Up @@ -263,7 +263,7 @@ mod tests {
.reply(&es_search_api_handler)
.await;
assert_eq!(resp.status(), 400);
let es_error: ElasticSearchError = serde_json::from_slice(resp.body()).unwrap();
let es_error: ElasticsearchError = serde_json::from_slice(resp.body()).unwrap();
assert!(es_error
.error
.reason
Expand Down Expand Up @@ -294,7 +294,7 @@ mod tests {
.reply(&es_search_api_handler)
.await;
assert_eq!(resp.status(), 400);
let es_error: ElasticSearchError = serde_json::from_slice(resp.body()).unwrap();
let es_error: ElasticsearchError = serde_json::from_slice(resp.body()).unwrap();
assert!(es_error
.error
.reason
Expand Down Expand Up @@ -324,7 +324,7 @@ mod tests {
.reply(&es_search_api_handler)
.await;
assert_eq!(resp.status(), 400);
let es_error: ElasticSearchError = serde_json::from_slice(resp.body()).unwrap();
let es_error: ElasticsearchError = serde_json::from_slice(resp.body()).unwrap();
assert!(es_error
.error
.reason
Expand Down Expand Up @@ -355,7 +355,7 @@ mod tests {
.reply(&es_search_api_handler)
.await;
assert_eq!(resp.status(), 400);
let es_error: ElasticSearchError = serde_json::from_slice(resp.body()).unwrap();
let es_error: ElasticsearchError = serde_json::from_slice(resp.body()).unwrap();
assert_eq!(
es_error.error.reason.unwrap(),
"Invalid argument: `_msearch` request header must define at least one index"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ pub struct BulkActionMeta {

#[cfg(test)]
mod tests {
use crate::elastic_search_api::model::bulk_body::BulkActionMeta;
use crate::elastic_search_api::model::BulkAction;
use crate::elasticsearch_api::model::bulk_body::BulkActionMeta;
use crate::elasticsearch_api::model::BulkAction;

#[test]
fn test_bulk_action_serde() {
Expand Down
Loading

0 comments on commit 6698bf4

Please sign in to comment.