Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable Datadog APM error tracking with a tracing layer #1626

Merged
merged 18 commits into from Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion auth/src/api/builder.rs
Expand Up @@ -80,7 +80,7 @@ impl ApiBuilder {
request_span!(
request,
request.params.account_name = field::Empty,
request.params.account_tier = field::Empty
request.params.account_tier = field::Empty,
)
})
.with_propagation()
Expand Down
6 changes: 4 additions & 2 deletions auth/src/error.rs
Expand Up @@ -48,8 +48,10 @@ impl IntoResponse for Error {
}
_ => {
// We only want to emit error events for internal errors, not e.g. 404s.
tracing::error!(error = %self, "control plane request error");

tracing::error!(
error = &self as &(dyn std::error::Error),
"control plane request error"
);
StatusCode::INTERNAL_SERVER_ERROR
}
};
Expand Down
5 changes: 4 additions & 1 deletion auth/src/user.rs
Expand Up @@ -112,7 +112,10 @@ impl UserManagement for UserManager {

// Sync the user tier based on the subscription validity, if any.
if let Err(err) = user.sync_tier(self).await {
error!("failed syncing account");
error!(
error = &err as &dyn std::error::Error,
"failed syncing account"
);
return Err(err);
} else {
debug!("synced account");
Expand Down
3 changes: 2 additions & 1 deletion builder/src/lib.rs
Expand Up @@ -89,7 +89,8 @@ impl Service {
Err(err) => {
error!(
deployment_id,
"unexpected stdout/stderr stream close: {}", err
error = &err as &dyn std::error::Error,
"unexpected stdout/stderr stream close"
);
}
}
Expand Down
4 changes: 3 additions & 1 deletion common/src/backends/metrics.rs
Expand Up @@ -163,7 +163,9 @@ pub struct OnResponseStatusCode;

impl tower_http::trace::OnResponse<BoxBody> for OnResponseStatusCode {
fn on_response(self, response: &Response<BoxBody>, latency: Duration, span: &Span) {
span.record("http.status_code", response.status().as_u16());
// We use set_attribute here because span.record would overwrite the error.message field in
// Datadog.
span.set_attribute("http.status_code", response.status().as_u16() as i64);
debug!(
latency = format_args!("{} ns", latency.as_nanos()),
"finished processing request"
Expand Down
104 changes: 103 additions & 1 deletion common/src/backends/otlp_tracing_bridge.rs
Expand Up @@ -6,9 +6,10 @@
use opentelemetry::{
logs::{LogRecord, Logger, LoggerProvider, Severity, TraceContext},
trace::{SpanContext, TraceFlags, TraceState},
KeyValue,
};
use std::borrow::Cow;
use tracing_core::{Level, Subscriber};
use tracing_core::{field::Visit, Field, Level, Metadata, Subscriber};
use tracing_opentelemetry::OtelData;
use tracing_subscriber::{registry::LookupSpan, Layer};

Expand Down Expand Up @@ -152,3 +153,104 @@ const fn severity_of_level(level: &Level) -> Severity {
Level::ERROR => Severity::Error,
}
}

pub struct ErrorTracingLayer<S> {
_registry: std::marker::PhantomData<S>,
}

impl<S> ErrorTracingLayer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
pub fn new() -> Self {
ErrorTracingLayer {
_registry: std::marker::PhantomData,
}
}
}

impl<S> Layer<S> for ErrorTracingLayer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
fn on_event(
&self,
event: &tracing_core::Event<'_>,
ctx: tracing_subscriber::layer::Context<'_, S>,
) {
// We only care about error events.
if !ErrorVisitor::is_valid(event.metadata()) {
return;
}

let mut visitor = ErrorVisitor::default();
event.record(&mut visitor);

let DatadogError {
message,
r#type,
stack,
} = visitor.error;

if let Some(span) = ctx.lookup_current() {
if let Some(otel_data) = span.extensions_mut().get_mut::<OtelData>() {
oddgrd marked this conversation as resolved.
Show resolved Hide resolved
let error_fields = [
KeyValue::new("error.message", message),
KeyValue::new("error.type", r#type),
KeyValue::new("error.stack", stack),
];
let builder_attrs = otel_data
.builder
.attributes
.get_or_insert(Vec::with_capacity(3));
builder_attrs.extend(error_fields);
iulianbarbu marked this conversation as resolved.
Show resolved Hide resolved
}
};
}
}

#[derive(Default, Debug)]
pub struct DatadogError {
pub message: String,
pub r#type: String,
pub stack: String,
}
#[derive(Default)]
struct ErrorVisitor {
error: DatadogError,
}

impl ErrorVisitor {
/// We only care about error events.
fn is_valid(metadata: &Metadata) -> bool {
metadata.is_event() && metadata.level() == &Level::ERROR
}
}

impl Visit for ErrorVisitor {
fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {
// This visitor is only concerned with recording errors, do nothing for debug fields.
}
fn record_error(&mut self, _field: &Field, value: &(dyn std::error::Error + 'static)) {
// Create an error source chain, including the top-level error.
chesedo marked this conversation as resolved.
Show resolved Hide resolved
let source_chain = {
// Datadog expects there to be at least two lines in the stack field for the apm error
// tracking feature to work, so we ensure there always is.
let mut chain: String = format!("Error source chain:\n{}", value);
let mut next_err = value.source();

while let Some(err) = next_err {
chain.push_str(&format!("\n{}", err));
next_err = err.source();
}

chain
};

let error_msg = value.to_string();

self.error.message = error_msg;
self.error.r#type = "Error".to_string();
self.error.stack = source_chain;
}
}
7 changes: 5 additions & 2 deletions common/src/backends/trace.rs
Expand Up @@ -19,7 +19,7 @@ use tracing_subscriber::{fmt, prelude::*, registry::LookupSpan, EnvFilter};

use crate::log::Backend;

use super::otlp_tracing_bridge;
use super::otlp_tracing_bridge::{self, ErrorTracingLayer};

const OTLP_ADDRESS: &str = "http://otel-collector:4317";

Expand Down Expand Up @@ -80,8 +80,11 @@ where
subscriber
.with(filter_layer)
.with(fmt_layer)
.with(otel_layer)
.with(appender_tracing_layer)
.with(otel_layer)
// The error layer needs to go after the otel_layer, because it needs access to the
// otel_data extension that is set on the span in the otel_layer.
.with(ErrorTracingLayer::new())
iulianbarbu marked this conversation as resolved.
Show resolved Hide resolved
.init();
}

Expand Down
28 changes: 18 additions & 10 deletions deployer/src/deployment/queue.rs
Expand Up @@ -96,7 +96,11 @@ pub async fn task(
let response = inner.into_inner();
info!(id = %queued.id, "shuttle-builder finished building the deployment: image length is {} bytes, is_wasm flag is {} and there are {} secrets", response.image.len(), response.is_wasm, response.secrets.len());
},
Err(err) => error!(id = %queued.id, "shuttle-builder errored while building: {}", err)
Err(err) => error!(
id = %queued.id,
error = &err as &dyn std::error::Error,
"shuttle-builder errored while building"
)
};
});
}
Expand Down Expand Up @@ -126,7 +130,7 @@ pub async fn task(
Some(res) = tasks.join_next() => {
match res {
Ok(_) => (),
Err(err) => error!(error = %err, "an error happened while joining a builder task"),
Err(err) => error!(error = &err as &dyn std::error::Error, "an error happened while joining a builder task"),
}
}
else => break
Expand Down Expand Up @@ -395,20 +399,24 @@ async fn run_pre_deploy_tests(
tokio::spawn(async move {
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await.unwrap() {
let _ = tx
.send(line)
.await
.map_err(|e| error!(error = %e, "failed to send line"));
let _ = tx.send(line).await.map_err(|err| {
error!(
error = &err as &dyn std::error::Error,
"failed to send line"
)
});
}
});
let reader = tokio::io::BufReader::new(handle.stderr.take().unwrap());
tokio::spawn(async move {
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await.unwrap() {
let _ = tx2
.send(line)
.await
.map_err(|e| error!(error = %e, "failed to send line"));
let _ = tx2.send(line).await.map_err(|err| {
error!(
error = &err as &dyn std::error::Error,
"failed to send line"
)
});
}
});
let status = handle.wait().await.map_err(TestError::Run)?;
Expand Down
41 changes: 30 additions & 11 deletions deployer/src/deployment/run.rs
Expand Up @@ -129,7 +129,10 @@ pub async fn task(
match res {
Ok(_) => (),
Err(err) => {
error!(error = %err, "an error happened while joining a deployment run task")
error!(
error = &err as &dyn std::error::Error,
"an error happened while joining a deployment run task"
)
}
}

Expand Down Expand Up @@ -318,7 +321,10 @@ async fn load(
.filter_map(|resource| {
resource
.map_err(|err| {
error!(error = ?err, "failed to parse resource data");
error!(
error = err.as_ref() as &dyn std::error::Error,
"failed to parse resource data"
);
})
.ok()
})
Expand All @@ -333,7 +339,10 @@ async fn load(
secrets = combined;
}
Err(err) => {
error!(error = ?err, "failed to parse old secrets data");
error!(
error = &err as &dyn std::error::Error,
"failed to parse old secrets data"
);
}
}
}
Expand Down Expand Up @@ -387,13 +396,21 @@ async fn load(
if response.success {
Ok(())
} else {
error!(error = %response.message, "failed to load service");
Err(Error::Load(response.message))
let error = Error::Load(response.message);
error!(
error = &error as &dyn std::error::Error,
"failed to load service"
);
Err(error)
}
}
Err(error) => {
error!(%error, "failed to load service");
Err(Error::Load(error.to_string()))
let error = Error::Load(error.to_string());
error!(
error = &error as &dyn std::error::Error,
"failed to load service"
);
Err(error)
}
}
}
Expand Down Expand Up @@ -451,11 +468,13 @@ async fn run(
}));
}
Err(ref status) => {
error!(%status, "failed to start service");
start_crashed_cleanup(
&id,
Error::Start("runtime failed to start deployment".to_string()),
let error = Error::Start("runtime failed to start deployment".to_string());
error!(
%status,
error = &error as &dyn std::error::Error,
"failed to start service"
);
start_crashed_cleanup(&id, error);
}
}
}
12 changes: 9 additions & 3 deletions deployer/src/handlers/mod.rs
Expand Up @@ -235,7 +235,10 @@ pub async fn get_service_resources(
.filter_map(|resource| {
resource
.map_err(|err| {
error!(error = ?err, "failed to parse resource data");
error!(
error = err.as_ref() as &dyn std::error::Error,
"failed to parse resource data"
);
})
.ok()
})
Expand Down Expand Up @@ -477,7 +480,10 @@ pub async fn get_logs(
.collect(),
)),
Err(error) => {
error!(error = %error, "failed to retrieve logs for deployment");
error!(
error = &error as &dyn std::error::Error,
"failed to retrieve logs for deployment"
);
Err(anyhow!("failed to retrieve logs for deployment").into())
}
}
Expand Down Expand Up @@ -600,7 +606,7 @@ where
state: &S,
) -> std::result::Result<Self, Self::Rejection> {
let bytes = Bytes::from_request(req, state).await.map_err(|_| {
error!("failed to collect body bytes, is the body too large?");
info!("failed to collect body bytes, is the body too large?");
StatusCode::PAYLOAD_TOO_LARGE
})?;

Expand Down
5 changes: 4 additions & 1 deletion deployer/src/persistence/deployment.rs
Expand Up @@ -30,7 +30,10 @@ impl FromRow<'_, SqliteRow> for Deployment {
match SocketAddr::from_str(&address_str) {
Ok(address) => Some(address),
Err(err) => {
error!(error = %err, "failed to parse address from DB");
error!(
error = &err as &dyn std::error::Error,
"failed to parse address from DB"
);
None
}
}
Expand Down
6 changes: 4 additions & 2 deletions deployer/src/runtime_manager.rs
Expand Up @@ -150,8 +150,10 @@ impl RuntimeManager {
match process.start_kill() {
Ok(_) => info!(deployment_id = %id, "initiated runtime process killing"),
Err(err) => error!(
deployment_id = %id, "failed to start the killing of the runtime: {}",
err
deployment_id = %id,
error = &err as &dyn std::error::Error,
"failed to start the killing of the runtime",

),
}
}
Expand Down