Skip to content

Commit

Permalink
Merge pull request #437 from zed-industries/fix-server-hangs
Browse files Browse the repository at this point in the history
Use an unbounded channel for peer's outgoing messages
  • Loading branch information
maxbrunsfeld committed Feb 8, 2022
2 parents 82afacd + 8a2613d commit 30e8709
Show file tree
Hide file tree
Showing 9 changed files with 412 additions and 497 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 16 additions & 22 deletions crates/client/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{
};
use sum_tree::{Bias, SumTree};
use time::OffsetDateTime;
use util::{post_inc, TryFutureExt};
use util::{post_inc, ResultExt as _, TryFutureExt};

pub struct ChannelList {
available_channels: Option<Vec<ChannelDetails>>,
Expand Down Expand Up @@ -168,16 +168,12 @@ impl ChannelList {
impl Entity for Channel {
type Event = ChannelEvent;

fn release(&mut self, cx: &mut MutableAppContext) {
let rpc = self.rpc.clone();
let channel_id = self.details.id;
cx.foreground()
.spawn(async move {
if let Err(error) = rpc.send(proto::LeaveChannel { channel_id }).await {
log::error!("error leaving channel: {}", error);
};
fn release(&mut self, _: &mut MutableAppContext) {
self.rpc
.send(proto::LeaveChannel {
channel_id: self.details.id,
})
.detach()
.log_err();
}
}

Expand Down Expand Up @@ -718,18 +714,16 @@ mod tests {
});

// Receive a new message.
server
.send(proto::ChannelMessageSent {
channel_id: channel.read_with(&cx, |channel, _| channel.details.id),
message: Some(proto::ChannelMessage {
id: 12,
body: "c".into(),
timestamp: 1002,
sender_id: 7,
nonce: Some(3.into()),
}),
})
.await;
server.send(proto::ChannelMessageSent {
channel_id: channel.read_with(&cx, |channel, _| channel.details.id),
message: Some(proto::ChannelMessage {
id: 12,
body: "c".into(),
timestamp: 1002,
sender_id: 7,
nonce: Some(3.into()),
}),
});

// Client requests user for message since they haven't seen them yet
let get_users = server.receive::<proto::GetUsers>().await.unwrap();
Expand Down
62 changes: 47 additions & 15 deletions crates/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ use std::{
collections::HashMap,
convert::TryFrom,
fmt::Write as _,
future::Future,
sync::{Arc, Weak},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Weak,
},
time::{Duration, Instant},
};
use surf::{http::Method, Url};
Expand Down Expand Up @@ -55,6 +57,7 @@ pub fn init(rpc: Arc<Client>, cx: &mut MutableAppContext) {
}

pub struct Client {
id: usize,
peer: Arc<Peer>,
http: Arc<dyn HttpClient>,
state: RwLock<ClientState>,
Expand Down Expand Up @@ -167,7 +170,12 @@ impl Drop for Subscription {

impl Client {
pub fn new(http: Arc<dyn HttpClient>) -> Arc<Self> {
lazy_static! {
static ref NEXT_CLIENT_ID: AtomicUsize = AtomicUsize::default();
}

Arc::new(Self {
id: NEXT_CLIENT_ID.fetch_add(1, Ordering::SeqCst),
peer: Peer::new(),
http,
state: Default::default(),
Expand Down Expand Up @@ -448,21 +456,31 @@ impl Client {
None
};

let type_name = message.payload_type_name();

let handler_key = (payload_type_id, entity_id);
if let Some(handler) = state.model_handlers.get_mut(&handler_key) {
let mut handler = handler.take().unwrap();
drop(state); // Avoid deadlocks if the handler interacts with rpc::Client
let start_time = Instant::now();
log::info!("RPC client message {}", message.payload_type_name());

log::debug!(
"rpc message received. client_id:{}, name:{}",
this.id,
type_name
);
(handler)(message, &mut cx);
log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
log::debug!(
"rpc message handled. client_id:{}, name:{}",
this.id,
type_name
);

let mut state = this.state.write();
if state.model_handlers.contains_key(&handler_key) {
state.model_handlers.insert(handler_key, Some(handler));
}
} else {
log::info!("unhandled message {}", message.payload_type_name());
log::info!("unhandled message {}", type_name);
}
}
}
Expand Down Expand Up @@ -677,27 +695,41 @@ impl Client {
}
}

pub async fn send<T: EnvelopedMessage>(&self, message: T) -> Result<()> {
self.peer.send(self.connection_id()?, message).await
pub fn send<T: EnvelopedMessage>(&self, message: T) -> Result<()> {
log::debug!("rpc send. client_id:{}, name:{}", self.id, T::NAME);
self.peer.send(self.connection_id()?, message)
}

pub async fn request<T: RequestMessage>(&self, request: T) -> Result<T::Response> {
self.peer.request(self.connection_id()?, request).await
log::debug!(
"rpc request start. client_id: {}. name:{}",
self.id,
T::NAME
);
let response = self.peer.request(self.connection_id()?, request).await;
log::debug!(
"rpc request finish. client_id: {}. name:{}",
self.id,
T::NAME
);
response
}

pub fn respond<T: RequestMessage>(
&self,
receipt: Receipt<T>,
response: T::Response,
) -> impl Future<Output = Result<()>> {
) -> Result<()> {
log::debug!("rpc respond. client_id: {}. name:{}", self.id, T::NAME);
self.peer.respond(receipt, response)
}

pub fn respond_with_error<T: RequestMessage>(
&self,
receipt: Receipt<T>,
error: proto::Error,
) -> impl Future<Output = Result<()>> {
) -> Result<()> {
log::debug!("rpc respond. client_id: {}. name:{}", self.id, T::NAME);
self.peer.respond_with_error(receipt, error)
}
}
Expand Down Expand Up @@ -860,8 +892,8 @@ mod tests {
});
drop(subscription3);

server.send(proto::UnshareProject { project_id: 1 }).await;
server.send(proto::UnshareProject { project_id: 2 }).await;
server.send(proto::UnshareProject { project_id: 1 });
server.send(proto::UnshareProject { project_id: 2 });
done_rx1.next().await.unwrap();
done_rx2.next().await.unwrap();
}
Expand Down Expand Up @@ -890,7 +922,7 @@ mod tests {
Ok(())
})
});
server.send(proto::Ping {}).await;
server.send(proto::Ping {});
done_rx2.next().await.unwrap();
}

Expand All @@ -914,7 +946,7 @@ mod tests {
},
));
});
server.send(proto::Ping {}).await;
server.send(proto::Ping {});
done_rx.next().await.unwrap();
}

Expand Down
6 changes: 3 additions & 3 deletions crates/client/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ impl FakeServer {
self.forbid_connections.store(false, SeqCst);
}

pub async fn send<T: proto::EnvelopedMessage>(&self, message: T) {
self.peer.send(self.connection_id(), message).await.unwrap();
pub fn send<T: proto::EnvelopedMessage>(&self, message: T) {
self.peer.send(self.connection_id(), message).unwrap();
}

pub async fn receive<M: proto::EnvelopedMessage>(&self) -> Result<TypedEnvelope<M>> {
Expand Down Expand Up @@ -148,7 +148,7 @@ impl FakeServer {
receipt: Receipt<T>,
response: T::Response,
) {
self.peer.respond(receipt, response).await.unwrap()
self.peer.respond(receipt, response).unwrap()
}

fn connection_id(&self) -> ConnectionId {
Expand Down
Loading

0 comments on commit 30e8709

Please sign in to comment.