Skip to content

Commit

Permalink
Fix hang when the connection is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
lcdr committed Apr 14, 2021
1 parent e04fc74 commit fbc4d13
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 33 deletions.
25 changes: 20 additions & 5 deletions src/raknet.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::ffi::CStr;
use std::io::ErrorKind;
use std::io::Result as Res;
use std::os::raw::{c_char, c_uchar};
use std::ptr;
Expand All @@ -18,6 +19,9 @@ const RAK_SHUTDOWN: usize = 0x641dc0;
const RAK_STARTUP: usize = 0x645e40;
const CLOSE_CONNECT_PARAM: usize = 0x7332b8;

const ID_DISCONNECTION_NOTIFICATION: u8 = 19;
const ID_CONNECTION_LOST: u8 = 20;

const STOP_PROCESSING_AND_DEALLOCATE: u32 = 0;
const STOP_PROCESSING: u32 = 2;

Expand Down Expand Up @@ -58,10 +62,7 @@ unsafe extern "thiscall" fn new_close_connection(this: usize, ip: u32, port: u16
extern "thiscall" fn new_connect(this: usize, host: *const c_char, port: u16, password: *const c_char, password_len: u32, socket_index: u32) -> bool {
match connect(this, host, port, password, password_len, socket_index) {
Ok(()) => true,
Err(e) => {
dbg!(e);
false
}
Err(e) => { dbg!(e); false }
}
}

Expand Down Expand Up @@ -135,7 +136,21 @@ unsafe extern "thiscall" fn new_receive(this: usize) -> *const RakPacket {
}
packet
},
_ => ptr::null(),
Err(e) => {
dbg!(&e);
let packet = conn.new_rak_packet(Box::new([
if e.kind() == ErrorKind::BrokenPipe {
ID_DISCONNECTION_NOTIFICATION
} else {
ID_CONNECTION_LOST
}
]));

let b = Box::from_raw(conn);
drop(b);
set_conn(this, MTU);
packet
},
}
}

Expand Down
49 changes: 21 additions & 28 deletions src/tcpudp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ const ID_CONNECTED_PONG: u8 = 3;
const ID_CONNECTION_REQUEST_ACCEPTED: u8 = 14;
const ID_NEW_INCOMING_CONNECTION: u8 = 17;
const ID_DISCONNECTION_NOTIFICATION: u8 = 19;
const ID_CONNECTION_LOST: u8 = 20;
const PING_INTERVAL: u32 = 5000;
const MTU_SIZE: usize = 1228; // set by LU
const UDP_HEADER_SIZE: usize = 28;
Expand Down Expand Up @@ -69,17 +68,11 @@ struct BufferOffset {
buffer: Box<[u8]>,
}

#[derive(PartialEq)]
enum ConnState {
Open,
Disconnected,
Lost,
}

