feat: enable Datadog APM error tracking with a tracing layer (#1626)
oddgrd committed Feb 19, 2024
1 parent 6e135a0 commit c5f2caf
Showing 24 changed files with 396 additions and 108 deletions.
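The change that repeats across most of these files is how errors are recorded: instead of formatting them into the message with `%` or `?`, call sites now record them as `&dyn std::error::Error` values, which is what lets the new ErrorTracingLayer (added in otlp_tracing_bridge.rs below) see the structured error and walk its source chain. A minimal sketch of that pattern, with a made-up SyncError type standing in for the crate's real error types:

use std::fmt;

// Hypothetical error type with a source, only to illustrate the recording pattern.
#[derive(Debug)]
struct SyncError {
    source: std::io::Error,
}

impl fmt::Display for SyncError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "failed syncing account")
    }
}

impl std::error::Error for SyncError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(&self.source)
    }
}

fn log_sync_failure(err: SyncError) {
    // Coercing to `&dyn std::error::Error` makes `tracing` call `Visit::record_error`,
    // so a layer can read the error and its source chain, whereas `error = %err`
    // would only capture the `Display` string.
    tracing::error!(error = &err as &dyn std::error::Error, "failed syncing account");
}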
2 changes: 1 addition & 1 deletion auth/src/api/builder.rs
@@ -80,7 +80,7 @@ impl ApiBuilder {
request_span!(
request,
request.params.account_name = field::Empty,
request.params.account_tier = field::Empty
request.params.account_tier = field::Empty,
)
})
.with_propagation()
6 changes: 4 additions & 2 deletions auth/src/error.rs
@@ -48,8 +48,10 @@ impl IntoResponse for Error {
}
_ => {
// We only want to emit error events for internal errors, not e.g. 404s.
tracing::error!(error = %self, "control plane request error");

tracing::error!(
error = &self as &(dyn std::error::Error),
"control plane request error"
);
StatusCode::INTERNAL_SERVER_ERROR
}
};
5 changes: 4 additions & 1 deletion auth/src/user.rs
@@ -112,7 +112,10 @@ impl UserManagement for UserManager {

// Sync the user tier based on the subscription validity, if any.
if let Err(err) = user.sync_tier(self).await {
error!("failed syncing account");
error!(
error = &err as &dyn std::error::Error,
"failed syncing account"
);
return Err(err);
} else {
debug!("synced account");
3 changes: 2 additions & 1 deletion builder/src/lib.rs
@@ -89,7 +89,8 @@ impl Service {
Err(err) => {
error!(
deployment_id,
"unexpected stdout/stderr stream close: {}", err
error = &err as &dyn std::error::Error,
"unexpected stdout/stderr stream close"
);
}
}
4 changes: 3 additions & 1 deletion common/src/backends/metrics.rs
@@ -163,7 +163,9 @@ pub struct OnResponseStatusCode;

impl tower_http::trace::OnResponse<BoxBody> for OnResponseStatusCode {
fn on_response(self, response: &Response<BoxBody>, latency: Duration, span: &Span) {
span.record("http.status_code", response.status().as_u16());
// We use set_attribute here because span.record would overwrite the error.message field in
// Datadog.
span.set_attribute("http.status_code", response.status().as_u16() as i64);
debug!(
latency = format_args!("{} ns", latency.as_nanos()),
"finished processing request"
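A side note on the `set_attribute` call above, offered as my reading rather than anything stated in the diff: it appears to be the extension method from `tracing_opentelemetry::OpenTelemetrySpanExt`, which writes directly into the span's OpenTelemetry attributes instead of going through `tracing`'s field recording the way `span.record` does — hence the comment about not overwriting the `error.message` field. Roughly:

use tracing::Span;
// Assumes the extension trait is in scope; metrics.rs presumably imports it elsewhere.
use tracing_opentelemetry::OpenTelemetrySpanExt;

fn record_status(span: &Span, status: u16) {
    // Adds an attribute on the underlying OTel span data; `tracing`'s recorded
    // fields (e.g. `error.message`) are left untouched.
    span.set_attribute("http.status_code", status as i64);
}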
104 changes: 103 additions & 1 deletion common/src/backends/otlp_tracing_bridge.rs
@@ -6,9 +6,10 @@
use opentelemetry::{
logs::{LogRecord, Logger, LoggerProvider, Severity, TraceContext},
trace::{SpanContext, TraceFlags, TraceState},
KeyValue,
};
use std::borrow::Cow;
use tracing_core::{Level, Subscriber};
use tracing_core::{field::Visit, Field, Level, Metadata, Subscriber};
use tracing_opentelemetry::OtelData;
use tracing_subscriber::{registry::LookupSpan, Layer};

Expand Down Expand Up @@ -152,3 +153,104 @@ const fn severity_of_level(level: &Level) -> Severity {
Level::ERROR => Severity::Error,
}
}

pub struct ErrorTracingLayer<S> {
_registry: std::marker::PhantomData<S>,
}

impl<S> ErrorTracingLayer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
pub fn new() -> Self {
ErrorTracingLayer {
_registry: std::marker::PhantomData,
}
}
}

impl<S> Layer<S> for ErrorTracingLayer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
fn on_event(
&self,
event: &tracing_core::Event<'_>,
ctx: tracing_subscriber::layer::Context<'_, S>,
) {
// We only care about error events.
if !ErrorVisitor::is_valid(event.metadata()) {
return;
}

let mut visitor = ErrorVisitor::default();
event.record(&mut visitor);

let DatadogError {
message,
r#type,
stack,
} = visitor.error;

if let Some(span) = ctx.lookup_current() {
if let Some(otel_data) = span.extensions_mut().get_mut::<OtelData>() {
let error_fields = [
KeyValue::new("error.message", message),
KeyValue::new("error.type", r#type),
KeyValue::new("error.stack", stack),
];
let builder_attrs = otel_data
.builder
.attributes
.get_or_insert(Vec::with_capacity(3));
builder_attrs.extend(error_fields);
}
};
}
}

#[derive(Default, Debug)]
pub struct DatadogError {
pub message: String,
pub r#type: String,
pub stack: String,
}
#[derive(Default)]
struct ErrorVisitor {
error: DatadogError,
}

impl ErrorVisitor {
/// We only care about error events.
fn is_valid(metadata: &Metadata) -> bool {
metadata.is_event() && metadata.level() == &Level::ERROR
}
}

impl Visit for ErrorVisitor {
fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {
// This visitor is only concerned with recording errors, do nothing for debug fields.
}
fn record_error(&mut self, _field: &Field, value: &(dyn std::error::Error + 'static)) {
// Create an error source chain, including the top-level error.
let source_chain = {
// Datadog expects there to be at least two lines in the stack field for the apm error
// tracking feature to work, so we ensure there always is.
let mut chain: String = format!("Error source chain:\n{}", value);
let mut next_err = value.source();

while let Some(err) = next_err {
chain.push_str(&format!("\n{}", err));
next_err = err.source();
}

chain
};

let error_msg = value.to_string();

self.error.message = error_msg;
self.error.r#type = "Error".to_string();
self.error.stack = source_chain;
}
}
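To make the new layer's output concrete: for an error with one source, `ErrorVisitor::record_error` above sets `error.message` to the top-level `Display` string and `error.stack` to a multi-line chain starting with "Error source chain:". A small self-contained sketch (the `LoadError` and `Io` types are invented for illustration) that reproduces the same chain-walking logic:

use std::{error::Error, fmt};

// Hypothetical two-level error chain, for illustration only.
#[derive(Debug)]
struct Io(&'static str);

impl fmt::Display for Io {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "io error: {}", self.0)
    }
}
impl Error for Io {}

#[derive(Debug)]
struct LoadError {
    source: Io,
}

impl fmt::Display for LoadError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "failed to load service")
    }
}
impl Error for LoadError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&self.source)
    }
}

fn main() {
    let err = LoadError { source: Io("connection reset") };

    // Walk the chain the same way `ErrorVisitor::record_error` does.
    let mut stack = format!("Error source chain:\n{}", err);
    let mut next = err.source();
    while let Some(e) = next {
        stack.push_str(&format!("\n{}", e));
        next = e.source();
    }

    // error.message would be "failed to load service"; error.stack would be:
    //   Error source chain:
    //   failed to load service
    //   io error: connection reset
    assert_eq!(
        stack,
        "Error source chain:\nfailed to load service\nio error: connection reset"
    );
}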
7 changes: 5 additions & 2 deletions common/src/backends/trace.rs
@@ -19,7 +19,7 @@ use tracing_subscriber::{fmt, prelude::*, registry::LookupSpan, EnvFilter};

use crate::log::Backend;

use super::otlp_tracing_bridge;
use super::otlp_tracing_bridge::{self, ErrorTracingLayer};

const OTLP_ADDRESS: &str = "http://otel-collector:4317";

@@ -80,8 +80,11 @@
subscriber
.with(filter_layer)
.with(fmt_layer)
.with(otel_layer)
.with(appender_tracing_layer)
.with(otel_layer)
// The error layer needs to go after the otel_layer, because it needs access to the
// otel_data extension that is set on the span in the otel_layer.
.with(ErrorTracingLayer::new())
.init();
}

28 changes: 18 additions & 10 deletions deployer/src/deployment/queue.rs
@@ -96,7 +96,11 @@ pub async fn task(
let response = inner.into_inner();
info!(id = %queued.id, "shuttle-builder finished building the deployment: image length is {} bytes, is_wasm flag is {} and there are {} secrets", response.image.len(), response.is_wasm, response.secrets.len());
},
Err(err) => error!(id = %queued.id, "shuttle-builder errored while building: {}", err)
Err(err) => error!(
id = %queued.id,
error = &err as &dyn std::error::Error,
"shuttle-builder errored while building"
)
};
});
}
@@ -126,7 +130,7 @@ pub async fn task(
Some(res) = tasks.join_next() => {
match res {
Ok(_) => (),
Err(err) => error!(error = %err, "an error happened while joining a builder task"),
Err(err) => error!(error = &err as &dyn std::error::Error, "an error happened while joining a builder task"),
}
}
else => break
@@ -395,20 +399,24 @@ async fn run_pre_deploy_tests(
tokio::spawn(async move {
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await.unwrap() {
let _ = tx
.send(line)
.await
.map_err(|e| error!(error = %e, "failed to send line"));
let _ = tx.send(line).await.map_err(|err| {
error!(
error = &err as &dyn std::error::Error,
"failed to send line"
)
});
}
});
let reader = tokio::io::BufReader::new(handle.stderr.take().unwrap());
tokio::spawn(async move {
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await.unwrap() {
let _ = tx2
.send(line)
.await
.map_err(|e| error!(error = %e, "failed to send line"));
let _ = tx2.send(line).await.map_err(|err| {
error!(
error = &err as &dyn std::error::Error,
"failed to send line"
)
});
}
});
let status = handle.wait().await.map_err(TestError::Run)?;
41 changes: 30 additions & 11 deletions deployer/src/deployment/run.rs
@@ -129,7 +129,10 @@ pub async fn task(
match res {
Ok(_) => (),
Err(err) => {
error!(error = %err, "an error happened while joining a deployment run task")
error!(
error = &err as &dyn std::error::Error,
"an error happened while joining a deployment run task"
)
}
}

@@ -318,7 +321,10 @@ async fn load(
.filter_map(|resource| {
resource
.map_err(|err| {
error!(error = ?err, "failed to parse resource data");
error!(
error = err.as_ref() as &dyn std::error::Error,
"failed to parse resource data"
);
})
.ok()
})
@@ -333,7 +339,10 @@
secrets = combined;
}
Err(err) => {
error!(error = ?err, "failed to parse old secrets data");
error!(
error = &err as &dyn std::error::Error,
"failed to parse old secrets data"
);
}
}
}
@@ -387,13 +396,21 @@ async fn load(
if response.success {
Ok(())
} else {
error!(error = %response.message, "failed to load service");
Err(Error::Load(response.message))
let error = Error::Load(response.message);
error!(
error = &error as &dyn std::error::Error,
"failed to load service"
);
Err(error)
}
}
Err(error) => {
error!(%error, "failed to load service");
Err(Error::Load(error.to_string()))
let error = Error::Load(error.to_string());
error!(
error = &error as &dyn std::error::Error,
"failed to load service"
);
Err(error)
}
}
}
@@ -451,11 +468,13 @@ async fn run(
}));
}
Err(ref status) => {
error!(%status, "failed to start service");
start_crashed_cleanup(
&id,
Error::Start("runtime failed to start deployment".to_string()),
let error = Error::Start("runtime failed to start deployment".to_string());
error!(
%status,
error = &error as &dyn std::error::Error,
"failed to start service"
);
start_crashed_cleanup(&id, error);
}
}
}
12 changes: 9 additions & 3 deletions deployer/src/handlers/mod.rs
@@ -235,7 +235,10 @@ pub async fn get_service_resources(
.filter_map(|resource| {
resource
.map_err(|err| {
error!(error = ?err, "failed to parse resource data");
error!(
error = err.as_ref() as &dyn std::error::Error,
"failed to parse resource data"
);
})
.ok()
})
@@ -477,7 +480,10 @@ pub async fn get_logs(
.collect(),
)),
Err(error) => {
error!(error = %error, "failed to retrieve logs for deployment");
error!(
error = &error as &dyn std::error::Error,
"failed to retrieve logs for deployment"
);
Err(anyhow!("failed to retrieve logs for deployment").into())
}
}
@@ -600,7 +606,7 @@ where
state: &S,
) -> std::result::Result<Self, Self::Rejection> {
let bytes = Bytes::from_request(req, state).await.map_err(|_| {
error!("failed to collect body bytes, is the body too large?");
info!("failed to collect body bytes, is the body too large?");
StatusCode::PAYLOAD_TOO_LARGE
})?;

5 changes: 4 additions & 1 deletion deployer/src/persistence/deployment.rs
@@ -30,7 +30,10 @@ impl FromRow<'_, SqliteRow> for Deployment {
match SocketAddr::from_str(&address_str) {
Ok(address) => Some(address),
Err(err) => {
error!(error = %err, "failed to parse address from DB");
error!(
error = &err as &dyn std::error::Error,
"failed to parse address from DB"
);
None
}
}
6 changes: 4 additions & 2 deletions deployer/src/runtime_manager.rs
@@ -150,8 +150,10 @@ impl RuntimeManager {
match process.start_kill() {
Ok(_) => info!(deployment_id = %id, "initiated runtime process killing"),
Err(err) => error!(
deployment_id = %id, "failed to start the killing of the runtime: {}",
err
deployment_id = %id,
error = &err as &dyn std::error::Error,
"failed to start the killing of the runtime",

),
}
}