Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions sqlx-core/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ pub struct QueryLogger {

impl QueryLogger {
pub fn new(sql: SqlStr, settings: LogSettings) -> Self {
Self::new_under_span(sql, settings, None)
}

/// Like [`new`](Self::new) but creates the query span as a child of
/// `parent_span` instead of [`tracing::Span::current`].
///
/// Backends use this to attribute queries running inside a transaction to that
/// transaction's span — the parent comes from a per-connection tx span stack
/// maintained by [`TransactionManager`](crate::transaction::TransactionManager).
/// Passing `None` falls back to the contextual current span, matching `new`.
pub fn new_under_span(sql: SqlStr, settings: LogSettings, parent_span: Option<Span>) -> Self {
// Hardcoded INFO level per maintainer review of #3313: libraries should pick a
// level and let consumers filter via `EnvFilter`. Field names follow the OTel
// database span semantic conventions
Expand All @@ -85,6 +96,7 @@ impl QueryLogger {
// to set the exported `SpanKind`. `db.system.name` is declared empty here and
// filled in by drivers via `with_db_system_name`, so adding the field doesn't
// force a signature break on `QueryLogger::new`.
let parent = parent_span.unwrap_or_else(Span::current);
let summary = parse_query_summary(sql.as_str());
let operation = summary
.split_whitespace()
Expand All @@ -93,6 +105,7 @@ impl QueryLogger {
.unwrap_or_default();
let span = tracing::info_span!(
target: "sqlx::query",
parent: &parent,
"db.query",
"db.system.name" = tracing::field::Empty,
"db.operation.name" = operation,
Expand Down
141 changes: 141 additions & 0 deletions sqlx-core/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,59 @@ pub trait TransactionManager {
/// - Level 1: A transaction is active.
/// - Level 2 or higher: A transaction is active and one or more SAVEPOINTs have been created within it.
fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize;

/// Records the outermost transaction's tracing span on the connection,
/// together with the caller's `Span::current()` at begin time. Called by
/// [`Transaction`] only when opening the outermost transaction (not for
/// nested savepoints, which share the outer span).
///
/// The `parent_at_begin` id drives the user-injected-span heuristic: if
/// `Span::current()` at query time still matches it, the caller hasn't
/// entered their own span between begin and the query, so we auto-parent
/// under the tx span. If they differ, we respect the caller's current.
///
/// Default impl is a no-op. In-tree backends override.
fn set_transaction_span(
_conn: &mut <Self::Database as Database>::Connection,
_span: tracing::Span,
_parent_at_begin: Option<tracing::Id>,
) {
}

/// Clears the connection's transaction span. Called by [`Transaction`]
/// only when the outermost transaction commits, rolls back, or is dropped.
///
/// Default impl is a no-op; in-tree backends clear their `Option`.
fn clear_transaction_span(_conn: &mut <Self::Database as Database>::Connection) {}

/// Returns the connection's currently open transaction span, if any.
/// Used by [`Transaction::begin`] so savepoint transactions can share the
/// outermost transaction's span.
///
/// Default impl returns `None`; in-tree backends look at their `Option`.
fn current_transaction_span(
_conn: &<Self::Database as Database>::Connection,
) -> Option<tracing::Span> {
None
}

/// Returns the span the executor should use as the parent for the next
/// query span on this connection, or `None` to fall back to
/// `Span::current()`.
///
/// In-tree backends compare `Span::current()` to the stored
/// `parent_at_begin`: if equal, the caller hasn't entered any span since
/// `begin`, so the query parents under the tx span; if different, the
/// caller has wrapped this query in their own span, so we respect that
/// and return `None`.
///
/// Default impl returns `None`, preserving pre-tracing behavior for
/// out-of-tree drivers.
fn query_parent_span(
_conn: &<Self::Database as Database>::Connection,
) -> Option<tracing::Span> {
None
}
}

/// An in-progress database transaction or savepoint.
Expand Down Expand Up @@ -89,6 +142,12 @@ where
{
connection: MaybePoolConnection<'c, DB>,
open: bool,
span: tracing::Span,
/// True for the outermost transaction, false for nested savepoints. The
/// outermost owns the span: it sets it on `begin`, records the outcome,
/// and clears it on commit/rollback/drop. Savepoints share the same span
/// and don't touch the connection's tracing state.
is_outermost: bool,
}

impl<'c, DB> Transaction<'c, DB>
Expand All @@ -103,23 +162,93 @@ where
let conn = conn.into();

Box::pin(async move {
// Hardcoded INFO level per maintainer review of #3313. Field names follow the
// OTel database span semantic conventions. `db.transaction.outcome` starts as
// `Empty` and gets recorded on commit/rollback/drop so the span carries its
// resolution.
//
// The outermost transaction owns the span; nested savepoints share it, so a
// sequence of BEGIN / SAVEPOINT / queries / RELEASE / COMMIT shows up as a
// single `db.transaction` span with the per-statement query spans as
// children. Per-savepoint duration isn't surfaced — the SAVEPOINT / RELEASE
// query spans themselves serve as the markers.
let depth_before = DB::TransactionManager::get_transaction_depth(&conn);
let is_outermost = depth_before == 0;
let span = if is_outermost {
tracing::info_span!(
target: "sqlx::transaction",
parent: &tracing::Span::current(),
"db.transaction",
"db.system.name" = %DB::NAME.to_ascii_lowercase(),
"db.operation.name" = "BEGIN",
"db.transaction.outcome" = tracing::field::Empty,
"otel.kind" = "client",
)
} else {
// Reuse the outermost transaction's span so nested savepoint queries
// still parent under it.
DB::TransactionManager::current_transaction_span(&conn).unwrap_or_else(
// Defensive: out-of-tree drivers that don't override the trait
// methods can land here. Fall back to a fresh detached span — no
// worse than the pre-tracing-spans behavior.
tracing::Span::current,
)
};

let mut tx = Self {
connection: conn,

// If the call to `begin` fails or doesn't complete we want to attempt a rollback in case the transaction was started.
open: true,
span: span.clone(),
is_outermost,
};

// Only the outermost transaction installs the span on the connection;
// savepoints inherit it. Set before BEGIN so the BEGIN's query span
// parents under it via the executor's `query_parent_span` lookup. If
// BEGIN fails, the Drop handler clears it back out (`open: true`).
//
// `parent_at_begin` lets later `query_parent_span` calls distinguish
// "caller is still at the same level as begin" (auto-parent) from
// "caller has entered their own span in between" (respect their
// current).
if is_outermost {
let parent_at_begin = tracing::Span::current().id();
DB::TransactionManager::set_transaction_span(
&mut tx.connection,
span,
parent_at_begin,
);
}

DB::TransactionManager::begin(&mut tx.connection, statement).await?;

Ok(tx)
})
}

/// Returns a handle to the transaction's tracing span.
///
/// Useful when callers want to manually parent their own spans under the
/// transaction — e.g. wrapping a block of procedural work in
/// `something.instrument(tx.span())` so user spans inside become children
/// of the transaction. The returned `Span` is cheap to clone and `Send`.
pub fn span(&self) -> tracing::Span {
self.span.clone()
}

/// Commits this transaction or savepoint.
pub async fn commit(mut self) -> Result<(), Error> {
// The span stays on the connection across the COMMIT call so the executor
// parents the COMMIT's query span under it; cleared only after success, and
// only for the outermost transaction (savepoints don't own the span).
DB::TransactionManager::commit(&mut self.connection).await?;
self.open = false;
if self.is_outermost {
self.span.record("db.transaction.outcome", "committed");
DB::TransactionManager::clear_transaction_span(&mut self.connection);
}

Ok(())
}
Expand All @@ -128,6 +257,10 @@ where
pub async fn rollback(mut self) -> Result<(), Error> {
DB::TransactionManager::rollback(&mut self.connection).await?;
self.open = false;
if self.is_outermost {
self.span.record("db.transaction.outcome", "rolled_back");
DB::TransactionManager::clear_transaction_span(&mut self.connection);
}

Ok(())
}
Expand Down Expand Up @@ -275,6 +408,14 @@ where
// connection (including if the connection is returned to a pool)

DB::TransactionManager::start_rollback(&mut self.connection);

// Only the outermost transaction owns the span and the connection's tracing
// state. Clear after start_rollback so the span is still installed for the
// duration of the operation, mirroring commit/rollback.
if self.is_outermost {
self.span.record("db.transaction.outcome", "dropped");
DB::TransactionManager::clear_transaction_span(&mut self.connection);
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions sqlx-mysql/src/connection/establish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ impl MySqlConnection {
inner: Box::new(MySqlConnectionInner {
stream,
transaction_depth: 0,
transaction_span: None,
status_flags: Default::default(),
cache_statement: StatementCache::new(options.statement_cache_capacity),
log_settings: options.log_settings.clone(),
Expand Down
9 changes: 6 additions & 3 deletions sqlx-mysql/src/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ use crate::protocol::statement::{
};
use crate::protocol::text::{ColumnDefinition, Query, TextRow};
use crate::statement::{MySqlStatement, MySqlStatementMetadata};
use crate::transaction::TransactionManager;
use crate::HashMap;
use crate::{
MySql, MySqlArguments, MySqlColumn, MySqlConnection, MySqlQueryResult, MySqlRow, MySqlTypeInfo,
MySqlValueFormat,
MySql, MySqlArguments, MySqlColumn, MySqlConnection, MySqlQueryResult, MySqlRow,
MySqlTransactionManager, MySqlTypeInfo, MySqlValueFormat,
};
use either::Either;
use futures_core::future::BoxFuture;
Expand Down Expand Up @@ -107,8 +108,10 @@ impl MySqlConnection {
persistent: bool,
) -> Result<impl Stream<Item = Result<Either<MySqlQueryResult, MySqlRow>, Error>> + 'e, Error>
{
let parent_span = MySqlTransactionManager::query_parent_span(self);
let mut logger =
QueryLogger::new(sql, self.inner.log_settings.clone()).with_db_system_name("mysql");
QueryLogger::new_under_span(sql, self.inner.log_settings.clone(), parent_span)
.with_db_system_name("mysql");
let span = logger.span();

self.inner.stream.wait_until_ready().await?;
Expand Down
1 change: 1 addition & 0 deletions sqlx-mysql/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub(crate) struct MySqlConnectionInner {

// transaction status
pub(crate) transaction_depth: usize,
pub(crate) transaction_span: Option<(tracing::Span, Option<tracing::Id>)>,
status_flags: Status,

// cache by query string to the statement id and metadata
Expand Down
28 changes: 28 additions & 0 deletions sqlx-mysql/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,32 @@ impl TransactionManager for MySqlTransactionManager {
fn get_transaction_depth(conn: &MySqlConnection) -> usize {
conn.inner.transaction_depth
}

fn set_transaction_span(
conn: &mut MySqlConnection,
span: tracing::Span,
parent_at_begin: Option<tracing::Id>,
) {
conn.inner.transaction_span = Some((span, parent_at_begin));
}

fn clear_transaction_span(conn: &mut MySqlConnection) {
conn.inner.transaction_span = None;
}

fn current_transaction_span(conn: &MySqlConnection) -> Option<tracing::Span> {
conn.inner
.transaction_span
.as_ref()
.map(|(span, _)| span.clone())
}

fn query_parent_span(conn: &MySqlConnection) -> Option<tracing::Span> {
let (tx_span, parent_at_begin) = conn.inner.transaction_span.as_ref()?;
if tracing::Span::current().id() == *parent_at_begin {
Some(tx_span.clone())
} else {
None
}
}
}
1 change: 1 addition & 0 deletions sqlx-postgres/src/connection/establish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ impl PgConnection {
secret_key,
transaction_status,
transaction_depth: 0,
transaction_span: None,
pending_ready_for_query_count: 0,
next_statement_id: StatementId::NAMED_START,
cache_statement: StatementCache::new(options.statement_cache_capacity),
Expand Down
11 changes: 7 additions & 4 deletions sqlx-postgres/src/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ use crate::message::{
ParseComplete, RowDescription,
};
use crate::statement::PgStatementMetadata;
use crate::transaction::TransactionManager;
use crate::{
statement::PgStatement, PgArguments, PgConnection, PgQueryResult, PgRow, PgTypeInfo,
PgValueFormat, Postgres,
statement::PgStatement, PgArguments, PgConnection, PgQueryResult, PgRow, PgTransactionManager,
PgTypeInfo, PgValueFormat, Postgres,
};
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
Expand Down Expand Up @@ -203,8 +204,10 @@ impl PgConnection {
persistent: bool,
metadata_opt: Option<Arc<PgStatementMetadata>>,
) -> Result<impl Stream<Item = Result<Either<PgQueryResult, PgRow>, Error>> + 'e, Error> {
let mut logger = QueryLogger::new(query, self.inner.log_settings.clone())
.with_db_system_name("postgresql");
let parent_span = PgTransactionManager::query_parent_span(self);
let mut logger =
QueryLogger::new_under_span(query, self.inner.log_settings.clone(), parent_span)
.with_db_system_name("postgresql");
let span = logger.span();
let sql = logger.sql().as_str();

Expand Down
1 change: 1 addition & 0 deletions sqlx-postgres/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub struct PgConnectionInner {
// current transaction status
transaction_status: TransactionStatus,
pub(crate) transaction_depth: usize,
pub(crate) transaction_span: Option<(tracing::Span, Option<tracing::Id>)>,

log_settings: LogSettings,
}
Expand Down
28 changes: 28 additions & 0 deletions sqlx-postgres/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,34 @@ impl TransactionManager for PgTransactionManager {
fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize {
conn.inner.transaction_depth
}

fn set_transaction_span(
conn: &mut PgConnection,
span: tracing::Span,
parent_at_begin: Option<tracing::Id>,
) {
conn.inner.transaction_span = Some((span, parent_at_begin));
}

fn clear_transaction_span(conn: &mut PgConnection) {
conn.inner.transaction_span = None;
}

fn current_transaction_span(conn: &PgConnection) -> Option<tracing::Span> {
conn.inner
.transaction_span
.as_ref()
.map(|(span, _)| span.clone())
}

fn query_parent_span(conn: &PgConnection) -> Option<tracing::Span> {
let (tx_span, parent_at_begin) = conn.inner.transaction_span.as_ref()?;
if tracing::Span::current().id() == *parent_at_begin {
Some(tx_span.clone())
} else {
None
}
}
}

struct Rollback<'c> {
Expand Down
Loading