Stop using 'smart' connection pool
First of all, the pool implementation is broken:
tower-rs/tower#456

Second, I'm tired of having benchmarks give highly volatile (and
sometimes just straight-up bad) results just because of connections
being dynamically managed.
jonhoo committed May 27, 2020
1 parent 48bbcdd commit 10fd7b2
Showing 3 changed files with 85 additions and 43 deletions.
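
The substance of the change is easiest to see at the two Buffer::pair call sites: the load-driven tower_balance pool::Pool, which grew and shrank the number of connections at runtime, is replaced by a fixed set of connections created up front and spread over with power-of-two-choices (p2c) balancing. Below is a condensed before/after assembled from the table.rs half of the diff; the view.rs half is identical apart from names, VIEW_POOL_SIZE, and an urgency of 0.03, and in the real code both expressions are passed straight into Buffer::pair rather than bound to locals.

    // Before: a dynamically sized pool; the connection count varied with
    // estimated load, which is what made benchmark numbers volatile.
    let pool = pool::Builder::new()
        .urgency(0.01)
        .loaded_above(0.2)
        .underutilized_below(0.000_000_001)
        .max_services(Some(crate::MAX_TABLE_POOL_SIZE))
        .build(TableEndpoint(addr), ());

    // After: exactly TABLE_POOL_SIZE connections, created eagerly by the
    // discover stream and balanced per request with p2c.
    let balance = Balance::from_entropy(make_table_discover(addr));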
8 changes: 4 additions & 4 deletions noria/src/lib.rs
@@ -150,7 +150,7 @@ extern crate slog;
 /// sure this value is high enough.
 pub(crate) const BUFFER_TO_POOL: usize = 2048;
 
-/// The maximum number of concurrent connections to a given backend table.
+/// The number of concurrent connections to a given backend table.
 ///
 /// Since Noria connections are multiplexing, having this value > 1 _only_ allows us to do
 /// serialization/deserialization in parallel on multiple threads. Nothing else really.
@@ -162,9 +162,9 @@ pub(crate) const BUFFER_TO_POOL: usize = 2048;
 /// - Table operations are generally not bottlenecked on serialization, but on committing.
 ///
 /// The value isn't lower, because we want _some_ concurrency in serialization.
-pub(crate) const MAX_TABLE_POOL_SIZE: usize = 2;
+pub(crate) const TABLE_POOL_SIZE: usize = 2;
 
-/// The maximum number of concurrent connections to a given backend view.
+/// The number of concurrent connections to a given backend view.
 ///
 /// Since Noria connections are multiplexing, having this value > 1 _only_ allows us to do
 /// serialization/deserialization in parallel on multiple threads. Nothing else really.
@@ -178,7 +178,7 @@ pub(crate) const MAX_TABLE_POOL_SIZE: usize = 2;
 /// reasonable.
 ///
 /// The value isn't higher because we, _and the server_ only have so many cores.
-pub(crate) const MAX_VIEW_POOL_SIZE: usize = 16;
+pub(crate) const VIEW_POOL_SIZE: usize = 16;
 
 /// Number of requests that can be pending on any _single_ connection.
 ///
59 changes: 40 additions & 19 deletions noria/src/table.rs
@@ -18,8 +18,9 @@ use std::task::{Context, Poll};
 use std::{fmt, io};
 use tokio::io::AsyncWriteExt;
 use tokio_tower::multiplex;
-use tower_balance::pool::{self, Pool};
+use tower_balance::p2c::Balance;
 use tower_buffer::Buffer;
+use tower_discover::ServiceStream;
 use tower_limit::concurrency::ConcurrencyLimit;
 use tower_service::Service;
 use vec_map::VecMap;
@@ -201,21 +202,21 @@ async fn update_user(users: &mut Table) -> Result<(), TableError> {
 }
 
 #[derive(Debug)]
-#[doc(hidden)]
-// only pub because we use it to figure out the error type for TableError
-pub struct TableEndpoint(SocketAddr);
+struct Endpoint(SocketAddr);
 
-impl Service<()> for TableEndpoint {
-    type Response = ConcurrencyLimit<
-        multiplex::Client<
-            multiplex::MultiplexTransport<Transport, Tagger>,
-            tokio_tower::Error<
-                multiplex::MultiplexTransport<Transport, Tagger>,
-                Tagged<LocalOrNot<Input>>,
-            >,
-            Tagged<LocalOrNot<Input>>,
-        >,
-    >;
+type InnerService = ConcurrencyLimit<
+    multiplex::Client<
+        multiplex::MultiplexTransport<Transport, Tagger>,
+        tokio_tower::Error<
+            multiplex::MultiplexTransport<Transport, Tagger>,
+            Tagged<LocalOrNot<Input>>,
+        >,
+        Tagged<LocalOrNot<Input>>,
+    >,
+>;
+
+impl Service<()> for Endpoint {
+    type Response = InnerService;
     type Error = tokio::io::Error;
     type Future = impl Future<Output = Result<Self::Response, Self::Error>>;
 
@@ -241,8 +242,33 @@ impl Service<()> for TableEndpoint {
     }
 }
 
+fn make_table_stream(
+    addr: SocketAddr,
+) -> impl futures_util::stream::TryStream<
+    Ok = tower_discover::Change<usize, InnerService>,
+    Error = tokio::io::Error,
+> {
+    // TODO: use whatever comes out of https://github.com/tower-rs/tower/issues/456 instead of
+    // creating _all_ the connections every time.
+    (0..crate::TABLE_POOL_SIZE)
+        .map(|i| async move {
+            let svc = Endpoint(addr).call(()).await?;
+            Ok(tower_discover::Change::Insert(i, svc))
+        })
+        .collect::<futures_util::stream::FuturesUnordered<_>>()
+}
+
+fn make_table_discover(addr: SocketAddr) -> Discover {
+    ServiceStream::new(make_table_stream(addr))
+}
+
+// Unpin + Send bounds are needed due to https://github.com/rust-lang/rust/issues/55997
+type Discover = impl tower_discover::Discover<Key = usize, Service = InnerService, Error = tokio::io::Error>
+    + Unpin
+    + Send;
+
 pub(crate) type TableRpc =
-    Buffer<Pool<TableEndpoint, (), Tagged<LocalOrNot<Input>>>, Tagged<LocalOrNot<Input>>>;
+    Buffer<Balance<Discover, Tagged<LocalOrNot<Input>>>, Tagged<LocalOrNot<Input>>>;
 
 /// A failed [`SyncTable`] operation.
 #[derive(Debug, Fail)]
@@ -323,12 +349,7 @@ impl TableBuilder {
             Entry::Vacant(h) => {
                 // TODO: maybe always use the same local port?
                 let (c, w) = Buffer::pair(
-                    pool::Builder::new()
-                        .urgency(0.01)
-                        .loaded_above(0.2)
-                        .underutilized_below(0.000_000_001)
-                        .max_services(Some(crate::MAX_TABLE_POOL_SIZE))
-                        .build(TableEndpoint(addr), ()),
+                    Balance::from_entropy(make_table_discover(addr)),
                     crate::BUFFER_TO_POOL,
                 );
                 use tracing_futures::Instrument;
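
Balance::from_entropy builds tower's power-of-two-choices (p2c) balancer over the discovered connections: each request samples two of them at random and is dispatched to the less loaded one, which keeps work roughly even without any global coordination. A toy sketch of the selection rule, not tower's actual implementation; it assumes the rand 0.7 crate and a plain counter as the load metric:

    use rand::Rng;

    // Pick an index by power-of-two-choices over per-connection loads.
    fn pick_p2c(loads: &[usize]) -> usize {
        let mut rng = rand::thread_rng();
        // Sample two candidates uniformly at random (rand 0.7 signature).
        let a = rng.gen_range(0, loads.len());
        let b = rng.gen_range(0, loads.len());
        // Dispatch to whichever currently has less outstanding work.
        if loads[a] <= loads[b] {
            a
        } else {
            b
        }
    }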
61 changes: 41 additions & 20 deletions noria/src/view.rs
@@ -15,8 +15,9 @@ use std::net::SocketAddr;
 use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
 use tokio_tower::multiplex;
-use tower_balance::pool::{self, Pool};
+use tower_balance::p2c::Balance;
 use tower_buffer::Buffer;
+use tower_discover::ServiceStream;
 use tower_limit::concurrency::ConcurrencyLimit;
 use tower_service::Service;
 
@@ -28,18 +29,18 @@ type Transport = AsyncBincodeStream<
 >;
 
 #[derive(Debug)]
-#[doc(hidden)]
-// only pub because we use it to figure out the error type for ViewError
-pub struct ViewEndpoint(SocketAddr);
-
-impl Service<()> for ViewEndpoint {
-    type Response = ConcurrencyLimit<
-        multiplex::Client<
-            multiplex::MultiplexTransport<Transport, Tagger>,
-            tokio_tower::Error<multiplex::MultiplexTransport<Transport, Tagger>, Tagged<ReadQuery>>,
-            Tagged<ReadQuery>,
-        >,
-    >;
+struct Endpoint(SocketAddr);
+
+type InnerService = ConcurrencyLimit<
+    multiplex::Client<
+        multiplex::MultiplexTransport<Transport, Tagger>,
+        tokio_tower::Error<multiplex::MultiplexTransport<Transport, Tagger>, Tagged<ReadQuery>>,
+        Tagged<ReadQuery>,
+    >,
+>;
+
+impl Service<()> for Endpoint {
+    type Response = InnerService;
     type Error = tokio::io::Error;
     type Future = impl Future<Output = Result<Self::Response, Self::Error>>;
 
@@ -62,7 +63,32 @@ impl Service<()> for ViewEndpoint {
     }
 }
 
-pub(crate) type ViewRpc = Buffer<Pool<ViewEndpoint, (), Tagged<ReadQuery>>, Tagged<ReadQuery>>;
+fn make_views_stream(
+    addr: SocketAddr,
+) -> impl futures_util::stream::TryStream<
+    Ok = tower_discover::Change<usize, InnerService>,
+    Error = tokio::io::Error,
+> {
+    // TODO: use whatever comes out of https://github.com/tower-rs/tower/issues/456 instead of
+    // creating _all_ the connections every time.
+    (0..crate::VIEW_POOL_SIZE)
+        .map(|i| async move {
+            let svc = Endpoint(addr).call(()).await?;
+            Ok(tower_discover::Change::Insert(i, svc))
+        })
+        .collect::<futures_util::stream::FuturesUnordered<_>>()
+}
+
+fn make_views_discover(addr: SocketAddr) -> Discover {
+    ServiceStream::new(make_views_stream(addr))
+}
+
+// Unpin + Send bounds are needed due to https://github.com/rust-lang/rust/issues/55997
+type Discover = impl tower_discover::Discover<Key = usize, Service = InnerService, Error = tokio::io::Error>
+    + Unpin
+    + Send;
+
+pub(crate) type ViewRpc = Buffer<Balance<Discover, Tagged<ReadQuery>>, Tagged<ReadQuery>>;
 
 /// A failed [`SyncView`] operation.
 #[derive(Debug, Fail)]
@@ -146,12 +172,7 @@ impl ViewBuilder {
             Entry::Vacant(h) => {
                 // TODO: maybe always use the same local port?
                 let (c, w) = Buffer::pair(
-                    pool::Builder::new()
-                        .urgency(0.03)
-                        .loaded_above(0.2)
-                        .underutilized_below(0.000_000_001)
-                        .max_services(Some(crate::MAX_VIEW_POOL_SIZE))
-                        .build(ViewEndpoint(addr), ()),
+                    Balance::from_entropy(make_views_discover(addr)),
                     crate::BUFFER_TO_POOL,
                 );
                 use tracing_futures::Instrument;
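
One detail both call sites share: Buffer::pair returns a clonable handle c plus a worker future w that owns the Balance and actually drives requests, so nothing makes progress unless w is spawned. Both hunks cut off right after use tracing_futures::Instrument;, so the spawn shown here is an assumption about the elided lines rather than a quote of them:

    let (c, w) = Buffer::pair(
        Balance::from_entropy(make_views_discover(addr)),
        crate::BUFFER_TO_POOL,
    );
    // Assumed continuation: drive the buffer's worker on the runtime
    // (the real code instruments it with a tracing span first).
    tokio::spawn(w);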
