From d575c4225a88b3e821044562ea9e6193b6fe8232 Mon Sep 17 00:00:00 2001 From: David Hewitt Date: Sun, 2 Nov 2025 19:25:45 +0000 Subject: [PATCH 1/2] add `force_flush` and documentation to support running on AWS Lambda --- src/lib.rs | 4 ++ src/logfire.rs | 12 ++++ src/usage/lambda.md | 162 ++++++++++++++++++++++++++++++++++++++++++++ src/usage/mod.rs | 2 + 4 files changed, 180 insertions(+) create mode 100644 src/usage/lambda.md diff --git a/src/lib.rs b/src/lib.rs index e5474b1..d5810fe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -69,6 +69,10 @@ //! All environment variables supported by the Rust Opentelemetry SDK are also supported by the //! Logfire SDK. //! +//! ## Usage Guide +//! +//! See the [usage guide][usage] for more detailed information about how to use this SDK to its full potential. +//! //! # Examples //! //! See [examples][usage::examples] subchapter of this documentation. diff --git a/src/logfire.rs b/src/logfire.rs index e0d97ab..671ac7d 100644 --- a/src/logfire.rs +++ b/src/logfire.rs @@ -95,6 +95,18 @@ impl Logfire { } } + /// Forcibly flush the current data captured by Logfire. + /// + /// # Errors + /// + /// This will error if the underlying OpenTelemetry SDK fails to flush data. + pub fn force_flush(&self) -> Result<(), opentelemetry_sdk::error::OTelSdkError> { + self.tracer_provider.force_flush()?; + self.meter_provider.force_flush()?; + self.logger_provider.force_flush()?; + Ok(()) + } + /// Shuts down the Logfire instance. /// /// This will flush all data to the opentelemetry exporters and then close all diff --git a/src/usage/lambda.md b/src/usage/lambda.md new file mode 100644 index 0000000..6fda9b4 --- /dev/null +++ b/src/usage/lambda.md @@ -0,0 +1,162 @@ +# Instrumenting AWS Lambda with Pydantic Logfire + +When running on AWS Lambda, extra care must be taken to ensure all telemetry is successfully exported to Logfire. + +The AWS Lambda runtime will freeze lambda processes as soon as the response is delivered. +This means that background threads (such as those exporting telemetry to Logfire) will be paused. +To ensure that telemetry is exported successfully, it's necessary to flush Logfire before completing the Lambda invocation. + +The following example demonstrates using Logfire with the `lambda_runtime` crate to instrument a Lambda function. +A `tower::Layer` is used to ensure that Logfire is flushed at the end of every invocation. + +```rust,ignore +use std::{ + future::Future, + task::{Context, Poll}, +}; + +use lambda_runtime::{service_fn, Error, LambdaEvent}; +use logfire::{config::ConsoleOptions, Logfire}; +use pin_project::pin_project; +use serde::{Deserialize, Serialize}; +use tower::{Layer, Service}; + +/// A `tower::Layer` that will be used to introduce flushing to the lambda function. +pub struct LogfireFlushLayer { + logfire: Logfire, +} + +impl Layer for LogfireFlushLayer { + type Service = LogfireFlushService; + + fn layer(&self, service: S) -> Self::Service { + LogfireFlushService { + logfire: self.logfire.clone(), + service, + } + } +} + +/// A `tower::Service` which wraps an inner service to flush Logfire when the service +/// finishes executing. +pub struct LogfireFlushService { + logfire: Logfire, + service: S, +} + +impl Service for LogfireFlushService +where + S: Service, +{ + type Response = S::Response; + type Error = S::Error; + type Future = LogfireFlushFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> Self::Future { + LogfireFlushFuture { + inner: Some(self.service.call(request)), + logfire: self.logfire.clone(), + } + } +} + +/// The future produced when calling the `LogfireFlushService`. The future is +/// responsible for driving the inner future and then flushing logfire when +/// the inner future completes. +#[pin_project] +pub struct LogfireFlushFuture { + #[pin] + inner: Option, + logfire: Logfire, +} + +impl Future for LogfireFlushFuture +where + F: Future>, +{ + type Output = Result; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + let Some(inner) = this.inner.as_mut().as_pin_mut() else { + panic!("`LogfireFlushFuture` polled after completion"); + }; + match inner.poll(cx) { + Poll::Ready(result) => { + // Drop the inner future so that any spans it holds are dropped before flushing + this.inner.set(None); + // Flush logfire before returning. + // Note that this is a blocking function. In the context of the current lambda + // invocation that should not be a problem. + let _ = this.logfire.force_flush(); + Poll::Ready(result) + } + Poll::Pending => Poll::Pending, + } + } +} + +/// Example lambda request payload. +#[derive(Deserialize)] +pub(crate) struct IncomingMessage { + command: String, +} + +/// Example lambda response payload. +#[derive(Serialize)] +pub(crate) struct OutgoingMessage { + req_id: String, + msg: String, +} + +/// Main body of the lambda function. +#[tracing::instrument(skip_all)] +pub(crate) async fn function_handler( + event: LambdaEvent, +) -> Result { + + // Change this logic to be whatever your lambda function needs to do. + + Ok(OutgoingMessage { + req_id: event.context.request_id, + msg: format!("Command {}.", event.payload.command), + }) +} + +/// Main function for the lambda process. +#[tokio::main] +async fn main() -> Result<(), Error> { + // 1. Configure logfire on startup + let logfire = logfire::configure() + .with_console(Some(ConsoleOptions::default())) + .finish()?; + logfire::info!("Starting up"); + + // 2. Lambda processes require special termination logic. Logfire's + // shutdown guard can be passed into `lambda_runtime`'s graceful + // shutdown handler to ensure that telemetry is flushed when + // idle lambda processes are shutdown. + let shutdown_guard = logfire.clone().shutdown_guard(); + lambda_runtime::spawn_graceful_shutdown_handler(|| async move { + logfire::info!("Shutting down"); + let _ = shutdown_guard.shutdown(); + }) + .await; + + // 3. Prepare the main `lambda_runtime::Runtime` + lambda_runtime::Runtime::new(service_fn(function_handler)) + // 4. Add a `TracingLayer` before the logfire layer + .layer(lambda_runtime::layers::TracingLayer::new()) + // 5. Add the flushing layer after; this way the spans created + // by the `TracingLayer` will be closed before logfire is flushed. + .layer(LogfireFlushLayer { logfire }) + // 6. And finally, run the process. + .run() + .await +} + +``` diff --git a/src/usage/mod.rs b/src/usage/mod.rs index d0cb3ff..d992e26 100644 --- a/src/usage/mod.rs +++ b/src/usage/mod.rs @@ -108,4 +108,6 @@ //! See [examples] subchapter of this documentation. pub mod examples; +#[doc = include_str!("./lambda.md")] +pub mod lambda {} pub mod metrics; From b6e490ca42537d97738f61c32f3e3234bd9261a1 Mon Sep 17 00:00:00 2001 From: David Hewitt Date: Tue, 4 Nov 2025 13:00:21 +0000 Subject: [PATCH 2/2] add note on blocking --- src/logfire.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/logfire.rs b/src/logfire.rs index 671ac7d..d36da3b 100644 --- a/src/logfire.rs +++ b/src/logfire.rs @@ -97,6 +97,10 @@ impl Logfire { /// Forcibly flush the current data captured by Logfire. /// + /// Note: this will block until data is flushed. If called from an async context, + /// consider using `tokio::task::spawn_blocking` or similar to avoid blocking the + /// async runtime. + /// /// # Errors /// /// This will error if the underlying OpenTelemetry SDK fails to flush data.