Skip to content

Commit

Permalink
Adding logs whenever we emit an internal error. (#5246)
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Jul 25, 2024
1 parent 81616da commit 7dd4275
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 25 deletions.
22 changes: 19 additions & 3 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use futures_util::StreamExt;
use itertools::Itertools;
use quickwit_common::fs::{empty_dir, get_cache_directory_path};
use quickwit_common::pretty::PrettySample;
use quickwit_common::rate_limited_error;
use quickwit_config::{validate_identifier, IndexConfig, SourceConfig};
use quickwit_indexing::check_source_connectivity;
use quickwit_metastore::{
Expand Down Expand Up @@ -70,13 +71,28 @@ pub enum IndexServiceError {
impl ServiceError for IndexServiceError {
fn error_code(&self) -> ServiceErrorCode {
match self {
Self::Internal(_) => ServiceErrorCode::Internal,
Self::Internal(err_msg) => {
rate_limited_error!(limit_per_min = 6, err_msg);
ServiceErrorCode::Internal
}
Self::InvalidConfig(_) => ServiceErrorCode::BadRequest,
Self::InvalidIdentifier(_) => ServiceErrorCode::BadRequest,
Self::Metastore(error) => error.error_code(),
Self::OperationNotAllowed(_) => ServiceErrorCode::Forbidden,
Self::SplitDeletion(_) => ServiceErrorCode::Internal,
Self::Storage(_) => ServiceErrorCode::Internal,
Self::SplitDeletion(delete_splits_error) => {
rate_limited_error!(
limit_per_min = 6,
"index service internal error/split deletion: {delete_splits_error:?}"
);
ServiceErrorCode::Internal
}
Self::Storage(storage_error) => {
rate_limited_error!(
limit_per_min = 6,
"index service internal error/storage {storage_error:?}"
);
ServiceErrorCode::Internal
}
}
}
}
Expand Down
19 changes: 16 additions & 3 deletions quickwit/quickwit-ingest/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::io;

use mrecordlog::error::*;
use quickwit_actors::AskError;
use quickwit_common::rate_limited_error;
use quickwit_common::tower::BufferError;
pub(crate) use quickwit_proto::error::{grpc_error_to_grpc_status, grpc_status_to_service_error};
use quickwit_proto::ingest::IngestV2Error;
Expand Down Expand Up @@ -96,12 +97,24 @@ impl From<IngestV2Error> for IngestServiceError {
impl ServiceError for IngestServiceError {
fn error_code(&self) -> ServiceErrorCode {
match self {
Self::Corruption { .. } => ServiceErrorCode::Internal,
Self::Corruption(err_msg) => {
rate_limited_error!(
limit_per_min = 6,
"ingest/corruption internal error: {err_msg}"
);
ServiceErrorCode::Internal
}
Self::IndexAlreadyExists { .. } => ServiceErrorCode::AlreadyExists,
Self::IndexNotFound { .. } => ServiceErrorCode::NotFound,
Self::Internal(_) => ServiceErrorCode::Internal,
Self::Internal(err_msg) => {
rate_limited_error!(limit_per_min = 6, "ingest internal error: {err_msg}");
ServiceErrorCode::Internal
}
Self::InvalidPosition(_) => ServiceErrorCode::BadRequest,
Self::IoError { .. } => ServiceErrorCode::Internal,
Self::IoError(io_err) => {
rate_limited_error!(limit_per_min = 6, "ingest/io internal error: {io_err}");
ServiceErrorCode::Internal
}
Self::RateLimited => ServiceErrorCode::TooManyRequests,
Self::Unavailable(_) => ServiceErrorCode::Unavailable,
}
Expand Down
6 changes: 5 additions & 1 deletion quickwit/quickwit-janitor/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use quickwit_common::rate_limited_error;
use quickwit_proto::metastore::MetastoreError;
use quickwit_proto::{ServiceError, ServiceErrorCode};
use serde::{Deserialize, Serialize};
Expand All @@ -37,7 +38,10 @@ pub enum JanitorError {
impl ServiceError for JanitorError {
fn error_code(&self) -> ServiceErrorCode {
match self {
Self::Internal(_) => ServiceErrorCode::Internal,
Self::Internal(err_msg) => {
rate_limited_error!(limit_per_min = 6, "janitor internal error {err_msg}");
ServiceErrorCode::Internal
}
Self::InvalidDeleteQuery(_) => ServiceErrorCode::BadRequest,
Self::Metastore(metastore_error) => metastore_error.error_code(),
}
Expand Down
6 changes: 5 additions & 1 deletion quickwit/quickwit-proto/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use quickwit_common::rate_limited_error;
use quickwit_common::tower::MakeLoadShedError;
use thiserror;

Expand All @@ -43,7 +44,10 @@ pub enum ClusterError {
impl ServiceError for ClusterError {
fn error_code(&self) -> ServiceErrorCode {
match self {
Self::Internal(_) => ServiceErrorCode::Internal,
Self::Internal(err_msg) => {
rate_limited_error!(limit_per_min = 6, "cluster internal error: {err_msg}");
ServiceErrorCode::Internal
}
Self::Timeout(_) => ServiceErrorCode::Timeout,
Self::TooManyRequests => ServiceErrorCode::TooManyRequests,
Self::Unavailable(_) => ServiceErrorCode::Unavailable,
Expand Down
9 changes: 8 additions & 1 deletion quickwit/quickwit-proto/src/control_plane/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use quickwit_actors::AskError;
use quickwit_common::rate_limited_error;
use quickwit_common::tower::{MakeLoadShedError, RpcName};
use thiserror;

Expand Down Expand Up @@ -52,7 +53,13 @@ impl From<quickwit_common::tower::TaskCancelled> for ControlPlaneError {
impl ServiceError for ControlPlaneError {
fn error_code(&self) -> ServiceErrorCode {
match self {
Self::Internal(_) => ServiceErrorCode::Internal,
Self::Internal(error_msg) => {
rate_limited_error!(
limit_per_min = 6,
"control plane internal error: {error_msg}"
);
ServiceErrorCode::Internal
}
Self::Metastore(metastore_error) => metastore_error.error_code(),
Self::Timeout(_) => ServiceErrorCode::Timeout,
Self::TooManyRequests => ServiceErrorCode::TooManyRequests,
Expand Down
6 changes: 5 additions & 1 deletion quickwit/quickwit-proto/src/indexing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::ops::{Add, Mul, Sub};
use bytesize::ByteSize;
use quickwit_actors::AskError;
use quickwit_common::pubsub::Event;
use quickwit_common::rate_limited_error;
use quickwit_common::tower::{MakeLoadShedError, RpcName};
use serde::{Deserialize, Serialize};
use thiserror;
Expand Down Expand Up @@ -55,7 +56,10 @@ pub enum IndexingError {
impl ServiceError for IndexingError {
fn error_code(&self) -> ServiceErrorCode {
match self {
Self::Internal(_) => ServiceErrorCode::Internal,
Self::Internal(err_msg) => {
rate_limited_error!(limit_per_min = 6, "indexing error: {err_msg}");
ServiceErrorCode::Internal
}
Self::Metastore(metastore_error) => metastore_error.error_code(),
Self::Timeout(_) => ServiceErrorCode::Timeout,
Self::TooManyRequests => ServiceErrorCode::TooManyRequests,
Expand Down
6 changes: 5 additions & 1 deletion quickwit/quickwit-proto/src/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::iter::zip;

use bytes::Bytes;
use bytesize::ByteSize;
use quickwit_common::rate_limited_error;
use quickwit_common::tower::MakeLoadShedError;

use self::ingester::{PersistFailureReason, ReplicateFailureReason};
Expand Down Expand Up @@ -59,7 +60,10 @@ impl From<quickwit_common::tower::TaskCancelled> for IngestV2Error {
impl ServiceError for IngestV2Error {
fn error_code(&self) -> ServiceErrorCode {
match self {
Self::Internal(_) => ServiceErrorCode::Internal,
Self::Internal(error_msg) => {
rate_limited_error!(limit_per_min = 6, "ingest internal error: {error_msg}");
ServiceErrorCode::Internal
}
Self::ShardNotFound { .. } => ServiceErrorCode::NotFound,
Self::Timeout(_) => ServiceErrorCode::Timeout,
Self::TooManyRequests => ServiceErrorCode::TooManyRequests,
Expand Down
49 changes: 43 additions & 6 deletions quickwit/quickwit-proto/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use std::fmt;

use quickwit_common::rate_limited_error;
use quickwit_common::retry::Retryable;
use quickwit_common::tower::MakeLoadShedError;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -192,15 +193,51 @@ impl ServiceError for MetastoreError {
fn error_code(&self) -> ServiceErrorCode {
match self {
Self::AlreadyExists(_) => ServiceErrorCode::AlreadyExists,
Self::Connection { .. } => ServiceErrorCode::Internal,
Self::Db { .. } => ServiceErrorCode::Internal,
Self::Connection { message } => {
rate_limited_error!(
limit_per_min = 6,
"metastore/connection internal error: {message}"
);
ServiceErrorCode::Internal
}
Self::Db { message } => {
rate_limited_error!(limit_per_min = 6, "metastore/db internal error: {message}");
ServiceErrorCode::Internal
}
Self::FailedPrecondition { .. } => ServiceErrorCode::BadRequest,
Self::Forbidden { .. } => ServiceErrorCode::Forbidden,
Self::Internal { .. } => ServiceErrorCode::Internal,
Self::Internal { message, cause } => {
rate_limited_error!(
limit_per_min = 6,
"metastore internal error: {message} cause: {cause}"
);
ServiceErrorCode::Internal
}
Self::InvalidArgument { .. } => ServiceErrorCode::BadRequest,
Self::Io { .. } => ServiceErrorCode::Internal,
Self::JsonDeserializeError { .. } => ServiceErrorCode::Internal,
Self::JsonSerializeError { .. } => ServiceErrorCode::Internal,
Self::Io { message } => {
rate_limited_error!(limit_per_min = 6, "metastore/io internal error: {message}");
ServiceErrorCode::Internal
}
Self::JsonDeserializeError {
struct_name,
message,
} => {
rate_limited_error!(
limit_per_min = 6,
"metastore/jsondeser internal error: [{struct_name}] {message}"
);
ServiceErrorCode::Internal
}
Self::JsonSerializeError {
struct_name,
message,
} => {
rate_limited_error!(
limit_per_min = 6,
"metastore/jsonser internal error: [{struct_name}] {message}"
);
ServiceErrorCode::Internal
}
Self::NotFound(_) => ServiceErrorCode::NotFound,
Self::Timeout(_) => ServiceErrorCode::Timeout,
Self::TooManyRequests => ServiceErrorCode::TooManyRequests,
Expand Down
14 changes: 12 additions & 2 deletions quickwit/quickwit-search/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use quickwit_common::rate_limited_error;
use quickwit_doc_mapper::QueryParserError;
use quickwit_proto::error::grpc_error_to_grpc_status;
use quickwit_proto::metastore::{EntityKind, MetastoreError};
Expand Down Expand Up @@ -56,11 +57,20 @@ impl ServiceError for SearchError {
fn error_code(&self) -> ServiceErrorCode {
match self {
Self::IndexesNotFound { .. } => ServiceErrorCode::NotFound,
Self::Internal(_) => ServiceErrorCode::Internal,
Self::Internal(error_msg) => {
rate_limited_error!(limit_per_min = 6, "search internal error: {error_msg}");
ServiceErrorCode::Internal
}
Self::InvalidAggregationRequest(_) => ServiceErrorCode::BadRequest,
Self::InvalidArgument(_) => ServiceErrorCode::BadRequest,
Self::InvalidQuery(_) => ServiceErrorCode::BadRequest,
Self::StorageResolver(_) => ServiceErrorCode::Internal,
Self::StorageResolver(storage_err) => {
rate_limited_error!(
limit_per_min = 6,
"search's storager resolver internal error: {storage_err}"
);
ServiceErrorCode::Internal
}
Self::Timeout(_) => ServiceErrorCode::Timeout,
Self::TooManyRequests => ServiceErrorCode::TooManyRequests,
Self::Unavailable(_) => ServiceErrorCode::Unavailable,
Expand Down
31 changes: 26 additions & 5 deletions quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::collections::HashMap;
use std::time::Instant;

use hyper::StatusCode;
use quickwit_common::rate_limited_error;
use quickwit_config::INGEST_V2_SOURCE_ID;
use quickwit_ingest::IngestRequestV2Builder;
use quickwit_proto::ingest::router::{
Expand Down Expand Up @@ -147,7 +148,10 @@ pub(crate) async fn elastic_bulk_ingest_v2(
let Some(ingest_request) = ingest_request_opt else {
return Ok(ElasticBulkResponse::default());
};
let ingest_response = ingest_router.ingest(ingest_request).await?;
let ingest_response = ingest_router.ingest(ingest_request).await.map_err(|err| {
rate_limited_error!(limit_per_min=6, err=?err, "router error");
err
})?;
make_elastic_bulk_response_v2(ingest_response, per_subrequest_doc_handles, now)
}

Expand All @@ -168,8 +172,14 @@ fn make_elastic_bulk_response_v2(
.expect("`index_uid` should be a required field");

// Find the doc handles for the subresponse.
let mut doc_handles =
remove_doc_handles(&mut per_subrequest_doc_handles, success.subrequest_id)?;
let mut doc_handles = remove_doc_handles(
&mut per_subrequest_doc_handles,
success.subrequest_id,
)
.map_err(|err| {
rate_limited_error!(limit_per_min=6, index_id=%index_id, "could not find subrequest id");
err
})?;
doc_handles.sort_unstable_by(|left, right| left.doc_uid.cmp(&right.doc_uid));

// Populate the response items with one error per parse failure.
Expand All @@ -178,9 +188,11 @@ fn make_elastic_bulk_response_v2(

// Since the generated doc UIDs are monotonically increasing, and inserted in order, we
// can find doc handles using binary search.
let failed_doc_uid = parse_failure.doc_uid();
let doc_handle_idx = doc_handles
.binary_search_by_key(&parse_failure.doc_uid(), |doc_handle| doc_handle.doc_uid)
.binary_search_by_key(&failed_doc_uid, |doc_handle| doc_handle.doc_uid)
.map_err(|_| {
rate_limited_error!(limit_per_min=6, doc_uid=%failed_doc_uid, "could not find doc_uid from parse failure");
ElasticsearchError::new(
StatusCode::INTERNAL_SERVER_ERROR,
format!(
Expand Down Expand Up @@ -228,7 +240,16 @@ fn make_elastic_bulk_response_v2(

// Find the doc handles for the subrequest.
let doc_handles =
remove_doc_handles(&mut per_subrequest_doc_handles, failure.subrequest_id)?;
remove_doc_handles(&mut per_subrequest_doc_handles, failure.subrequest_id).map_err(
|err| {
rate_limited_error!(
limit_per_min = 6,
subrequest = failure.subrequest_id,
"failed to find error subrequest"
);
err
},
)?;

// Populate the response items with one error per doc handle.
let (exception, reason, status) = match failure.reason() {
Expand Down
6 changes: 5 additions & 1 deletion quickwit/quickwit-serve/src/otlp_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use bytes::Bytes;
use quickwit_common::rate_limited_error;
use quickwit_opentelemetry::otlp::{
OtlpGrpcLogsService, OtlpGrpcTracesService, OTEL_LOGS_INDEX_ID, OTEL_TRACES_INDEX_ID,
};
Expand Down Expand Up @@ -154,7 +155,10 @@ impl ServiceError for OtlpApiError {
fn error_code(&self) -> ServiceErrorCode {
match self {
OtlpApiError::InvalidPayload(_) => ServiceErrorCode::BadRequest,
OtlpApiError::Ingest(_) => ServiceErrorCode::Internal,
OtlpApiError::Ingest(err_msg) => {
rate_limited_error!(limit_per_min = 6, "otlp internal error: {err_msg}");
ServiceErrorCode::Internal
}
}
}
}
Expand Down

0 comments on commit 7dd4275

Please sign in to comment.