diff --git a/sqlx-core/src/logger.rs b/sqlx-core/src/logger.rs index 0cf094644b..41413d1483 100644 --- a/sqlx-core/src/logger.rs +++ b/sqlx-core/src/logger.rs @@ -77,6 +77,17 @@ pub struct QueryLogger { impl QueryLogger { pub fn new(sql: SqlStr, settings: LogSettings) -> Self { + Self::new_under_span(sql, settings, None) + } + + /// Like [`new`](Self::new) but creates the query span as a child of + /// `parent_span` instead of [`tracing::Span::current`]. + /// + /// Backends use this to attribute queries running inside a transaction to that + /// transaction's span — the parent comes from a per-connection tx span stack + /// maintained by [`TransactionManager`](crate::transaction::TransactionManager). + /// Passing `None` falls back to the contextual current span, matching `new`. + pub fn new_under_span(sql: SqlStr, settings: LogSettings, parent_span: Option) -> Self { // Hardcoded INFO level per maintainer review of #3313: libraries should pick a // level and let consumers filter via `EnvFilter`. Field names follow the OTel // database span semantic conventions @@ -85,6 +96,7 @@ impl QueryLogger { // to set the exported `SpanKind`. `db.system.name` is declared empty here and // filled in by drivers via `with_db_system_name`, so adding the field doesn't // force a signature break on `QueryLogger::new`. + let parent = parent_span.unwrap_or_else(Span::current); let summary = parse_query_summary(sql.as_str()); let operation = summary .split_whitespace() @@ -93,6 +105,7 @@ impl QueryLogger { .unwrap_or_default(); let span = tracing::info_span!( target: "sqlx::query", + parent: &parent, "db.query", "db.system.name" = tracing::field::Empty, "db.operation.name" = operation, diff --git a/sqlx-core/src/transaction.rs b/sqlx-core/src/transaction.rs index 46f89d25b2..dd9994d1e8 100644 --- a/sqlx-core/src/transaction.rs +++ b/sqlx-core/src/transaction.rs @@ -47,6 +47,59 @@ pub trait TransactionManager { /// - Level 1: A transaction is active. /// - Level 2 or higher: A transaction is active and one or more SAVEPOINTs have been created within it. fn get_transaction_depth(conn: &::Connection) -> usize; + + /// Records the outermost transaction's tracing span on the connection, + /// together with the caller's `Span::current()` at begin time. Called by + /// [`Transaction`] only when opening the outermost transaction (not for + /// nested savepoints, which share the outer span). + /// + /// The `parent_at_begin` id drives the user-injected-span heuristic: if + /// `Span::current()` at query time still matches it, the caller hasn't + /// entered their own span between begin and the query, so we auto-parent + /// under the tx span. If they differ, we respect the caller's current. + /// + /// Default impl is a no-op. In-tree backends override. + fn set_transaction_span( + _conn: &mut ::Connection, + _span: tracing::Span, + _parent_at_begin: Option, + ) { + } + + /// Clears the connection's transaction span. Called by [`Transaction`] + /// only when the outermost transaction commits, rolls back, or is dropped. + /// + /// Default impl is a no-op; in-tree backends clear their `Option`. + fn clear_transaction_span(_conn: &mut ::Connection) {} + + /// Returns the connection's currently open transaction span, if any. + /// Used by [`Transaction::begin`] so savepoint transactions can share the + /// outermost transaction's span. + /// + /// Default impl returns `None`; in-tree backends look at their `Option`. + fn current_transaction_span( + _conn: &::Connection, + ) -> Option { + None + } + + /// Returns the span the executor should use as the parent for the next + /// query span on this connection, or `None` to fall back to + /// `Span::current()`. + /// + /// In-tree backends compare `Span::current()` to the stored + /// `parent_at_begin`: if equal, the caller hasn't entered any span since + /// `begin`, so the query parents under the tx span; if different, the + /// caller has wrapped this query in their own span, so we respect that + /// and return `None`. + /// + /// Default impl returns `None`, preserving pre-tracing behavior for + /// out-of-tree drivers. + fn query_parent_span( + _conn: &::Connection, + ) -> Option { + None + } } /// An in-progress database transaction or savepoint. @@ -89,6 +142,12 @@ where { connection: MaybePoolConnection<'c, DB>, open: bool, + span: tracing::Span, + /// True for the outermost transaction, false for nested savepoints. The + /// outermost owns the span: it sets it on `begin`, records the outcome, + /// and clears it on commit/rollback/drop. Savepoints share the same span + /// and don't touch the connection's tracing state. + is_outermost: bool, } impl<'c, DB> Transaction<'c, DB> @@ -103,23 +162,93 @@ where let conn = conn.into(); Box::pin(async move { + // Hardcoded INFO level per maintainer review of #3313. Field names follow the + // OTel database span semantic conventions. `db.transaction.outcome` starts as + // `Empty` and gets recorded on commit/rollback/drop so the span carries its + // resolution. + // + // The outermost transaction owns the span; nested savepoints share it, so a + // sequence of BEGIN / SAVEPOINT / queries / RELEASE / COMMIT shows up as a + // single `db.transaction` span with the per-statement query spans as + // children. Per-savepoint duration isn't surfaced — the SAVEPOINT / RELEASE + // query spans themselves serve as the markers. + let depth_before = DB::TransactionManager::get_transaction_depth(&conn); + let is_outermost = depth_before == 0; + let span = if is_outermost { + tracing::info_span!( + target: "sqlx::transaction", + parent: &tracing::Span::current(), + "db.transaction", + "db.system.name" = %DB::NAME.to_ascii_lowercase(), + "db.operation.name" = "BEGIN", + "db.transaction.outcome" = tracing::field::Empty, + "otel.kind" = "client", + ) + } else { + // Reuse the outermost transaction's span so nested savepoint queries + // still parent under it. + DB::TransactionManager::current_transaction_span(&conn).unwrap_or_else( + // Defensive: out-of-tree drivers that don't override the trait + // methods can land here. Fall back to a fresh detached span — no + // worse than the pre-tracing-spans behavior. + tracing::Span::current, + ) + }; + let mut tx = Self { connection: conn, // If the call to `begin` fails or doesn't complete we want to attempt a rollback in case the transaction was started. open: true, + span: span.clone(), + is_outermost, }; + // Only the outermost transaction installs the span on the connection; + // savepoints inherit it. Set before BEGIN so the BEGIN's query span + // parents under it via the executor's `query_parent_span` lookup. If + // BEGIN fails, the Drop handler clears it back out (`open: true`). + // + // `parent_at_begin` lets later `query_parent_span` calls distinguish + // "caller is still at the same level as begin" (auto-parent) from + // "caller has entered their own span in between" (respect their + // current). + if is_outermost { + let parent_at_begin = tracing::Span::current().id(); + DB::TransactionManager::set_transaction_span( + &mut tx.connection, + span, + parent_at_begin, + ); + } + DB::TransactionManager::begin(&mut tx.connection, statement).await?; Ok(tx) }) } + /// Returns a handle to the transaction's tracing span. + /// + /// Useful when callers want to manually parent their own spans under the + /// transaction — e.g. wrapping a block of procedural work in + /// `something.instrument(tx.span())` so user spans inside become children + /// of the transaction. The returned `Span` is cheap to clone and `Send`. + pub fn span(&self) -> tracing::Span { + self.span.clone() + } + /// Commits this transaction or savepoint. pub async fn commit(mut self) -> Result<(), Error> { + // The span stays on the connection across the COMMIT call so the executor + // parents the COMMIT's query span under it; cleared only after success, and + // only for the outermost transaction (savepoints don't own the span). DB::TransactionManager::commit(&mut self.connection).await?; self.open = false; + if self.is_outermost { + self.span.record("db.transaction.outcome", "committed"); + DB::TransactionManager::clear_transaction_span(&mut self.connection); + } Ok(()) } @@ -128,6 +257,10 @@ where pub async fn rollback(mut self) -> Result<(), Error> { DB::TransactionManager::rollback(&mut self.connection).await?; self.open = false; + if self.is_outermost { + self.span.record("db.transaction.outcome", "rolled_back"); + DB::TransactionManager::clear_transaction_span(&mut self.connection); + } Ok(()) } @@ -275,6 +408,14 @@ where // connection (including if the connection is returned to a pool) DB::TransactionManager::start_rollback(&mut self.connection); + + // Only the outermost transaction owns the span and the connection's tracing + // state. Clear after start_rollback so the span is still installed for the + // duration of the operation, mirroring commit/rollback. + if self.is_outermost { + self.span.record("db.transaction.outcome", "dropped"); + DB::TransactionManager::clear_transaction_span(&mut self.connection); + } } } } diff --git a/sqlx-mysql/src/connection/establish.rs b/sqlx-mysql/src/connection/establish.rs index f61654d876..37905400df 100644 --- a/sqlx-mysql/src/connection/establish.rs +++ b/sqlx-mysql/src/connection/establish.rs @@ -26,6 +26,7 @@ impl MySqlConnection { inner: Box::new(MySqlConnectionInner { stream, transaction_depth: 0, + transaction_span: None, status_flags: Default::default(), cache_statement: StatementCache::new(options.statement_cache_capacity), log_settings: options.log_settings.clone(), diff --git a/sqlx-mysql/src/connection/executor.rs b/sqlx-mysql/src/connection/executor.rs index 2234e6aff0..eb95f94a50 100644 --- a/sqlx-mysql/src/connection/executor.rs +++ b/sqlx-mysql/src/connection/executor.rs @@ -11,10 +11,11 @@ use crate::protocol::statement::{ }; use crate::protocol::text::{ColumnDefinition, Query, TextRow}; use crate::statement::{MySqlStatement, MySqlStatementMetadata}; +use crate::transaction::TransactionManager; use crate::HashMap; use crate::{ - MySql, MySqlArguments, MySqlColumn, MySqlConnection, MySqlQueryResult, MySqlRow, MySqlTypeInfo, - MySqlValueFormat, + MySql, MySqlArguments, MySqlColumn, MySqlConnection, MySqlQueryResult, MySqlRow, + MySqlTransactionManager, MySqlTypeInfo, MySqlValueFormat, }; use either::Either; use futures_core::future::BoxFuture; @@ -107,8 +108,10 @@ impl MySqlConnection { persistent: bool, ) -> Result, Error>> + 'e, Error> { + let parent_span = MySqlTransactionManager::query_parent_span(self); let mut logger = - QueryLogger::new(sql, self.inner.log_settings.clone()).with_db_system_name("mysql"); + QueryLogger::new_under_span(sql, self.inner.log_settings.clone(), parent_span) + .with_db_system_name("mysql"); let span = logger.span(); self.inner.stream.wait_until_ready().await?; diff --git a/sqlx-mysql/src/connection/mod.rs b/sqlx-mysql/src/connection/mod.rs index 569ad32722..4262400bef 100644 --- a/sqlx-mysql/src/connection/mod.rs +++ b/sqlx-mysql/src/connection/mod.rs @@ -43,6 +43,7 @@ pub(crate) struct MySqlConnectionInner { // transaction status pub(crate) transaction_depth: usize, + pub(crate) transaction_span: Option<(tracing::Span, Option)>, status_flags: Status, // cache by query string to the statement id and metadata diff --git a/sqlx-mysql/src/transaction.rs b/sqlx-mysql/src/transaction.rs index 18db30b183..3dd3f1332f 100644 --- a/sqlx-mysql/src/transaction.rs +++ b/sqlx-mysql/src/transaction.rs @@ -73,4 +73,32 @@ impl TransactionManager for MySqlTransactionManager { fn get_transaction_depth(conn: &MySqlConnection) -> usize { conn.inner.transaction_depth } + + fn set_transaction_span( + conn: &mut MySqlConnection, + span: tracing::Span, + parent_at_begin: Option, + ) { + conn.inner.transaction_span = Some((span, parent_at_begin)); + } + + fn clear_transaction_span(conn: &mut MySqlConnection) { + conn.inner.transaction_span = None; + } + + fn current_transaction_span(conn: &MySqlConnection) -> Option { + conn.inner + .transaction_span + .as_ref() + .map(|(span, _)| span.clone()) + } + + fn query_parent_span(conn: &MySqlConnection) -> Option { + let (tx_span, parent_at_begin) = conn.inner.transaction_span.as_ref()?; + if tracing::Span::current().id() == *parent_at_begin { + Some(tx_span.clone()) + } else { + None + } + } } diff --git a/sqlx-postgres/src/connection/establish.rs b/sqlx-postgres/src/connection/establish.rs index 3c2f516533..479e7d1dbe 100644 --- a/sqlx-postgres/src/connection/establish.rs +++ b/sqlx-postgres/src/connection/establish.rs @@ -142,6 +142,7 @@ impl PgConnection { secret_key, transaction_status, transaction_depth: 0, + transaction_span: None, pending_ready_for_query_count: 0, next_statement_id: StatementId::NAMED_START, cache_statement: StatementCache::new(options.statement_cache_capacity), diff --git a/sqlx-postgres/src/connection/executor.rs b/sqlx-postgres/src/connection/executor.rs index 8c0170bbb9..589596754b 100644 --- a/sqlx-postgres/src/connection/executor.rs +++ b/sqlx-postgres/src/connection/executor.rs @@ -7,9 +7,10 @@ use crate::message::{ ParseComplete, RowDescription, }; use crate::statement::PgStatementMetadata; +use crate::transaction::TransactionManager; use crate::{ - statement::PgStatement, PgArguments, PgConnection, PgQueryResult, PgRow, PgTypeInfo, - PgValueFormat, Postgres, + statement::PgStatement, PgArguments, PgConnection, PgQueryResult, PgRow, PgTransactionManager, + PgTypeInfo, PgValueFormat, Postgres, }; use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; @@ -203,8 +204,10 @@ impl PgConnection { persistent: bool, metadata_opt: Option>, ) -> Result, Error>> + 'e, Error> { - let mut logger = QueryLogger::new(query, self.inner.log_settings.clone()) - .with_db_system_name("postgresql"); + let parent_span = PgTransactionManager::query_parent_span(self); + let mut logger = + QueryLogger::new_under_span(query, self.inner.log_settings.clone(), parent_span) + .with_db_system_name("postgresql"); let span = logger.span(); let sql = logger.sql().as_str(); diff --git a/sqlx-postgres/src/connection/mod.rs b/sqlx-postgres/src/connection/mod.rs index d594585b6c..8928a50786 100644 --- a/sqlx-postgres/src/connection/mod.rs +++ b/sqlx-postgres/src/connection/mod.rs @@ -74,6 +74,7 @@ pub struct PgConnectionInner { // current transaction status transaction_status: TransactionStatus, pub(crate) transaction_depth: usize, + pub(crate) transaction_span: Option<(tracing::Span, Option)>, log_settings: LogSettings, } diff --git a/sqlx-postgres/src/transaction.rs b/sqlx-postgres/src/transaction.rs index 3f4122ea82..f341441624 100644 --- a/sqlx-postgres/src/transaction.rs +++ b/sqlx-postgres/src/transaction.rs @@ -73,6 +73,34 @@ impl TransactionManager for PgTransactionManager { fn get_transaction_depth(conn: &::Connection) -> usize { conn.inner.transaction_depth } + + fn set_transaction_span( + conn: &mut PgConnection, + span: tracing::Span, + parent_at_begin: Option, + ) { + conn.inner.transaction_span = Some((span, parent_at_begin)); + } + + fn clear_transaction_span(conn: &mut PgConnection) { + conn.inner.transaction_span = None; + } + + fn current_transaction_span(conn: &PgConnection) -> Option { + conn.inner + .transaction_span + .as_ref() + .map(|(span, _)| span.clone()) + } + + fn query_parent_span(conn: &PgConnection) -> Option { + let (tx_span, parent_at_begin) = conn.inner.transaction_span.as_ref()?; + if tracing::Span::current().id() == *parent_at_begin { + Some(tx_span.clone()) + } else { + None + } + } } struct Rollback<'c> { diff --git a/sqlx-sqlite/src/any.rs b/sqlx-sqlite/src/any.rs index b3a5af5543..a453eea56b 100644 --- a/sqlx-sqlite/src/any.rs +++ b/sqlx-sqlite/src/any.rs @@ -88,9 +88,17 @@ impl AnyConnectionBackend for SqliteConnection { let persistent = persistent && arguments.is_some(); let args = arguments.map(map_arguments); + let parent_span = SqliteTransactionManager::query_parent_span(self); Box::pin( self.worker - .execute(query, args, self.row_channel_size, persistent, None) + .execute( + query, + args, + self.row_channel_size, + persistent, + None, + parent_span, + ) .map_ok(flume::Receiver::into_stream) .try_flatten_stream() .map( @@ -111,10 +119,18 @@ impl AnyConnectionBackend for SqliteConnection { let persistent = persistent && arguments.is_some(); let args = arguments.map(map_arguments); + let parent_span = SqliteTransactionManager::query_parent_span(self); Box::pin(async move { let mut stream = pin!( self.worker - .execute(query, args, self.row_channel_size, persistent, Some(1)) + .execute( + query, + args, + self.row_channel_size, + persistent, + Some(1), + parent_span, + ) .map_ok(flume::Receiver::into_stream) .await? ); diff --git a/sqlx-sqlite/src/connection/executor.rs b/sqlx-sqlite/src/connection/executor.rs index a62d3349a5..f7c3ece337 100644 --- a/sqlx-sqlite/src/connection/executor.rs +++ b/sqlx-sqlite/src/connection/executor.rs @@ -1,5 +1,6 @@ use crate::{ - Sqlite, SqliteConnection, SqliteQueryResult, SqliteRow, SqliteStatement, SqliteTypeInfo, + Sqlite, SqliteConnection, SqliteQueryResult, SqliteRow, SqliteStatement, + SqliteTransactionManager, SqliteTypeInfo, }; use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; @@ -7,6 +8,7 @@ use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use sqlx_core::error::Error; use sqlx_core::executor::{Execute, Executor}; use sqlx_core::sql_str::SqlStr; +use sqlx_core::transaction::TransactionManager; use sqlx_core::Either; use std::{future, pin::pin}; @@ -29,10 +31,18 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { }; let persistent = query.persistent() && arguments.is_some(); let sql = query.sql(); + let parent_span = SqliteTransactionManager::query_parent_span(self); Box::pin( self.worker - .execute(sql, arguments, self.row_channel_size, persistent, None) + .execute( + sql, + arguments, + self.row_channel_size, + persistent, + None, + parent_span, + ) .map_ok(flume::Receiver::into_stream) .try_flatten_stream(), ) @@ -54,11 +64,19 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { }; let persistent = query.persistent() && arguments.is_some(); + let parent_span = SqliteTransactionManager::query_parent_span(self); Box::pin(async move { let sql = query.sql(); let mut stream = pin!(self .worker - .execute(sql, arguments, self.row_channel_size, persistent, Some(1)) + .execute( + sql, + arguments, + self.row_channel_size, + persistent, + Some(1), + parent_span, + ) .map_ok(flume::Receiver::into_stream) .try_flatten_stream()); diff --git a/sqlx-sqlite/src/connection/mod.rs b/sqlx-sqlite/src/connection/mod.rs index 218c747143..30eaa9b070 100644 --- a/sqlx-sqlite/src/connection/mod.rs +++ b/sqlx-sqlite/src/connection/mod.rs @@ -60,6 +60,10 @@ pub struct SqliteConnection { optimize_on_close: OptimizeOnClose, pub(crate) worker: ConnectionWorker, pub(crate) row_channel_size: usize, + /// The outermost open transaction's span and the caller's `Span::current()` + /// id at begin time. Set when a top-level transaction begins, cleared when + /// it commits/rolls back; nested savepoints share this span. + pub(crate) transaction_span: Option<(tracing::Span, Option)>, } pub struct LockedSqliteHandle<'a> { @@ -189,6 +193,7 @@ impl SqliteConnection { optimize_on_close: options.optimize_on_close.clone(), worker, row_channel_size: options.row_channel_size, + transaction_span: None, }) } diff --git a/sqlx-sqlite/src/connection/worker.rs b/sqlx-sqlite/src/connection/worker.rs index 2585dc1312..f98c021761 100644 --- a/sqlx-sqlite/src/connection/worker.rs +++ b/sqlx-sqlite/src/connection/worker.rs @@ -368,9 +368,15 @@ impl ConnectionWorker { chan_size: usize, persistent: bool, limit: Option, + parent_span: Option, ) -> Result, Error>>, Error> { let (tx, rx) = flume::bounded(chan_size); + // The worker thread enters this span before running the query, so the + // `QueryLogger` span it creates picks `parent_span` (or the contextual + // current span if none was supplied) as its parent. + let span = parent_span.unwrap_or_else(Span::current); + self.command_tx .send_async(( Command::Execute { @@ -380,7 +386,7 @@ impl ConnectionWorker { tx, limit, }, - Span::current(), + span, )) .await .map_err(|_| Error::WorkerCrashed)?; diff --git a/sqlx-sqlite/src/transaction.rs b/sqlx-sqlite/src/transaction.rs index 1c57d0163f..c5c2c696a1 100644 --- a/sqlx-sqlite/src/transaction.rs +++ b/sqlx-sqlite/src/transaction.rs @@ -32,4 +32,29 @@ impl TransactionManager for SqliteTransactionManager { fn get_transaction_depth(conn: &SqliteConnection) -> usize { conn.worker.shared.get_transaction_depth() } + + fn set_transaction_span( + conn: &mut SqliteConnection, + span: tracing::Span, + parent_at_begin: Option, + ) { + conn.transaction_span = Some((span, parent_at_begin)); + } + + fn clear_transaction_span(conn: &mut SqliteConnection) { + conn.transaction_span = None; + } + + fn current_transaction_span(conn: &SqliteConnection) -> Option { + conn.transaction_span.as_ref().map(|(span, _)| span.clone()) + } + + fn query_parent_span(conn: &SqliteConnection) -> Option { + let (tx_span, parent_at_begin) = conn.transaction_span.as_ref()?; + if tracing::Span::current().id() == *parent_at_begin { + Some(tx_span.clone()) + } else { + None + } + } }