From e28e0b7b8dc61227feb009bf7d7a1974e56d615c Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Wed, 20 Oct 2021 22:21:40 -0400 Subject: [PATCH 1/2] Add a traceable Diesel connection structure --- Cargo.lock | 6 +- nexus/Cargo.toml | 4 +- nexus/src/db/datastore.rs | 3 +- nexus/src/db/mod.rs | 2 + nexus/src/db/pool.rs | 10 ++-- nexus/src/db/traced_pg_connection.rs | 89 ++++++++++++++++++++++++++++ nexus/src/db/update_and_check.rs | 32 ++++++---- 7 files changed, 124 insertions(+), 22 deletions(-) create mode 100644 nexus/src/db/traced_pg_connection.rs diff --git a/Cargo.lock b/Cargo.lock index 493e4a306f5..4d89f84b62c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,7 +38,7 @@ dependencies = [ [[package]] name = "async-bb8-diesel" version = "0.1.0" -source = "git+https://github.com/oxidecomputer/async-bb8-diesel?rev=1a5c55d#1a5c55d9e31210fd7dfff944798e89578e3a0dfc" +source = "git+https://github.com/oxidecomputer/async-bb8-diesel?rev=22c26ef#22c26ef840075a57b18ac2eae3ead989b992773e" dependencies = [ "async-trait", "bb8", @@ -358,7 +358,7 @@ dependencies = [ [[package]] name = "diesel" version = "2.0.0" -source = "git+https://github.com/diesel-rs/diesel?rev=a39dd2e#a39dd2ebc5acccb8cde73d40b0d81dde3e073170" +source = "git+https://github.com/diesel-rs/diesel?rev=ce77c382#ce77c382d2836f6b385225991cf58cb2d2dd65d6" dependencies = [ "bitflags", "byteorder", @@ -376,7 +376,7 @@ dependencies = [ [[package]] name = "diesel_derives" version = "2.0.0" -source = "git+https://github.com/diesel-rs/diesel?rev=a39dd2e#a39dd2ebc5acccb8cde73d40b0d81dde3e073170" +source = "git+https://github.com/diesel-rs/diesel?rev=ce77c382#ce77c382d2836f6b385225991cf58cb2d2dd65d6" dependencies = [ "proc-macro2", "quote", diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index 7ebc2cce490..8474fc38877 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -10,9 +10,9 @@ path = "../rpaths" anyhow = "1.0" async-trait = "0.1.51" bb8 = "0.7.1" -async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "1a5c55d" } +async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "22c26ef" } # Tracking pending 2.0 version. -diesel = { git = "https://github.com/diesel-rs/diesel", rev = "a39dd2e", features = ["postgres", "r2d2", "chrono", "serde_json", "network-address", "uuid"] } +diesel = { git = "https://github.com/diesel-rs/diesel", rev = "ce77c382", features = ["postgres", "r2d2", "chrono", "serde_json", "network-address", "uuid"] } futures = "0.3.15" http = "0.2.5" hyper = "0.14" diff --git a/nexus/src/db/datastore.rs b/nexus/src/db/datastore.rs index c9cfc0e47d3..a2eddd805c0 100644 --- a/nexus/src/db/datastore.rs +++ b/nexus/src/db/datastore.rs @@ -20,6 +20,7 @@ use super::identity::{Asset, Resource}; use super::Pool; +use super::TracedPgConnection; use async_bb8_diesel::{AsyncRunQueryDsl, ConnectionManager}; use chrono::Utc; use diesel::{ExpressionMethods, QueryDsl, SelectableHelper}; @@ -62,7 +63,7 @@ impl DataStore { DataStore { pool } } - fn pool(&self) -> &bb8::Pool> { + fn pool(&self) -> &bb8::Pool> { self.pool.pool() } diff --git a/nexus/src/db/mod.rs b/nexus/src/db/mod.rs index 283c4f00dbb..94cedda6496 100644 --- a/nexus/src/db/mod.rs +++ b/nexus/src/db/mod.rs @@ -10,6 +10,7 @@ mod pool; mod saga_recovery; mod saga_types; mod sec_store; +mod traced_pg_connection; mod update_and_check; #[cfg(test)] @@ -25,3 +26,4 @@ pub use pool::Pool; pub use saga_recovery::{recover, RecoveryTask}; pub use saga_types::SecId; pub use sec_store::CockroachDbSecStore; +pub(crate) use traced_pg_connection::TracedPgConnection; diff --git a/nexus/src/db/pool.rs b/nexus/src/db/pool.rs index c53f74ef574..eece39cb613 100644 --- a/nexus/src/db/pool.rs +++ b/nexus/src/db/pool.rs @@ -25,26 +25,28 @@ */ use super::Config as DbConfig; +use super::TracedPgConnection; use async_bb8_diesel::ConnectionManager; -use diesel::PgConnection; /// Wrapper around a database connection pool. /// /// Expected to be used as the primary interface to the database. pub struct Pool { - pool: bb8::Pool>, + pool: bb8::Pool>, } impl Pool { pub fn new(db_config: &DbConfig) -> Self { let manager = - ConnectionManager::::new(&db_config.url.url()); + ConnectionManager::::new(&db_config.url.url()); let pool = bb8::Builder::new().build_unchecked(manager); Pool { pool } } /// Returns a reference to the underlying pool. - pub fn pool(&self) -> &bb8::Pool> { + pub(crate) fn pool( + &self, + ) -> &bb8::Pool> { &self.pool } } diff --git a/nexus/src/db/traced_pg_connection.rs b/nexus/src/db/traced_pg_connection.rs new file mode 100644 index 00000000000..87308316ab2 --- /dev/null +++ b/nexus/src/db/traced_pg_connection.rs @@ -0,0 +1,89 @@ +//! Wrapper around [`diesel::PgConnection`] which adds tracing. + +use diesel::connection::{ + AnsiTransactionManager, ConnectionGatWorkaround, SimpleConnection, +}; +use diesel::expression::QueryMetadata; +use diesel::pg::{GetPgMetadataCache, Pg, PgMetadataCache}; +use diesel::prelude::*; +use diesel::query_builder::{AsQuery, QueryFragment, QueryId}; +use diesel::PgConnection; + +/// A wrapper around [`diesel::PgConnection`] enabling tracing. +/// +/// This struct attempts to mimic nearly the entire public API +/// of PgConnection - and should be usable in the same spots - +/// but provides hooks for tracing and logging functions to inspect +/// operations as they are issued to the database. +/// +/// NOTE: This function effectively relies on implementation details +/// of Diesel, and as such, is highly unstable. Modifications beyond +/// "changes for instrumentation and observability" are not recommended. +pub(crate) struct TracedPgConnection(PgConnection); + +impl SimpleConnection for TracedPgConnection { + fn batch_execute(&mut self, query: &str) -> QueryResult<()> { + // TODO: We should trace this. We have a query string here. + self.0.batch_execute(query) + } +} + +impl<'a> ConnectionGatWorkaround<'a, Pg> for TracedPgConnection { + type Cursor = >::Cursor; + type Row = >::Row; +} + +impl Connection for TracedPgConnection { + type Backend = Pg; + type TransactionManager = AnsiTransactionManager; + + fn establish(database_url: &str) -> ConnectionResult { + Ok(TracedPgConnection(PgConnection::establish(database_url)?)) + } + + fn execute(&mut self, query: &str) -> QueryResult { + // TODO: We should trace this. We have a query string here. + self.0.execute(query) + } + + fn load( + &mut self, + source: T, + ) -> QueryResult<>::Cursor> + where + T: AsQuery, + T::Query: QueryFragment + QueryId, + Self::Backend: QueryMetadata, + { + // TODO: This is also worth tracing - it appears to issue a call to the + // underlying DB using the "raw connection" - so it doesn't call + // 'execute'. + self.0.load(source) + } + + fn execute_returning_count(&mut self, source: &T) -> QueryResult + where + T: QueryFragment + QueryId, + { + self.0.execute_returning_count(source) + } + + fn transaction_state(&mut self) -> &mut AnsiTransactionManager + where + Self: Sized, + { + self.0.transaction_state() + } +} + +impl GetPgMetadataCache for TracedPgConnection { + fn get_metadata_cache(&mut self) -> &mut PgMetadataCache { + self.0.get_metadata_cache() + } +} + +impl diesel::r2d2::R2D2Connection for TracedPgConnection { + fn ping(&mut self) -> QueryResult<()> { + self.0.ping() + } +} diff --git a/nexus/src/db/update_and_check.rs b/nexus/src/db/update_and_check.rs index a22560bdc0f..cac79fcfc7b 100644 --- a/nexus/src/db/update_and_check.rs +++ b/nexus/src/db/update_and_check.rs @@ -40,14 +40,15 @@ impl UpdateStatementExt for UpdateStatement { /// /// US: [`UpdateStatement`] which we are extending. /// K: Primary Key type. -pub trait UpdateAndCheck +pub trait UpdateAndCheck where US: UpdateStatementExt, { /// Nests the existing update statement in a CTE which /// identifies if the row exists (by ID), even if the row /// cannot be successfully updated. - fn check_if_exists(self, key: K) -> UpdateAndQueryStatement; + fn check_if_exists(self, key: K) + -> UpdateAndQueryStatement; } // UpdateStatement has four generic parameters: @@ -63,15 +64,19 @@ where // // This allows our implementation of the CTE to overwrite // the return behavior of the SQL statement. -impl UpdateAndCheck for US +impl UpdateAndCheck for US where US: UpdateStatementExt, { - fn check_if_exists(self, key: K) -> UpdateAndQueryStatement { + fn check_if_exists( + self, + key: K, + ) -> UpdateAndQueryStatement { UpdateAndQueryStatement { update_statement: self.statement(), key, query_type: PhantomData, + connection_type: PhantomData, } } } @@ -80,7 +85,7 @@ where /// with other statements to also SELECT a row. #[derive(Debug, Clone, Copy)] #[must_use = "Queries must be executed"] -pub struct UpdateAndQueryStatement +pub struct UpdateAndQueryStatement where US: UpdateStatementExt, { @@ -88,9 +93,10 @@ where UpdateStatement, key: K, query_type: PhantomData, + connection_type: PhantomData, } -impl QueryId for UpdateAndQueryStatement +impl QueryId for UpdateAndQueryStatement where US: UpdateStatementExt, { @@ -121,7 +127,7 @@ type PrimaryKey = as diesel::Table>::PrimaryKey; // Representation of Primary Key in SQL. type SerializedPrimaryKey = as diesel::Expression>::SqlType; -impl UpdateAndQueryStatement +impl UpdateAndQueryStatement where US: 'static + UpdateStatementExt, K: 'static + PartialEq + Send, @@ -129,6 +135,7 @@ where US::WhereClause: 'static + Send, US::Changeset: 'static + Send, Q: std::fmt::Debug + Send + 'static, + C: diesel::r2d2::R2D2Connection + 'static, { /// Issues the CTE and parses the result. /// @@ -138,11 +145,12 @@ where /// - Error (row doesn't exist, or other diesel error) pub async fn execute_and_check( self, - pool: &bb8::Pool>, + pool: &bb8::Pool>, ) -> Result, PoolError> where // We require this bound to ensure that "Self" is runnable as query. - Self: LoadQuery, Option, Q)>, + Self: LoadQuery, Option, Q)>, + C: Connection, { let (id0, id1, found) = self.get_result_async::<(Option, Option, Q)>(pool).await?; @@ -159,7 +167,7 @@ where type SelectableSqlType = <>::SelectExpression as Expression>::SqlType; -impl Query for UpdateAndQueryStatement +impl Query for UpdateAndQueryStatement where US: UpdateStatementExt, US::Table: Table, @@ -172,7 +180,7 @@ where ); } -impl RunQueryDsl for UpdateAndQueryStatement +impl RunQueryDsl for UpdateAndQueryStatement where US: UpdateStatementExt, US::Table: Table, @@ -195,7 +203,7 @@ where /// // ON /// // found. = updated.; /// ``` -impl QueryFragment for UpdateAndQueryStatement +impl QueryFragment for UpdateAndQueryStatement where US: UpdateStatementExt, US::Table: HasTable From 73585d3a1e55252023f936445d5bec2e4a3ca715 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Wed, 20 Oct 2021 22:39:41 -0400 Subject: [PATCH 2/2] Add tracing that we probably don't want to use, but which could become probes --- nexus/src/db/traced_pg_connection.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/nexus/src/db/traced_pg_connection.rs b/nexus/src/db/traced_pg_connection.rs index 87308316ab2..9219cf1d78b 100644 --- a/nexus/src/db/traced_pg_connection.rs +++ b/nexus/src/db/traced_pg_connection.rs @@ -24,6 +24,7 @@ pub(crate) struct TracedPgConnection(PgConnection); impl SimpleConnection for TracedPgConnection { fn batch_execute(&mut self, query: &str) -> QueryResult<()> { // TODO: We should trace this. We have a query string here. + println!("batch_execute: {}", query); self.0.batch_execute(query) } } @@ -43,6 +44,7 @@ impl Connection for TracedPgConnection { fn execute(&mut self, query: &str) -> QueryResult { // TODO: We should trace this. We have a query string here. + println!("execute: {}", query); self.0.execute(query) } @@ -58,7 +60,9 @@ impl Connection for TracedPgConnection { // TODO: This is also worth tracing - it appears to issue a call to the // underlying DB using the "raw connection" - so it doesn't call // 'execute'. - self.0.load(source) + let query = source.as_query(); + println!("load: {}", diesel::debug_query::(&query)); + self.0.load(query) } fn execute_returning_count(&mut self, source: &T) -> QueryResult