Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ path = "../rpaths"
anyhow = "1.0"
async-trait = "0.1.51"
bb8 = "0.7.1"
async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "1a5c55d" }
async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "22c26ef" }
# Tracking pending 2.0 version.
diesel = { git = "https://github.com/diesel-rs/diesel", rev = "a39dd2e", features = ["postgres", "r2d2", "chrono", "serde_json", "network-address", "uuid"] }
diesel = { git = "https://github.com/diesel-rs/diesel", rev = "ce77c382", features = ["postgres", "r2d2", "chrono", "serde_json", "network-address", "uuid"] }
futures = "0.3.15"
http = "0.2.5"
hyper = "0.14"
Expand Down
3 changes: 2 additions & 1 deletion nexus/src/db/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use super::identity::{Asset, Resource};
use super::Pool;
use super::TracedPgConnection;
use async_bb8_diesel::{AsyncRunQueryDsl, ConnectionManager};
use chrono::Utc;
use diesel::{ExpressionMethods, QueryDsl, SelectableHelper};
Expand Down Expand Up @@ -62,7 +63,7 @@ impl DataStore {
DataStore { pool }
}

fn pool(&self) -> &bb8::Pool<ConnectionManager<diesel::PgConnection>> {
fn pool(&self) -> &bb8::Pool<ConnectionManager<TracedPgConnection>> {
self.pool.pool()
}

Expand Down
2 changes: 2 additions & 0 deletions nexus/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod pool;
mod saga_recovery;
mod saga_types;
mod sec_store;
mod traced_pg_connection;
mod update_and_check;

#[cfg(test)]
Expand All @@ -25,3 +26,4 @@ pub use pool::Pool;
pub use saga_recovery::{recover, RecoveryTask};
pub use saga_types::SecId;
pub use sec_store::CockroachDbSecStore;
pub(crate) use traced_pg_connection::TracedPgConnection;
10 changes: 6 additions & 4 deletions nexus/src/db/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,28 @@
*/

use super::Config as DbConfig;
use super::TracedPgConnection;
use async_bb8_diesel::ConnectionManager;
use diesel::PgConnection;

/// Wrapper around a database connection pool.
///
/// Expected to be used as the primary interface to the database.
pub struct Pool {
pool: bb8::Pool<ConnectionManager<diesel::PgConnection>>,
pool: bb8::Pool<ConnectionManager<TracedPgConnection>>,
}

impl Pool {
pub fn new(db_config: &DbConfig) -> Self {
let manager =
ConnectionManager::<PgConnection>::new(&db_config.url.url());
ConnectionManager::<TracedPgConnection>::new(&db_config.url.url());
let pool = bb8::Builder::new().build_unchecked(manager);
Pool { pool }
}

/// Returns a reference to the underlying pool.
pub fn pool(&self) -> &bb8::Pool<ConnectionManager<diesel::PgConnection>> {
pub(crate) fn pool(
&self,
) -> &bb8::Pool<ConnectionManager<TracedPgConnection>> {
&self.pool
}
}
93 changes: 93 additions & 0 deletions nexus/src/db/traced_pg_connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//! Wrapper around [`diesel::PgConnection`] which adds tracing.

use diesel::connection::{
AnsiTransactionManager, ConnectionGatWorkaround, SimpleConnection,
};
use diesel::expression::QueryMetadata;
use diesel::pg::{GetPgMetadataCache, Pg, PgMetadataCache};
use diesel::prelude::*;
use diesel::query_builder::{AsQuery, QueryFragment, QueryId};
use diesel::PgConnection;

/// A wrapper around [`diesel::PgConnection`] enabling tracing.
///
/// This struct attempts to mimic nearly the entire public API
/// of PgConnection - and should be usable in the same spots -
/// but provides hooks for tracing and logging functions to inspect
/// operations as they are issued to the database.
///
/// NOTE: This function effectively relies on implementation details
/// of Diesel, and as such, is highly unstable. Modifications beyond
/// "changes for instrumentation and observability" are not recommended.
pub(crate) struct TracedPgConnection(PgConnection);

impl SimpleConnection for TracedPgConnection {
fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
// TODO: We should trace this. We have a query string here.
println!("batch_execute: {}", query);
self.0.batch_execute(query)
}
}

impl<'a> ConnectionGatWorkaround<'a, Pg> for TracedPgConnection {
type Cursor = <PgConnection as ConnectionGatWorkaround<'a, Pg>>::Cursor;
type Row = <PgConnection as ConnectionGatWorkaround<'a, Pg>>::Row;
}

impl Connection for TracedPgConnection {
type Backend = Pg;
type TransactionManager = AnsiTransactionManager;

fn establish(database_url: &str) -> ConnectionResult<TracedPgConnection> {
Ok(TracedPgConnection(PgConnection::establish(database_url)?))
}

fn execute(&mut self, query: &str) -> QueryResult<usize> {
// TODO: We should trace this. We have a query string here.
println!("execute: {}", query);
self.0.execute(query)
}

fn load<T>(
&mut self,
source: T,
) -> QueryResult<<Self as ConnectionGatWorkaround<Pg>>::Cursor>
where
T: AsQuery,
T::Query: QueryFragment<Self::Backend> + QueryId,
Self::Backend: QueryMetadata<T::SqlType>,
{
// TODO: This is also worth tracing - it appears to issue a call to the
// underlying DB using the "raw connection" - so it doesn't call
// 'execute'.
let query = source.as_query();
println!("load: {}", diesel::debug_query::<Self::Backend, _>(&query));
self.0.load(query)
}

fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
where
T: QueryFragment<Pg> + QueryId,
{
self.0.execute_returning_count(source)
}

fn transaction_state(&mut self) -> &mut AnsiTransactionManager
where
Self: Sized,
{
self.0.transaction_state()
}
}

impl GetPgMetadataCache for TracedPgConnection {
fn get_metadata_cache(&mut self) -> &mut PgMetadataCache {
self.0.get_metadata_cache()
}
}

impl diesel::r2d2::R2D2Connection for TracedPgConnection {
fn ping(&mut self) -> QueryResult<()> {
self.0.ping()
}
}
32 changes: 20 additions & 12 deletions nexus/src/db/update_and_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ impl<T, U, V> UpdateStatementExt for UpdateStatement<T, U, V> {
///
/// US: [`UpdateStatement`] which we are extending.
/// K: Primary Key type.
pub trait UpdateAndCheck<US, K>
pub trait UpdateAndCheck<US, K, C>
where
US: UpdateStatementExt,
{
/// Nests the existing update statement in a CTE which
/// identifies if the row exists (by ID), even if the row
/// cannot be successfully updated.
fn check_if_exists<Q>(self, key: K) -> UpdateAndQueryStatement<US, K, Q>;
fn check_if_exists<Q>(self, key: K)
-> UpdateAndQueryStatement<US, K, Q, C>;
}

// UpdateStatement has four generic parameters:
Expand All @@ -63,15 +64,19 @@ where
//
// This allows our implementation of the CTE to overwrite
// the return behavior of the SQL statement.
impl<US, K> UpdateAndCheck<US, K> for US
impl<US, K, C> UpdateAndCheck<US, K, C> for US
where
US: UpdateStatementExt,
{
fn check_if_exists<Q>(self, key: K) -> UpdateAndQueryStatement<US, K, Q> {
fn check_if_exists<Q>(
self,
key: K,
) -> UpdateAndQueryStatement<US, K, Q, C> {
UpdateAndQueryStatement {
update_statement: self.statement(),
key,
query_type: PhantomData,
connection_type: PhantomData,
}
}
}
Expand All @@ -80,17 +85,18 @@ where
/// with other statements to also SELECT a row.
#[derive(Debug, Clone, Copy)]
#[must_use = "Queries must be executed"]
pub struct UpdateAndQueryStatement<US, K, Q>
pub struct UpdateAndQueryStatement<US, K, Q, C>
where
US: UpdateStatementExt,
{
update_statement:
UpdateStatement<US::Table, US::WhereClause, US::Changeset>,
key: K,
query_type: PhantomData<Q>,
connection_type: PhantomData<C>,
}

impl<US, K, Q> QueryId for UpdateAndQueryStatement<US, K, Q>
impl<US, K, Q, C> QueryId for UpdateAndQueryStatement<US, K, Q, C>
where
US: UpdateStatementExt,
{
Expand Down Expand Up @@ -121,14 +127,15 @@ type PrimaryKey<US> = <UpdateTable<US> as diesel::Table>::PrimaryKey;
// Representation of Primary Key in SQL.
type SerializedPrimaryKey<US> = <PrimaryKey<US> as diesel::Expression>::SqlType;

impl<US, K, Q> UpdateAndQueryStatement<US, K, Q>
impl<US, K, Q, C> UpdateAndQueryStatement<US, K, Q, C>
where
US: 'static + UpdateStatementExt,
K: 'static + PartialEq + Send,
US::Table: 'static + Table + Send,
US::WhereClause: 'static + Send,
US::Changeset: 'static + Send,
Q: std::fmt::Debug + Send + 'static,
C: diesel::r2d2::R2D2Connection + 'static,
{
/// Issues the CTE and parses the result.
///
Expand All @@ -138,11 +145,12 @@ where
/// - Error (row doesn't exist, or other diesel error)
pub async fn execute_and_check(
self,
pool: &bb8::Pool<ConnectionManager<PgConnection>>,
pool: &bb8::Pool<ConnectionManager<C>>,
) -> Result<UpdateAndQueryResult<Q>, PoolError>
where
// We require this bound to ensure that "Self" is runnable as query.
Self: LoadQuery<PgConnection, (Option<K>, Option<K>, Q)>,
Self: LoadQuery<C, (Option<K>, Option<K>, Q)>,
C: Connection,
{
let (id0, id1, found) =
self.get_result_async::<(Option<K>, Option<K>, Q)>(pool).await?;
Expand All @@ -159,7 +167,7 @@ where
type SelectableSqlType<Q> =
<<Q as diesel::Selectable<Pg>>::SelectExpression as Expression>::SqlType;

impl<US, K, Q> Query for UpdateAndQueryStatement<US, K, Q>
impl<US, K, Q, C> Query for UpdateAndQueryStatement<US, K, Q, C>
where
US: UpdateStatementExt,
US::Table: Table,
Expand All @@ -172,7 +180,7 @@ where
);
}

impl<US, K, Q> RunQueryDsl<PgConnection> for UpdateAndQueryStatement<US, K, Q>
impl<US, K, Q, C> RunQueryDsl<C> for UpdateAndQueryStatement<US, K, Q, C>
where
US: UpdateStatementExt,
US::Table: Table,
Expand All @@ -195,7 +203,7 @@ where
/// // ON
/// // found.<primary_key> = updated.<primary_key>;
/// ```
impl<US, K, Q> QueryFragment<Pg> for UpdateAndQueryStatement<US, K, Q>
impl<US, K, Q, C> QueryFragment<Pg> for UpdateAndQueryStatement<US, K, Q, C>
where
US: UpdateStatementExt,
US::Table: HasTable<Table = US::Table>
Expand Down