Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return 404 on index not found in ES Bulk API #4425

Merged
merged 3 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/quickwit-serve/src/elastic_search_api/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ async fn elastic_ingest_bulk(
commit: commit_type.into(),
};
ingest_service.ingest(ingest_request).await?;

let took_millis = now.elapsed().as_millis() as u64;
let errors = false;
let bulk_response = ElasticBulkResponse {
Expand Down
69 changes: 52 additions & 17 deletions quickwit/quickwit-serve/src/elastic_search_api/bulk_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use bytes::Bytes;
use hyper::StatusCode;
use quickwit_config::INGEST_V2_SOURCE_ID;
use quickwit_ingest::IngestRequestV2Builder;
use quickwit_proto::ingest::router::{IngestRouterService, IngestRouterServiceClient};
use quickwit_proto::ingest::router::{
IngestFailureReason, IngestRouterService, IngestRouterServiceClient,
};
use quickwit_proto::ingest::CommitTypeV2;
use quickwit_proto::types::IndexId;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -88,6 +90,15 @@ pub(crate) async fn elastic_bulk_ingest_v2(
let ingest_response_v2 = ingest_router.ingest(ingest_request).await?;
let took_millis = now.elapsed().as_millis() as u64;
let errors = !ingest_response_v2.failures.is_empty();

for failure in ingest_response_v2.failures {
// This custom logic for Airmail is temporary.
if failure.reason() == IngestFailureReason::IndexNotFound {
let reason = format!("index `{}` not found", failure.index_id);
let elasticsearch_error = ElasticSearchError::new(StatusCode::NOT_FOUND, reason);
return Err(elasticsearch_error);
}
}
let bulk_response = ElasticBulkResponse {
took_millis,
errors,
Expand Down Expand Up @@ -133,13 +144,12 @@ mod tests {
.expect_ingest()
.once()
.returning(|ingest_request| {
assert_eq!(ingest_request.subrequests.len(), 3);
assert_eq!(ingest_request.subrequests.len(), 2);
assert_eq!(ingest_request.commit_type(), CommitTypeV2::Auto);

let mut subrequests = ingest_request.subrequests;
assert_eq!(subrequests[0].subrequest_id, 0);
assert_eq!(subrequests[1].subrequest_id, 1);
assert_eq!(subrequests[2].subrequest_id, 2);

subrequests.sort_by(|left, right| left.index_id.cmp(&right.index_id));

Expand All @@ -153,11 +163,6 @@ mod tests {
assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_docs(), 1);
assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_bytes(), 48);

assert_eq!(subrequests[2].index_id, "my-index-3");
assert_eq!(subrequests[2].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(subrequests[2].doc_batch.as_ref().unwrap().num_docs(), 1);
assert_eq!(subrequests[2].doc_batch.as_ref().unwrap().num_bytes(), 48);

Ok(IngestResponseV2 {
successes: vec![
IngestSuccess {
Expand All @@ -175,12 +180,7 @@ mod tests {
replication_position_inclusive: Some(Position::offset(0u64)),
},
],
failures: vec![IngestFailure {
subrequest_id: 2,
index_id: "my-index-3".to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
reason: IngestFailureReason::IndexNotFound as i32,
}],
failures: Vec::new(),
})
});
let ingest_router = IngestRouterServiceClient::from(ingest_router_mock);
Expand All @@ -193,8 +193,6 @@ mod tests {
{"ts": 1, "message": "my-message-1"}
{"create": {"_index": "my-index-1"}}
{"ts": 2, "message": "my-message-2"}
{"create": {"_index": "my-index-3"}}
{"ts": 1, "message": "my-message-1"}
"#;
let response = warp::test::request()
.path("/_elastic/_bulk")
Expand All @@ -205,7 +203,7 @@ mod tests {
assert_eq!(response.status(), 200);

let bulk_response: ElasticBulkResponse = serde_json::from_slice(response.body()).unwrap();
assert!(bulk_response.errors);
assert!(!bulk_response.errors);
}

#[tokio::test]
Expand Down Expand Up @@ -338,4 +336,41 @@ mod tests {
let reason = es_error.error.reason.unwrap();
assert_eq!(reason, "`_index` field of action on line #0 is missing");
}

// Airmail-specific test. It should go away when we straighten out the API response.
#[tokio::test]
async fn test_bulk_api_returns_404_on_index_not_found() {
let mut ingest_router_mock = IngestRouterServiceClient::mock();
ingest_router_mock.expect_ingest().once().returning(|_| {
Ok(IngestResponseV2 {
successes: Vec::new(),
failures: vec![IngestFailure {
subrequest_id: 2,
index_id: "my-index".to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
reason: IngestFailureReason::IndexNotFound as i32,
}],
})
});
let ingest_router = IngestRouterServiceClient::from(ingest_router_mock);
let handler = es_compat_bulk_handler_v2(ingest_router);

let payload = r#"
{"create": {"_index": "my-index", "_id" : "1"}}
{"ts": 1, "message": "my-message"}
"#;
let response = warp::test::request()
.path("/_elastic/_bulk")
.method("POST")
.body(payload)
.reply(&handler)
.await;
assert_eq!(response.status(), 404);

let es_error: ElasticSearchError = serde_json::from_slice(response.body()).unwrap();
assert_eq!(es_error.status, StatusCode::NOT_FOUND);

let reason = es_error.error.reason.unwrap();
assert_eq!(reason, "index `my-index` not found");
}
}