pub struct Connection {
tcp: Box<dyn ReliableTransport>,
udp: UdpSocket,
state: ConnState,
tcp_peer_addr: SocketAddr,
last_error: Option<io::Error>,
ping_timer: u32,
last_reliable_send_time: u32,
last_ping: u32,
Expand All @@ -101,10 +94,12 @@ impl Connection {
udp.connect((host, port))?;
tcp.set_nonblocking(true)?;
udp.set_nonblocking(true)?;
let tcp_peer_addr = tcp.peer_addr()?;
Ok(Connection {
tcp,
udp,
state: ConnState::Open,
tcp_peer_addr,
last_error: None,
ping_timer: 0,
last_reliable_send_time: 0,
last_ping: 0,
Expand All @@ -117,15 +112,18 @@ impl Connection {
}

pub fn close(&mut self) {
self.state = ConnState::Disconnected;
if let Err(e) = self.send(&[ID_DISCONNECTION_NOTIFICATION], REL_ORD) {
dbg!(e);
}
self.last_error = Some(io::Error::new(io::ErrorKind::BrokenPipe, "connection closed by ourselves"));
}

/// Send a packet.
pub fn send(&mut self, data: &[u8], reliability: u32) -> Res<()> {
match self.send_internal(data, reliability) {
Ok(()) => Ok(()),
Err(e) => {
self.state = ConnState::Lost;
self.last_error = Some(e.kind().into());
Err(e)
}
}
Expand Down Expand Up @@ -180,19 +178,15 @@ impl Connection {
}

fn receive_internal(&mut self) -> Res<*const RakPacket> {
if let Some(e) = &self.last_error {
return Err(e.kind().into());
}

if self.ping_timer - self.last_reliable_send_time > PING_INTERVAL {
let _ = self.send_ping();
}
self.ping_timer = self.ping_timer.wrapping_add(20);

if self.state == ConnState::Disconnected {
self.state = ConnState::Open;
return Ok(self.new_rak_packet(Box::new([ID_DISCONNECTION_NOTIFICATION])));
} else if self.state == ConnState::Lost {
self.state = ConnState::Open;
return Ok(self.new_rak_packet(Box::new([ID_CONNECTION_LOST])));
}

match self.receive_tcp() {
Ok(packet) => { return Ok(packet); },
Err(err) => {
Expand All @@ -212,8 +206,8 @@ impl Connection {
Ok(std::ptr::null())
}

fn new_rak_packet(&self, data: Box<[u8]>) -> *const RakPacket {
let peer_addr = self.tcp.peer_addr().unwrap();
pub fn new_rak_packet(&self, data: Box<[u8]>) -> *const RakPacket {
let peer_addr = self.tcp_peer_addr;
let ip = match peer_addr.ip() {
IpAddr::V4(ip) => ip,
IpAddr::V6(ip) => {
Expand Down Expand Up @@ -244,12 +238,12 @@ impl Connection {
let reader = unsafe { &mut &BUF[..] };
let rel: u8 = reader.read()?;
if rel == 0 {
Ok(unsafe { self.new_rak_packet(Box::from(&BUF[1..len])) } )
Ok(self.new_rak_packet(Box::from(unsafe { &BUF[1..len] })))
} else if rel == 1 {
let seq_num: u32 = reader.read()?;
if seq_num.wrapping_sub(self.seq_num_recv) < u32::max_value() / 2 {
self.seq_num_recv = seq_num.wrapping_add(1);
Ok(unsafe { self.new_rak_packet(Box::from(&BUF[5..len])) } )
Ok(self.new_rak_packet(Box::from(unsafe { &BUF[5..len] })))
} else {
Err(io::Error::new(io::ErrorKind::WouldBlock, "older sequenced packet"))
}
Expand Down Expand Up @@ -316,8 +310,7 @@ impl Connection {
}

fn on_conn_req_acc(&mut self, _cra: &[u8]) -> Res<()> {
let peer_addr = self.tcp.peer_addr().unwrap();
let peer_ip = match peer_addr.ip() {
let peer_ip = match self.tcp_peer_addr.ip() {
IpAddr::V4(ip) => ip,
IpAddr::V6(ip) => {
if ip.is_loopback() {
Expand All @@ -327,7 +320,7 @@ impl Connection {
}
}
}.octets();
let local_addr = self.tcp.local_addr().unwrap();
let local_addr = self.tcp.local_addr()?;
let local_ip = match local_addr.ip() {
IpAddr::V4(ip) => ip,
IpAddr::V6(ip) => {
Expand All @@ -343,7 +336,7 @@ impl Connection {
let mut writer = &mut packet[..];
writer.write(ID_NEW_INCOMING_CONNECTION)?;
writer.write(&peer_ip[..])?;
writer.write(peer_addr.port())?;
writer.write(self.tcp_peer_addr.port())?;
writer.write(&local_ip[..])?;
writer.write(local_addr.port())?;
self.send(&packet, REL_ORD)
Expand Down

0 comments on commit fbc4d13

Please sign in to comment.