Skip to content

Commit

Permalink
fix: [rust] on_close event
Browse files Browse the repository at this point in the history
  • Loading branch information
shuai132 committed Dec 28, 2023
1 parent d7d96c5 commit 9128068
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 13 deletions.
20 changes: 8 additions & 12 deletions rust/src/net/src/detail/tcp_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,7 @@ impl TcpChannel {
}

pub fn close(&self) {
*self.is_open.borrow_mut() = false;
self.send_queue.borrow_mut().clear();
self.send_queue_notify.notify_one();
if let Some(on_close) = self.on_close.take() {
on_close();
}
self.do_close();
}

pub async fn wait_close_finish(&self) {
Expand Down Expand Up @@ -171,24 +166,25 @@ impl TcpChannel {
}
}
trace!("loop exit: write");

if let Some(on_close) = this.on_close.borrow().as_ref() {
on_close();
}
this.write_quit_finish_notify.notify_one();
});
});
}

pub fn do_close(&self) {
fn do_close(&self) {
if !*self.is_open.borrow() {
return;
}
*self.is_open.borrow_mut() = false;
self.quit_notify.notify_waiters();
self.send_queue_notify.notify_one();
if let Some(on_close) = self.on_close.borrow().as_ref() {
on_close();
}
}

pub async fn do_read_data(&self, read_half: &mut ReadHalf<TcpStream>) -> bool {
async fn do_read_data(&self, read_half: &mut ReadHalf<TcpStream>) -> bool {
let mut buffer = vec![];
let read_result = read_half.read_buf(&mut buffer).await.ok();

Expand All @@ -203,7 +199,7 @@ impl TcpChannel {
};
}

pub async fn do_read_header(&self, read_half: &mut ReadHalf<TcpStream>) -> bool {
async fn do_read_header(&self, read_half: &mut ReadHalf<TcpStream>) -> bool {
let mut buffer = [0u8; 4];
let read_result = read_half.read_exact(&mut buffer).await.ok();

Expand Down
2 changes: 1 addition & 1 deletion rust/src/net/src/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl RpcServer {
let rpc = if let Some(rpc) = this.config.borrow().rpc.clone() {
if rpc.is_ready() {
debug!("rpc already connected");
tcp_channel.do_close();
tcp_channel.close();
return;
}
rpc
Expand Down

0 comments on commit 9128068

Please sign in to comment.