Skip to content

Commit

Permalink
feat: cleanup the spanner pool managers
Browse files Browse the repository at this point in the history
- ignore deadpool's Status::available negative values for PoolState
- kill no longer used DB_THREAD_POOL_SIZE

Closes #794
  • Loading branch information
pjenvey committed Aug 24, 2020
1 parent 0a2fecc commit 746f5d1
Show file tree
Hide file tree
Showing 11 changed files with 210 additions and 198 deletions.
3 changes: 0 additions & 3 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ pub const FIRST_CUSTOM_COLLECTION_ID: i32 = 101;
/// Rough guesstimate of the maximum reasonable life span of a batch
pub const BATCH_LIFETIME: i64 = 2 * 60 * 60 * 1000; // 2 hours, in milliseconds

/// DbPools' worker ThreadPool size
pub const DB_THREAD_POOL_SIZE: usize = 50;

type DbFuture<'a, T> = LocalBoxFuture<'a, Result<T, ApiError>>;

#[async_trait(?Send)]
Expand Down
21 changes: 18 additions & 3 deletions src/db/mysql/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ use diesel::{
use super::models::{MysqlDb, Result};
#[cfg(test)]
use super::test::TestTransactionCustomizer;
use crate::db::{error::DbError, results, Db, DbPool, STD_COLLS};
use crate::db::{
error::DbError,
results::{self, PoolState},
Db, DbPool, STD_COLLS,
};
use crate::error::{ApiError, ApiResult};
use crate::server::metrics::Metrics;
use crate::settings::Settings;
Expand Down Expand Up @@ -105,8 +109,10 @@ impl DbPool for MysqlDbPool {
}

impl fmt::Debug for MysqlDbPool {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "MysqlDbPool {{ coll_cache: {:?} }}", self.coll_cache)
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("MysqlDbPool")
.field("coll_cache", &self.coll_cache)
.finish()
}
}

Expand Down Expand Up @@ -175,3 +181,12 @@ impl Default for CollectionCache {
}
}
}

impl From<diesel::r2d2::State> for PoolState {
fn from(state: diesel::r2d2::State) -> PoolState {
PoolState {
connections: state.connections,
idle_connections: state.idle_connections,
}
}
}
27 changes: 0 additions & 27 deletions src/db/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,33 +76,6 @@ pub struct PoolState {
pub idle_connections: u32,
}

impl From<diesel::r2d2::State> for PoolState {
fn from(state: diesel::r2d2::State) -> PoolState {
PoolState {
connections: state.connections,
idle_connections: state.idle_connections,
}
}
}

impl From<bb8::State> for PoolState {
fn from(state: bb8::State) -> PoolState {
PoolState {
connections: state.connections,
idle_connections: state.idle_connections,
}
}
}

impl From<deadpool::Status> for PoolState {
fn from(status: deadpool::Status) -> PoolState {
PoolState {
connections: status.size as u32,
idle_connections: status.available as u32,
}
}
}

#[cfg(test)]
pub type GetCollectionId = i32;

Expand Down
119 changes: 35 additions & 84 deletions src/db/spanner/manager/bb8.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
use std::marker::PhantomData;
use std::{fmt, sync::Arc};

use actix_web::web::block;
use async_trait::async_trait;
use bb8::ManageConnection;
use googleapis_raw::spanner::v1::{
spanner::{CreateSessionRequest, GetSessionRequest, Session},
spanner_grpc::SpannerClient,
};
use grpcio::{
CallOption, ChannelBuilder, ChannelCredentials, EnvBuilder, Environment, MetadataBuilder,
};
use bb8::{ManageConnection, PooledConnection};
use grpcio::{EnvBuilder, Environment};

use crate::{
db::error::{DbError, DbErrorKind},
db::{
error::{DbError, DbErrorKind},
results::PoolState,
},
server::metrics::Metrics,
settings::Settings,
};

pub const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443";
use super::session::{create_spanner_session, recycle_spanner_session, SpannerSession};

