Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Populate ES bulk response items on success too #5019

Merged
merged 1 commit into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ async fn elastic_ingest_bulk(
let bulk_response = ElasticBulkResponse {
took_millis,
errors,
items: Vec::new(),
actions: Vec::new(),
};
Ok(bulk_response)
}
Expand Down
88 changes: 79 additions & 9 deletions quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,19 @@ pub(crate) struct ElasticBulkResponse {
#[serde(rename = "took")]
pub took_millis: u64,
pub errors: bool,
pub items: Vec<ElasticBulkItemAction>,
#[serde(rename = "items")]
pub actions: Vec<ElasticBulkAction>,
}

#[derive(Debug, Serialize, Deserialize)]
pub(crate) enum ElasticBulkItemAction {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ElasticBulkAction {
#[serde(rename = "create")]
Create(ElasticBulkItem),
#[serde(rename = "index")]
Index(ElasticBulkItem),
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ElasticBulkItem {
#[serde(rename = "_index")]
pub index_id: IndexId,
Expand All @@ -63,7 +64,7 @@ pub(crate) struct ElasticBulkItem {
pub error: Option<ElasticBulkError>,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ElasticBulkError {
#[serde(rename = "index")]
pub index_id: Option<IndexId>,
Expand Down Expand Up @@ -132,8 +133,32 @@ pub(crate) async fn elastic_bulk_ingest_v2(
};
let ingest_response_v2 = ingest_router.ingest(ingest_request).await?;
let errors = !ingest_response_v2.failures.is_empty();
let mut items = Vec::new();
let mut actions: Vec<ElasticBulkAction> = Vec::new();

for success in ingest_response_v2.successes {
let es_doc_ids = per_subrequest_id_es_doc_ids
.remove(&success.subrequest_id)
.ok_or_else(|| {
ElasticsearchError::new(
StatusCode::INTERNAL_SERVER_ERROR,
format!(
"could not find subrequest `{}` in bulk request",
success.subrequest_id
),
None,
)
})?;
for es_doc_id in es_doc_ids {
let item = ElasticBulkItem {
index_id: success.index_uid().index_id.clone(),
es_doc_id,
status: StatusCode::CREATED,
error: None,
};
let action = ElasticBulkAction::Index(item);
actions.push(action);
}
}
for failure in ingest_response_v2.failures {
let es_doc_ids = per_subrequest_id_es_doc_ids
.remove(&failure.subrequest_id)
Expand Down Expand Up @@ -161,7 +186,25 @@ pub(crate) async fn elastic_bulk_ingest_v2(
status: StatusCode::NOT_FOUND,
error: Some(error),
};
items.push(ElasticBulkItemAction::Index(item));
let action = ElasticBulkAction::Index(item);
actions.push(action);
}
}
IngestFailureReason::Timeout => {
for es_doc_id in es_doc_ids {
let error = ElasticBulkError {
index_id: Some(failure.index_id.clone()),
exception: ErrorCauseException::Timeout,
reason: format!("timeout [{}]", failure.index_id),
};
let item = ElasticBulkItem {
index_id: failure.index_id.clone(),
es_doc_id,
status: StatusCode::REQUEST_TIMEOUT,
error: Some(error),
};
let action = ElasticBulkAction::Index(item);
actions.push(action);
}
}
_ => {
Expand All @@ -174,7 +217,7 @@ pub(crate) async fn elastic_bulk_ingest_v2(
let bulk_response = ElasticBulkResponse {
took_millis,
errors,
items,
actions,
};
Ok(bulk_response)
}
Expand Down Expand Up @@ -274,6 +317,33 @@ mod tests {

let bulk_response: ElasticBulkResponse = serde_json::from_slice(response.body()).unwrap();
assert!(!bulk_response.errors);

let mut items = bulk_response
.actions
.into_iter()
.map(|action| match action {
ElasticBulkAction::Create(item) => item,
ElasticBulkAction::Index(item) => item,
})
.collect::<Vec<_>>();
assert_eq!(items.len(), 3);

items.sort_by(|left, right| {
left.index_id
.cmp(&right.index_id)
.then(left.es_doc_id.cmp(&right.es_doc_id))
});
assert_eq!(items[0].index_id, "my-index-1");
assert!(items[0].es_doc_id.is_none());
assert_eq!(items[0].status, StatusCode::CREATED);

assert_eq!(items[1].index_id, "my-index-1");
assert_eq!(items[1].es_doc_id.as_ref().unwrap(), "1");
assert_eq!(items[1].status, StatusCode::CREATED);

assert_eq!(items[2].index_id, "my-index-2");
assert_eq!(items[2].es_doc_id.as_ref().unwrap(), "1");
assert_eq!(items[2].status, StatusCode::CREATED);
}

#[tokio::test]
Expand Down Expand Up @@ -466,6 +536,6 @@ mod tests {

let bulk_response: ElasticBulkResponse = serde_json::from_slice(response.body()).unwrap();
assert!(bulk_response.errors);
assert_eq!(bulk_response.items.len(), 3);
assert_eq!(bulk_response.actions.len(), 3);
}
}
3 changes: 3 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ pub enum ErrorCauseException {
IllegalArgument,
#[serde(rename = "index_not_found_exception")]
IndexNotFound,
#[serde(rename = "timeout_exception")]
Timeout,
}

impl ErrorCauseException {
Expand All @@ -150,6 +152,7 @@ impl ErrorCauseException {
Self::ActionRequestValidation => "action_request_validation_exception",
Self::IllegalArgument => "illegal_argument_exception",
Self::IndexNotFound => "index_not_found_exception",
Self::Timeout => "timeout_exception",
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
ndjson:
- index: { "_index": "test-index-happy-path", "_id": "1" }
- message: Hello, World!
- index: { "_index": "test-index-happy-path" }
- message: Hola, Mundo!
status_code: 200
expected:
errors: false
items:
- index:
_index: test-index-happy-path
_id: "1"
status: 201
- index:
_index: test-index-happy-path
status: 201
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,3 @@ json: {
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,24 @@ method: POST
api_root: http://localhost:7280/api/v1/
endpoint: indexes/
json:
version: "0.7"
version: "0.8"
index_id: test-index
doc_mapping:
field_mappings:
- name: message
type: text
sleep_after: 3
---
# Create index template
method: POST
api_root: http://localhost:7280/api/v1/
endpoint: templates
json:
version: "0.8"
template_id: test-index-template
index_id_patterns:
- test-index-happy-path*
doc_mapping:
mode: dynamic
indexing_settings:
commit_timeout_secs: 1
Loading