Skip to content

Commit

Permalink
Avoid catastrophic failure when an ES request times out
Browse files Browse the repository at this point in the history
As of 0.20.6 ES it's possible for a bulk request to time out,
which we unwrapped on like goofs. This commit removes that
unwrap and fails the flush in the event of a timeout bubbling up.

This resolves #353 and elastic-rs/elastic#286.

Related to #355.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
  • Loading branch information
Brian L. Troutwine committed Dec 5, 2017
1 parent d91c1fa commit 32eab81
Showing 1 changed file with 118 additions and 106 deletions.
224 changes: 118 additions & 106 deletions src/sink/elasticsearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,132 +185,144 @@ impl Sink for Elasticsearch {

let mut buffer = String::with_capacity(4048);
self.bulk_body(&mut buffer);
let bulk_resp: Result<BulkResponse> = client
.request(BulkRequest::new(buffer))
.send()
.unwrap()
.into_response::<BulkResponse>();

ELASTIC_INTERNAL_BUFFER_LEN.store(self.buffer.len(), Ordering::Relaxed);
match bulk_resp {
Ok(bulk) => {
ELASTIC_RECORDS_DELIVERY.fetch_add(1, Ordering::Relaxed);
for item in bulk.iter() {
match item {
Ok(item) => {
let uuid = uuid::Uuid::parse_str(item.id())
.expect("catastrophic error, TID not a UUID");
if let Ok(idx) = self.buffer
.binary_search_by(|probe| probe.uuid.cmp(&uuid))
{
self.buffer.remove(idx);
}
ELASTIC_RECORDS_TOTAL_DELIVERED
.fetch_add(1, Ordering::Relaxed);
}
Err(item) => {
let uuid = uuid::Uuid::parse_str(item.id())
.expect("catastrophic error, TID not a UUID");
if let Ok(idx) = self.buffer
.binary_search_by(|probe| probe.uuid.cmp(&uuid))
{
self.buffer[idx].attempts += 1;
if self.buffer[idx].attempts
> self.delivery_attempt_limit
if let Ok(snd) = client.request(BulkRequest::new(buffer)).send() {
let bulk_resp: Result<BulkResponse> = snd.into_response::<BulkResponse>();
ELASTIC_INTERNAL_BUFFER_LEN.store(self.buffer.len(), Ordering::Relaxed);
match bulk_resp {
Ok(bulk) => {
ELASTIC_RECORDS_DELIVERY.fetch_add(1, Ordering::Relaxed);
for item in bulk.iter() {
match item {
Ok(item) => {
let uuid = uuid::Uuid::parse_str(item.id())
.expect("catastrophic error, TID not a UUID");
if let Ok(idx) = self.buffer
.binary_search_by(|probe| probe.uuid.cmp(&uuid))
{
self.buffer.remove(idx);
}
ELASTIC_RECORDS_TOTAL_DELIVERED
.fetch_add(1, Ordering::Relaxed);
}
ELASTIC_RECORDS_TOTAL_FAILED
.fetch_add(1, Ordering::Relaxed);
if let Some(cause) = item.cause() {
Err(item) => {
let uuid = uuid::Uuid::parse_str(item.id())
.expect("catastrophic error, TID not a UUID");
if let Ok(idx) = self.buffer
.binary_search_by(|probe| probe.uuid.cmp(&uuid))
{
self.buffer[idx].attempts += 1;
if self.buffer[idx].attempts
> self.delivery_attempt_limit
{
self.buffer.remove(idx);
}
}
ELASTIC_RECORDS_TOTAL_FAILED
.fetch_add(1, Ordering::Relaxed);
if let Some(cause) = item.cause() {
error!(
"Failed to write item with error {}, cause {}",
item.description(),
cause
);
} else {
error!(
"Failed to write item with error {}",
item.description()
);
}
match item.action() {
bulk::Action::Index => {
ELASTIC_BULK_ACTION_INDEX_ERR
.fetch_add(1, Ordering::Relaxed)
}
bulk::Action::Create => {
ELASTIC_BULK_ACTION_CREATE_ERR
.fetch_add(1, Ordering::Relaxed)
}
bulk::Action::Update => {
ELASTIC_BULK_ACTION_UPDATE_ERR
.fetch_add(1, Ordering::Relaxed)
}
bulk::Action::Delete => {
ELASTIC_BULK_ACTION_DELETE_ERR
.fetch_add(1, Ordering::Relaxed)
}
};
}
}
}
}
Err(err) => match err {
error::Error::Api(ref api_error) => {
use elastic::error::ApiError;
match *api_error {
ApiError::IndexNotFound { ref index } => {
ELASTIC_ERROR_API_INDEX_NOT_FOUND
.fetch_add(1, Ordering::Relaxed);
error!(
"Failed to write item with error {}, cause {}",
item.description(),
cause
"Unable to write, API Error (Index Not Found): {}",
index
);
} else {
}
ApiError::Parsing { ref reason, .. } => {
ELASTIC_ERROR_API_PARSING
.fetch_add(1, Ordering::Relaxed);
error!(
"Failed to write item with error {}",
item.description()
"Unable to write, API Error (Parsing): {}",
reason
);
}
match item.action() {
bulk::Action::Index => ELASTIC_BULK_ACTION_INDEX_ERR
.fetch_add(1, Ordering::Relaxed),
bulk::Action::Create => ELASTIC_BULK_ACTION_CREATE_ERR
.fetch_add(1, Ordering::Relaxed),
bulk::Action::Update => ELASTIC_BULK_ACTION_UPDATE_ERR
.fetch_add(1, Ordering::Relaxed),
bulk::Action::Delete => ELASTIC_BULK_ACTION_DELETE_ERR
.fetch_add(1, Ordering::Relaxed),
};
}
}
}
}
Err(err) => match err {
error::Error::Api(ref api_error) => {
use elastic::error::ApiError;
match *api_error {
ApiError::IndexNotFound { ref index } => {
ELASTIC_ERROR_API_INDEX_NOT_FOUND
.fetch_add(1, Ordering::Relaxed);
error!(
"Unable to write, API Error (Index Not Found): {}",
index
);
}
ApiError::Parsing { ref reason, .. } => {
ELASTIC_ERROR_API_PARSING.fetch_add(1, Ordering::Relaxed);
error!("Unable to write, API Error (Parsing): {}", reason);
}
ApiError::MapperParsing { ref reason, .. } => {
ELASTIC_ERROR_API_MAPPER_PARSING
.fetch_add(1, Ordering::Relaxed);
error!(
"Unable to write, API Error (Mapper Parsing): {}",
reason
);
}
ApiError::ActionRequestValidation { ref reason, .. } => {
ELASTIC_ERROR_API_ACTION_REQUEST_VALIDATION
.fetch_add(1, Ordering::Relaxed);
error!(
ApiError::MapperParsing { ref reason, .. } => {
ELASTIC_ERROR_API_MAPPER_PARSING
.fetch_add(1, Ordering::Relaxed);
error!(
"Unable to write, API Error (Mapper Parsing): {}",
reason
);
}
ApiError::ActionRequestValidation {
ref reason, ..
} => {
ELASTIC_ERROR_API_ACTION_REQUEST_VALIDATION
.fetch_add(1, Ordering::Relaxed);
error!(
"Unable to write, API Error (Action Request Validation): {}",
reason
);
}
ApiError::DocumentMissing { ref index, .. } => {
ELASTIC_ERROR_API_DOCUMENT_MISSING
.fetch_add(1, Ordering::Relaxed);
error!(
}
ApiError::DocumentMissing { ref index, .. } => {
ELASTIC_ERROR_API_DOCUMENT_MISSING
.fetch_add(1, Ordering::Relaxed);
error!(
"Unable to write, API Error (Document Missing): {}",
index
);
}
ApiError::IndexAlreadyExists { ref index, .. } => {
ELASTIC_ERROR_API_INDEX_ALREADY_EXISTS
.fetch_add(1, Ordering::Relaxed);
error!(
}
ApiError::IndexAlreadyExists { ref index, .. } => {
ELASTIC_ERROR_API_INDEX_ALREADY_EXISTS
.fetch_add(1, Ordering::Relaxed);
error!(
"Unable to write, API Error (Index Already Exists): {}",
index
);
}
_ => {
ELASTIC_ERROR_API_UNKNOWN.fetch_add(1, Ordering::Relaxed);
error!("Unable to write, API Error (Unknown)");
}
_ => {
ELASTIC_ERROR_API_UNKNOWN
.fetch_add(1, Ordering::Relaxed);
error!("Unable to write, API Error (Unknown)");
}
}
}
}
error::Error::Client(ref client_error) => {
ELASTIC_ERROR_CLIENT.fetch_add(1, Ordering::Relaxed);
error!(
"Unable to write, client error: {}",
client_error.description()
);
}
},
error::Error::Client(ref client_error) => {
ELASTIC_ERROR_CLIENT.fetch_add(1, Ordering::Relaxed);
error!(
"Unable to write, client error: {}",
client_error.description()
);
}
},
}
}
}

Expand Down

0 comments on commit 32eab81

Please sign in to comment.