pub struct SpannerConnectionManager<T> {
#[allow(dead_code)]
pub type Conn<'a> = PooledConnection<'a, SpannerSessionManager<SpannerSession>>;

pub struct SpannerSessionManager<T> {
database_name: String,
/// The gRPC environment
env: Arc<Environment>,
Expand All @@ -29,30 +28,30 @@ pub struct SpannerConnectionManager<T> {
phantom: PhantomData<T>,
}

impl<_T> fmt::Debug for SpannerConnectionManager<_T> {
impl<_T> fmt::Debug for SpannerSessionManager<_T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("SpannerConnectionManager")
fmt.debug_struct("bb8::SpannerSessionManager")
.field("database_name", &self.database_name)
.field("test_transactions", &self.test_transactions)
.finish()
}
}

impl<T> SpannerConnectionManager<T> {
impl<T> SpannerSessionManager<T> {
#[allow(dead_code)]
pub fn new(settings: &Settings, metrics: &Metrics) -> Result<Self, DbError> {
let url = &settings.database_url;
if !url.starts_with("spanner://") {
Err(DbErrorKind::InvalidUrl(url.to_owned()))?;
}
let database_name = url["spanner://".len()..].to_owned();
let database_name = settings
.spanner_database_name()
.ok_or_else(|| DbErrorKind::InvalidUrl(settings.database_url.to_owned()))?
.to_owned();
let env = Arc::new(EnvBuilder::new().build());

#[cfg(not(test))]
let test_transactions = false;
#[cfg(test)]
let test_transactions = settings.database_use_test_transactions;

Ok(SpannerConnectionManager::<T> {
Ok(SpannerSessionManager::<T> {
database_name,
env,
metrics: metrics.clone(),
Expand All @@ -62,67 +61,23 @@ impl<T> SpannerConnectionManager<T> {
}
}

pub struct SpannerSession {
pub client: SpannerClient,
pub session: Session,

pub(in crate::db::spanner) use_test_transactions: bool,
}

#[async_trait]
impl<T: std::marker::Send + std::marker::Sync + 'static> ManageConnection
for SpannerConnectionManager<T>
{
impl<T: Send + Sync + 'static> ManageConnection for SpannerSessionManager<T> {
type Connection = SpannerSession;
type Error = DbError;

async fn connect(&self) -> Result<Self::Connection, Self::Error> {
let env = self.env.clone();
let mut metrics = self.metrics.clone();
// XXX: issue732: Could google_default_credentials (or
// ChannelBuilder::secure_connect) block?!
let chan = block(move || -> Result<grpcio::Channel, grpcio::Error> {
metrics.start_timer("storage.pool.grpc_auth", None);
// Requires
// GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
let creds = ChannelCredentials::google_default_credentials()?;
Ok(ChannelBuilder::new(env)
.max_send_message_len(100 << 20)
.max_receive_message_len(100 << 20)
.secure_connect(SPANNER_ADDRESS, creds))
})
create_spanner_session(
Arc::clone(&self.env),
self.metrics.clone(),
&self.database_name,
self.test_transactions,
)
.await
.map_err(|e| match e {
actix_web::error::BlockingError::Error(e) => e.into(),
actix_web::error::BlockingError::Canceled => {
DbError::internal("web::block Manager operation canceled")
}
})?;
let client = SpannerClient::new(chan);

// Connect to the instance and create a Spanner session.
let session = create_session(&client, &self.database_name).await?;

Ok(SpannerSession {
client,
session,
use_test_transactions: self.test_transactions,
})
}

async fn is_valid(&self, mut conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
let mut req = GetSessionRequest::new();
req.set_name(conn.session.get_name().to_owned());
if let Err(e) = conn.client.get_session_async(&req)?.await {
match e {
grpcio::Error::RpcFailure(ref status)
if status.status == grpcio::RpcStatusCode::NOT_FOUND =>
{
conn.session = create_session(&conn.client, &self.database_name).await?;
}
_ => return Err(e.into()),
}
}
recycle_spanner_session(&mut conn, &self.database_name).await?;
Ok(conn)
}

Expand All @@ -131,15 +86,11 @@ impl<T: std::marker::Send + std::marker::Sync + 'static> ManageConnection
}
}

pub async fn create_session(
client: &SpannerClient,
database_name: &str,
) -> Result<Session, grpcio::Error> {
let mut req = CreateSessionRequest::new();
req.database = database_name.to_owned();
let mut meta = MetadataBuilder::new();
meta.add_str("google-cloud-resource-prefix", database_name)?;
meta.add_str("x-goog-api-client", "gcp-grpc-rs")?;
let opt = CallOption::default().headers(meta.build());
client.create_session_async_opt(&req, opt)?.await
impl From<bb8::State> for PoolState {
fn from(state: bb8::State) -> PoolState {
PoolState {
connections: state.connections,
idle_connections: state.idle_connections,
}
}
}

0 comments on commit 746f5d1

Please sign in to comment.