Skip to content

Commit

Permalink
Fix connection reset on disconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
ohsayan committed Jul 31, 2020
1 parent 2e71f8c commit 14e6f43
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 24 deletions.
19 changes: 10 additions & 9 deletions server/src/dbnet.rs
Expand Up @@ -127,31 +127,32 @@ impl Listener {
_term_sig_tx: self.terminate_tx.clone(),
};
tokio::spawn(async move {
chandle.run().await;
if let Err(e) = chandle.run().await {
eprintln!("Error: {}", e);
}
});
}
}
}

impl CHandler {
/// Process the incoming connection
async fn run(&mut self) {
async fn run(&mut self) -> TResult<()> {
while !self.terminator.is_termination_signal() {
let try_df = tokio::select! {
tdf = self.con.read_query() => tdf,
_ = self.terminator.receive_signal() => {
return;
return Ok(());
}
};
match try_df {
Ok(Q(s)) => self.con.write_response(self.db.execute_query(s)).await,
Ok(E(r)) => self.con.close_conn_with_error(r).await,
Err(e) => {
eprintln!("Error: {}", e);
return;
}
Ok(Q(s)) => self.con.write_response(self.db.execute_query(s)).await?,
Ok(E(r)) => self.con.close_conn_with_error(r).await?,
Ok(Empty) => return Ok(()),
Err(e) => return Err(e.into()),
}
}
Ok(())
}
}

Expand Down
32 changes: 17 additions & 15 deletions server/src/protocol/mod.rs
Expand Up @@ -22,6 +22,7 @@
mod deserializer;
use bytes::{Buf, BytesMut};
use corelib::terrapipe::RespBytes;
use corelib::TResult;
use deserializer::Navigator;
pub use deserializer::{
Query,
Expand Down Expand Up @@ -50,6 +51,8 @@ pub enum QueryResult {
Q(Query),
/// An error response
E(Vec<u8>),
/// A closed connection
Empty,
}

impl Connection {
Expand All @@ -68,20 +71,24 @@ impl Connection {
self.read_again().await?;
loop {
match self.try_query() {
Parsed((query, forward)) => {
Ok(Parsed((query, forward))) => {
self.buffer.advance(forward);
return Ok(QueryResult::Q(query));
}
RespCode(r) => return Ok(QueryResult::E(r.into_response())),
Ok(RespCode(r)) => return Ok(QueryResult::E(r.into_response())),
Err(_) => return Ok(QueryResult::Empty),
_ => (),
}
self.read_again().await?;
}
}
/// Try to parse a query from the buffered data
fn try_query(&mut self) -> QueryParseResult {
fn try_query(&mut self) -> Result<QueryParseResult, ()> {
if self.buffer.is_empty() {
return Err(());
}
let nav = Navigator::new(&mut self.buffer);
Query::from_navigator(nav)
Ok(Query::from_navigator(nav))
}
/// Try to fill the buffer again
async fn read_again(&mut self) -> Result<(), String> {
Expand All @@ -90,7 +97,7 @@ impl Connection {
// If 0 bytes were received, then the remote end closed
// the connection
if self.buffer.is_empty() {
return Err(format!("{:?} didn't send any data", self.get_peer()).into());
return Ok(());
} else {
return Err(format!(
"Connection reset while reading from: {:?}",
Expand All @@ -108,20 +115,15 @@ impl Connection {
self.stream.get_ref().peer_addr()
}
/// Write a response to the stream
pub async fn write_response(&mut self, resp: Vec<u8>) {
if let Err(_) = self.stream.write_all(&resp).await {
eprintln!("Error while writing to stream: {:?}", self.get_peer());
return;
}
pub async fn write_response(&mut self, resp: Vec<u8>) -> TResult<()> {
self.stream.write_all(&resp).await?;
// Flush the stream to make sure that the data was delivered
if let Err(_) = self.stream.flush().await {
eprintln!("Error while flushing data to stream: {:?}", self.get_peer());
return;
}
self.stream.flush().await?;
Ok(())
}
/// Wraps around the `write_response` used to differentiate between a
/// success response and an error response
pub async fn close_conn_with_error(&mut self, bytes: Vec<u8>) {
pub async fn close_conn_with_error(&mut self, bytes: Vec<u8>) -> TResult<()> {
self.write_response(bytes).await
}
}

0 comments on commit 14e6f43

Please sign in to comment.