Skip to content

Commit

Permalink
Add --remote-cache-warnings and make remote cache warnings less cha…
Browse files Browse the repository at this point in the history
…tty (Cherry-pick of #11859) (#11863)
  • Loading branch information
Eric-Arellano committed Apr 8, 2021
1 parent 43dfb1d commit a61fce4
Show file tree
Hide file tree
Showing 15 changed files with 170 additions and 112 deletions.
3 changes: 3 additions & 0 deletions pants.ci.toml
Expand Up @@ -4,6 +4,9 @@ colors = true
# TODO: See #9924.
dynamic_ui = false

# In CI, keep surfacing remote cache errors as warnings (with backoff) rather than only the first one.
remote_cache_warnings = "backoff"

[stats]
log = true

Expand Down
1 change: 1 addition & 0 deletions src/python/pants/engine/internals/scheduler.py
Expand Up @@ -173,6 +173,7 @@ def __init__(
store_chunk_bytes=execution_options.remote_store_chunk_bytes,
store_chunk_upload_timeout=execution_options.remote_store_chunk_upload_timeout_seconds,
store_rpc_retries=execution_options.remote_store_rpc_retries,
cache_warnings_behavior=execution_options.remote_cache_warnings.value,
cache_eager_fetch=execution_options.remote_cache_eager_fetch,
execution_extra_platform_properties=tuple(
tuple(pair.split("=", 1))
Expand Down
21 changes: 21 additions & 0 deletions src/python/pants/option/global_options.py
Expand Up @@ -85,6 +85,13 @@ def to_glob_match_error_behavior(self) -> GlobMatchErrorBehavior:
return GlobMatchErrorBehavior(self.value)


# Choices for the `--remote-cache-warnings` option: how often remote cache failures are
# logged at the `warn` level (failures not logged at `warn` are logged at `debug`).
# The member's string `.value` is what gets handed to the native scheduler as
# `cache_warnings_behavior`, so the values must stay in sync with the snake_case names
# accepted by the Rust `RemoteCacheWarningsBehavior` enum.
@enum.unique
class RemoteCacheWarningsBehavior(Enum):
ignore = "ignore"
first_only = "first_only"
backoff = "backoff"


@enum.unique
class AuthPluginState(Enum):
OK = "ok"
Expand Down Expand Up @@ -141,6 +148,7 @@ class ExecutionOptions:
remote_store_rpc_retries: int

remote_cache_eager_fetch: bool
remote_cache_warnings: RemoteCacheWarningsBehavior

remote_execution_address: str | None
remote_execution_extra_platform_properties: List[str]
Expand Down Expand Up @@ -273,6 +281,7 @@ def from_options(
remote_store_rpc_retries=bootstrap_options.remote_store_rpc_retries,
# Remote cache setup.
remote_cache_eager_fetch=bootstrap_options.remote_cache_eager_fetch,
remote_cache_warnings=bootstrap_options.remote_cache_warnings,
# Remote execution setup.
remote_execution_address=remote_execution_address,
remote_execution_extra_platform_properties=bootstrap_options.remote_execution_extra_platform_properties,
Expand Down Expand Up @@ -344,6 +353,7 @@ def from_options(cls, options: OptionValueContainer) -> LocalStoreOptions:
remote_store_rpc_retries=2,
# Remote cache setup.
remote_cache_eager_fetch=True,
remote_cache_warnings=RemoteCacheWarningsBehavior.first_only,
# Remote execution setup.
remote_execution_address=None,
remote_execution_extra_platform_properties=[],
Expand Down Expand Up @@ -1084,6 +1094,17 @@ def register_bootstrap_options(cls, register):
help="Number of times to retry any RPC to the remote store before giving up.",
)

register(
"--remote-cache-warnings",
type=RemoteCacheWarningsBehavior,
default=DEFAULT_EXECUTION_OPTIONS.remote_cache_warnings,
advanced=True,
help=(
"Whether to log remote cache failures at the `warn` log level.\n\n"
"All errors not logged at the `warn` level will instead be logged at the "
"`debug` level."
),
)
register(
"--remote-cache-eager-fetch",
type=bool,
Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 12 additions & 25 deletions src/rust/engine/fs/store/src/remote.rs
Expand Up @@ -12,7 +12,7 @@ use bytes::{Bytes, BytesMut};
use futures::future::TryFutureExt;
use futures::Future;
use futures::StreamExt;
use grpc_util::headers_to_interceptor_fn;
use grpc_util::{headers_to_interceptor_fn, status_to_str};
use hashing::Digest;
use log::Level;
use remexec::content_addressable_storage_client::ContentAddressableStorageClient;
Expand Down Expand Up @@ -135,12 +135,10 @@ impl ByteStore {
// NOTE: This async closure must be boxed or else it triggers a consistent stack overflow
// when awaited with the `with_workunit` call below.
let result_future = Box::pin(async move {
let response = client.write(Request::new(stream)).await.map_err(|err| {
format!(
"Error from server while uploading digest {:?}: {:?}",
digest, err
)
})?;
let response = client
.write(Request::new(stream))
.await
.map_err(status_to_str)?;

let response = response.into_inner();
if response.committed_size == len as i64 {
Expand Down Expand Up @@ -209,15 +207,12 @@ impl ByteStore {

let mut stream = match stream_result {
Ok(response) => response.into_inner(),
Err(status) => match status.code() {
Code::NotFound => return Ok(None),
_ => {
return Err(format!(
"Error making CAS read request for {:?}: {:?}",
digest, status
))
Err(status) => {
return match status.code() {
Code::NotFound => Ok(None),
_ => Err(status_to_str(status)),
}
},
}
};

let read_result_closure = async {
Expand Down Expand Up @@ -254,10 +249,7 @@ impl ByteStore {
if status.code() == tonic::Code::NotFound {
None
} else {
return Err(format!(
"Error from server in response to CAS read request: {:?}",
status
));
return Err(status_to_str(status));
}
}
};
Expand Down Expand Up @@ -305,12 +297,7 @@ impl ByteStore {
let request = request.clone();
let response = client
.find_missing_blobs(request)
.map_err(|err| {
format!(
"Error from server in response to find_missing_blobs_request: {:?}",
err
)
})
.map_err(status_to_str)
.await?;

response
Expand Down
4 changes: 0 additions & 4 deletions src/rust/engine/fs/store/src/remote_tests.rs
Expand Up @@ -214,10 +214,6 @@ async fn write_file_errors() {
.store_bytes(&TestData::roland().bytes())
.await
.expect_err("Want error");
assert!(
error.contains("Error from server"),
format!("Bad error message, got: {}", error)
);
assert!(
error.contains("StubCAS is configured to always fail"),
format!("Bad error message, got: {}", error)
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/grpc_util/src/lib.rs
Expand Up @@ -136,3 +136,7 @@ pub fn headers_to_interceptor_fn(
Ok(req)
})
}

/// Renders a gRPC status as a short `<code>: <message>` string for error reporting.
///
/// Both parts use their `Debug` formatting, so the message appears quoted and escaped.
/// Only the code and message are included; nothing else carried by the status is rendered.
pub fn status_to_str(status: tonic::Status) -> String {
  let code = status.code();
  let message = status.message();
  format!("{:?}: {:?}", code, message)
}
2 changes: 2 additions & 0 deletions src/rust/engine/process_execution/Cargo.toml
Expand Up @@ -42,6 +42,8 @@ double-checked-cell-async = "2.0"
rand = "0.8"
prost = "0.7"
prost-types = "0.7"
strum = "0.20"
strum_macros = "0.20"
tonic = { version = "0.4", features = ["transport", "codegen", "tls", "tls-roots", "prost"] }
tryfuture = { path = "../tryfuture" }

Expand Down
8 changes: 8 additions & 0 deletions src/rust/engine/process_execution/src/lib.rs
Expand Up @@ -616,5 +616,13 @@ impl From<Box<BoundedCommandRunner>> for Arc<dyn CommandRunner> {
}
}

/// Policy for how often remote cache read/write failures are logged at `warn` level.
/// Failures that are not logged at `warn` are logged at `debug` instead.
///
/// Derives `strum_macros::EnumString` with `snake_case` serialization, so values are
/// parsed from the strings `ignore`, `first_only`, and `backoff`.
#[derive(Clone, Copy, Debug, PartialEq, strum_macros::EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum RemoteCacheWarningsBehavior {
/// Never log at `warn`.
Ignore,
/// Log at `warn` only the first time a given error message is seen.
FirstOnly,
/// Log at `warn` when the occurrence count for a given error message is a power of two
/// (1, 2, 4, 8, ...), so repeated failures are reported progressively less often.
Backoff,
}

#[cfg(test)]
mod tests;
10 changes: 3 additions & 7 deletions src/rust/engine/process_execution/src/remote.rs
Expand Up @@ -20,6 +20,7 @@ use futures::future::{self, BoxFuture, TryFutureExt};
use futures::FutureExt;
use futures::{Stream, StreamExt};
use grpc_util::prost::MessageExt;
use grpc_util::{headers_to_interceptor_fn, status_to_str};
use hashing::{Digest, Fingerprint};
use log::{debug, trace, warn, Level};
use prost::Message;
Expand All @@ -43,7 +44,6 @@ use crate::{
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, Process,
ProcessCacheScope, ProcessMetadata, ProcessResultMetadata,
};
use grpc_util::headers_to_interceptor_fn;

// Environment variable which is exclusively used for cache key invalidation.
// This may be not specified in an Process, and may be populated only by the
Expand Down Expand Up @@ -207,7 +207,7 @@ impl CommandRunner {
.get_capabilities(request)
.await
.map(|r| r.into_inner())
.map_err(rpcerror_to_string)
.map_err(status_to_str)
};

self
Expand Down Expand Up @@ -1425,7 +1425,7 @@ pub async fn check_action_cache(
context
.workunit_store
.increment_counter(Metric::RemoteCacheReadErrors, 1);
Err(rpcerror_to_string(status))
Err(status_to_str(status))
}
},
}
Expand Down Expand Up @@ -1476,10 +1476,6 @@ pub fn format_error(error: &StatusProto) -> String {
format!("{}: {}", error_code, error.message)
}

/// Formats a `Status` as `<code>: <message>`, using the `Debug` rendering of both parts
/// (so the message appears quoted).
///
/// NOTE(review): this produces the same output format as `grpc_util::status_to_str`.
pub(crate) fn rpcerror_to_string(status: Status) -> String {
format!("{:?}: {:?}", status.code(), status.message(),)
}

pub fn digest<T: prost::Message>(message: &T) -> Result<Digest, String> {
Ok(Digest::of_bytes(&message.to_bytes()))
}
83 changes: 43 additions & 40 deletions src/rust/engine/process_execution/src/remote_cache.rs
Expand Up @@ -10,6 +10,7 @@ use bazel_protos::require_digest;
use fs::RelativePath;
use futures::FutureExt;
use grpc_util::headers_to_interceptor_fn;
use grpc_util::status_to_str;
use hashing::Digest;
use parking_lot::Mutex;
use remexec::action_cache_client::ActionCacheClient;
Expand All @@ -21,17 +22,9 @@ use workunit_store::{with_workunit, Level, Metric, ObservationMetric, WorkunitMe
use crate::remote::make_execute_request;
use crate::{
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, Process,
ProcessMetadata,
ProcessMetadata, RemoteCacheWarningsBehavior,
};

/// Every n times, log a particular remote cache error at warning level instead of debug level. We
/// don't log at warn level every time to avoid flooding the console.
///
/// Every 5 times is arbitrary and can be changed. It's based on running the `lint` goal with a
/// fake store address resulting in 5 read errors and 18 write errors; 5 seems like a
/// reasonable increment.
const LOG_ERRORS_INCREMENT: usize = 5;

/// This `CommandRunner` implementation caches results remotely using the Action Cache service
/// of the Remote Execution API.
///
Expand All @@ -52,6 +45,7 @@ pub struct CommandRunner {
cache_read: bool,
cache_write: bool,
eager_fetch: bool,
warnings_behavior: RemoteCacheWarningsBehavior,
read_errors_counter: Arc<Mutex<BTreeMap<String, usize>>>,
write_errors_counter: Arc<Mutex<BTreeMap<String, usize>>>,
}
Expand All @@ -68,6 +62,7 @@ impl CommandRunner {
platform: Platform,
cache_read: bool,
cache_write: bool,
warnings_behavior: RemoteCacheWarningsBehavior,
eager_fetch: bool,
) -> Result<Self, String> {
let tls_client_config = if action_cache_address.starts_with("https://") {
Expand Down Expand Up @@ -95,6 +90,7 @@ impl CommandRunner {
cache_read,
cache_write,
eager_fetch,
warnings_behavior,
read_errors_counter: Arc::new(Mutex::new(BTreeMap::new())),
write_errors_counter: Arc::new(Mutex::new(BTreeMap::new())),
})
Expand Down Expand Up @@ -388,10 +384,45 @@ impl CommandRunner {
client
.update_action_result(update_action_cache_request)
.await
.map_err(crate::remote::rpcerror_to_string)?;
.map_err(status_to_str)?;

Ok(())
}

/// Records one occurrence of a remote cache failure and logs it.
///
/// Occurrences are counted per distinct error message, with separate counters for read
/// and write failures. The configured `warnings_behavior` decides whether this occurrence
/// is logged at `warn`; otherwise it is logged at `debug`.
fn log_cache_error(&self, err: String, err_type: CacheErrorType) {
  // Pick the counter and the wording for this kind of failure in one place.
  let (counter, action) = match err_type {
    CacheErrorType::ReadError => (&self.read_errors_counter, "read from"),
    CacheErrorType::WriteError => (&self.write_errors_counter, "write to"),
  };

  // Keep the lock scoped to the counter update so it is released before formatting/logging.
  let occurrences = {
    let mut counts = counter.lock();
    let seen = counts.entry(err.clone()).or_insert(0);
    *seen += 1;
    *seen
  };

  let message = format!(
    "Failed to {} remote cache ({} occurrences so far): {}",
    action, occurrences, err
  );

  let should_warn = match self.warnings_behavior {
    RemoteCacheWarningsBehavior::Ignore => false,
    RemoteCacheWarningsBehavior::FirstOnly => occurrences == 1,
    // Warn on occurrences 1, 2, 4, 8, ... so repeated errors become progressively quieter.
    RemoteCacheWarningsBehavior::Backoff => occurrences.is_power_of_two(),
  };

  if should_warn {
    log::warn!("{}", message);
  } else {
    log::debug!("{}", message);
  }
}
}

/// Which kind of remote cache operation failed. `log_cache_error` uses this to pick the
/// error counter to increment and the wording of the log message.
enum CacheErrorType {
/// A failure while reading from the remote cache.
ReadError,
/// A failure while writing to the remote cache.
WriteError,
}

#[async_trait]
Expand Down Expand Up @@ -458,21 +489,7 @@ impl crate::CommandRunner for CommandRunner {
cached_response_opt
}
Err(err) => {
let err_count = {
let mut errors_counter = self.read_errors_counter.lock();
let count = errors_counter.entry(err.clone()).or_insert(0);
*count += 1;
*count
};
let log_msg = format!(
"Failed to read from remote cache ({} occurrences so far): {}",
err_count, err
);
if err_count == 1 || err_count % LOG_ERRORS_INCREMENT == 0 {
log::warn!("{}", log_msg);
} else {
log::debug!("{}", log_msg);
}
self.log_cache_error(err, CacheErrorType::ReadError);
None
}
}
Expand Down Expand Up @@ -537,21 +554,7 @@ impl crate::CommandRunner for CommandRunner {
.workunit_store
.increment_counter(Metric::RemoteCacheWriteFinished, 1);
if let Err(err) = write_result {
let err_count = {
let mut errors_counter = command_runner.write_errors_counter.lock();
let count = errors_counter.entry(err.clone()).or_insert(0);
*count += 1;
*count
};
let log_msg = format!(
"Failed to write to remote cache ({} occurrences so far): {}",
err_count, err
);
if err_count == 1 || err_count % LOG_ERRORS_INCREMENT == 0 {
log::warn!("{}", log_msg);
} else {
log::debug!("{}", log_msg);
}
command_runner.log_cache_error(err, CacheErrorType::WriteError);
context2
.workunit_store
.increment_counter(Metric::RemoteCacheWriteErrors, 1);
Expand Down

0 comments on commit a61fce4

Please sign in to comment.