Revert "server: Remove conns for replicas that die"
This reverts commit eb69ec6f679c588e1a19648ab8fa12d91e5a41ec.

As far as I can tell, this introduced either a performance regression or
an actual major issue that is causing the replicators tests to either
get stuck or take considerably longer to finish in CI (though they pass
locally?). More investigation is required, but for now, since our HA test
coverage is crummy, this can be reverted without actually breaking
tests (though it does break workers dying and re-joining in HA
deployments).

Change-Id: I99c43ba13afa48397c4e8d685249fbd1fd4c6b7c
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5668
Tested-by: Buildkite CI
Reviewed-by: Nick Marino <nick@readyset.io>
Reviewed-by: Luke Osborne <luke@readyset.io>
glittershark authored and lukoktonos committed Aug 7, 2023
1 parent 176c43a commit 7a0aa4c
Showing 2 changed files with 10 additions and 68 deletions.
40 changes: 7 additions & 33 deletions readyset-client/src/channel/mod.rs
@@ -14,7 +14,6 @@ use std::task::{Context, Poll};
 use async_bincode::{AsyncBincodeWriter, AsyncDestination};
 use futures_util::sink::{Sink, SinkExt};
 use tokio::io::BufWriter;
-use tokio::sync::broadcast;
 
 pub mod tcp;
 
@@ -25,13 +24,6 @@ use crate::{ReadySetError, ReadySetResult};
 pub const CONNECTION_FROM_BASE: u8 = 1;
 pub const CONNECTION_FROM_DOMAIN: u8 = 2;
 
-/// Buffer size to use for the broadcast channel to notify replicas about changes to the addresses
-/// of other replicas
-///
-/// If more than this number of changes to replica addresses are enqueued without all replicas
-/// reading them, the replicas that lag behind will reconnect to all other replicas
-const COORDINATOR_CHANGE_CHANNEL_BUFFER_SIZE: usize = 64;
-
 pub struct Remote;
 pub struct MaybeLocal;
 
@@ -200,8 +192,6 @@ struct ChannelCoordinatorInner<K: Eq + Hash + Clone, T> {
 
 pub struct ChannelCoordinator<K: Eq + Hash + Clone, T> {
     inner: RwLock<ChannelCoordinatorInner<K, T>>,
-    /// Broadcast channel that can be used to be notified when the address for a key changes
-    changes_tx: broadcast::Sender<K>,
 }
 
 impl<K: Eq + Hash + Clone, T> Default for ChannelCoordinator<K, T> {
@@ -217,48 +207,32 @@ impl<K: Eq + Hash + Clone, T> ChannelCoordinator<K, T> {
                 addrs: Default::default(),
                 locals: Default::default(),
             }),
-            changes_tx: broadcast::channel(COORDINATOR_CHANGE_CHANNEL_BUFFER_SIZE).0,
         }
     }
 
-    /// Create a new [`broadcast::Receiver`] which will be notified whenver the local or remote
-    /// address for a key is changed (added or removed)
-    pub fn subscribe(&self) -> broadcast::Receiver<K> {
-        self.changes_tx.subscribe()
-    }
-
     pub fn insert_remote(&self, key: K, addr: SocketAddr) {
         #[allow(clippy::expect_used)]
         // This can only fail if the mutex is poisoned, in which case we can't recover,
         // so we allow to panic if that happens.
-        {
-            let mut guard = self.inner.write().expect("poisoned mutex");
-            guard.addrs.insert(key.clone(), addr);
-        }
-        let _ = self.changes_tx.send(key);
+        let mut guard = self.inner.write().expect("poisoned mutex");
+        guard.addrs.insert(key, addr);
     }
 
     pub fn remove(&self, key: K) {
         #[allow(clippy::expect_used)]
         // This can only fail if the mutex is poisoned, in which case we can't recover,
         // so we allow to panic if that happens.
-        {
-            let mut guard = self.inner.write().expect("poisoned mutex");
-            guard.addrs.remove(&key);
-            guard.locals.remove(&key);
-        }
-        let _ = self.changes_tx.send(key);
+        let mut guard = self.inner.write().expect("poisoned mutex");
+        guard.addrs.remove(&key);
+        guard.locals.remove(&key);
    }
 
     pub fn insert_local(&self, key: K, chan: tokio::sync::mpsc::UnboundedSender<T>) {
         #[allow(clippy::expect_used)]
         // This can only fail if the mutex is poisoned, in which case we can't recover,
         // so we allow to panic if that happens.
-        {
-            let mut guard = self.inner.write().expect("poisoned mutex");
-            guard.locals.insert(key.clone(), chan);
-        }
-        let _ = self.changes_tx.send(key);
+        let mut guard = self.inner.write().expect("poisoned mutex");
+        guard.locals.insert(key, chan);
     }
 
     pub fn has<Q>(&self, key: &Q) -> bool
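A note on the buffer-size semantics in the doc comment deleted above: these are the standard `tokio::sync::broadcast` semantics, where a receiver that falls more than `capacity` messages behind gets `Err(RecvError::Lagged(n))` from its next `recv()` and is skipped forward to the oldest retained message. The reverted replica loop (see the second file below) used exactly that error as its cue to drop every cached connection. Below is a minimal runnable sketch of the lag behavior, illustrative only, with a capacity of 2 standing in for the deleted constant's 64:

    use tokio::sync::broadcast;

    #[tokio::main]
    async fn main() {
        // Capacity 2 stands in for COORDINATOR_CHANGE_CHANNEL_BUFFER_SIZE (64 above).
        let (tx, mut rx) = broadcast::channel(2);

        // Four address-change notifications with no intervening recv: the two
        // oldest values are overwritten in the ring buffer.
        for key in ["d0", "d1", "d2", "d3"] {
            tx.send(key).unwrap();
        }

        // The lagging receiver's next recv() reports how many messages it missed.
        // This is the case where the reverted replica loop gave up on tracking
        // individual changes and cleared *all* cached connections.
        match rx.recv().await {
            Err(broadcast::error::RecvError::Lagged(skipped)) => assert_eq!(skipped, 2),
            other => panic!("expected Lagged, got {other:?}"),
        }

        // After reporting the lag, the receiver resumes at the oldest retained value.
        assert_eq!(rx.recv().await.unwrap(), "d2");
    }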
38 changes: 3 additions & 35 deletions readyset-server/src/worker/replica.rs
@@ -21,9 +21,9 @@ use strawpoll::Strawpoll;
 use time::Duration;
 use tokio::io::{AsyncReadExt, BufReader, BufStream, BufWriter};
 use tokio::net::{TcpListener, TcpStream};
-use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
+use tokio::sync::{mpsc, oneshot, Mutex};
 use tokio_stream::wrappers::IntervalStream;
-use tracing::{debug, error, info, info_span, instrument, trace, warn, Span};
+use tracing::{debug, error, info_span, instrument, trace, warn, Span};
 
 use super::ChannelCoordinator;
 
@@ -257,10 +257,7 @@ impl Replica {
         }
 
         let tx = match connections.entry(replica_address) {
-            Occupied(entry) => {
-                trace!(%replica_address, "Reusing existing domain connection");
-                entry.into_mut()
-            }
+            Occupied(entry) => entry.into_mut(),
             Vacant(entry) => {
                 let Some(addr) = coord.get_addr(&replica_address) else {
                     trace!(
@@ -280,7 +277,6 @@
                     continue;
                 }
 
-                debug!(%replica_address, %addr, "Establishing connection to domain");
                 entry.insert(coord.builder_for(&replica_address)?.build_async()?)
             }
         };
@@ -353,8 +349,6 @@ impl Replica {
             init_state_reqs,
         } = &mut self;
 
-        let mut channel_changes = coord.subscribe();
-
         loop {
             // we have three logical input sources: receives from local domains, receives from
             // remote domains, and remote mutators.
@@ -379,32 +373,6 @@
                     connections.insert(token, tcp);
                 },
 
-                // Handle changes to the addresses of individual domain replicas
-                replica_addr = channel_changes.recv() => {
-                    match replica_addr {
-                        Ok(replica_addr) => {
-                            // We've received a notification that the socket address for a replica
-                            // has changed - remove its cached connection from `outputs` so that
-                            // when we try to send to it later we re-lookup the addr and reconnect
-                            info!(%replica_addr, "Removing connection for replica");
-                            outputs.lock().await.remove(&replica_addr);
-                        }
-                        Err(broadcast::error::RecvError::Lagged(skipped)) => {
-                            // If we've lagged behind, that means we've missed some changes to
-                            // replica addresses, so we need to consider all connections invalid
-                            warn!(
-                                %skipped,
-                                "Coordinator change broadcast receiver lagged behind; reconnecting \
-                                 to all replicas"
-                            );
-                            outputs.lock().await.clear();
-                        }
-                        Err(broadcast::error::RecvError::Closed) => {
-                            panic!("ChannelCoordinator dropped!");
-                        }
-                    }
-                }
-
                 // Handle domain requests
                 domain_req = requests.recv() => match domain_req {
                     Some(req) => {
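The caveat in the commit message about workers dying and re-joining follows from the connection caching visible above: `insert_remote` still updates the coordinator's address map, but with the broadcast gone nothing evicts connections already cached in a replica's `connections`/`outputs` maps, so peers keep writing to the re-joined worker's old address. Below is a standalone sketch of that failure shape, using plain `HashMap`s as hypothetical stand-ins for the real coordinator and connection types:

    use std::collections::HashMap;

    fn main() {
        // Hypothetical stand-ins: the coordinator's address map, and a replica's
        // cache of already-built connections (modeled here as address strings).
        let mut addrs: HashMap<&str, String> = HashMap::new();
        let mut cached_connections: HashMap<&str, String> = HashMap::new();

        addrs.insert("domain-1.0", "10.0.0.5:4000".to_string());

        // First send: Vacant entry, so look up the address and "connect".
        let conn = cached_connections
            .entry("domain-1.0")
            .or_insert_with(|| addrs["domain-1.0"].clone())
            .clone();
        assert_eq!(conn, "10.0.0.5:4000");

        // The worker dies and re-joins elsewhere; only the address map is updated.
        addrs.insert("domain-1.0", "10.0.0.9:4000".to_string());

        // The next send hits the Occupied arm: the cached connection wins,
        // because nothing removed it when the address changed.
        let conn = cached_connections
            .entry("domain-1.0")
            .or_insert_with(|| addrs["domain-1.0"].clone())
            .clone();
        assert_eq!(conn, "10.0.0.5:4000"); // still the dead worker's old address
    }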
