Skip to content

Commit b9dce87

Browse files
mmirateEugeny
authored andcommitted
Improve keepalive and inactivity timers
* Add an analogue of OpenSSH's `ServerAliveCountMax`. * Use disjunctive futures for cleanly making these timers optional. * Use the `Session` to pass information back to the main bg loop from the plaintext packet reader, so that only nontrivial data transfer will reset the inactivity timer. (And so that `ServerAliveCountMax` will be judged correctly.)
1 parent da4c040 commit b9dce87

File tree

3 files changed

+71
-24
lines changed

3 files changed

+71
-24
lines changed

russh/src/client/encrypted.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,7 @@ impl Session {
540540
);
541541
match req {
542542
b"xon-xoff" => {
543+
self.activity = false;
543544
r.read_byte().map_err(crate::Error::from)?; // should be 0.
544545
let client_can_do = r.read_byte().map_err(crate::Error::from)? != 0;
545546
if let Some(chan) = self.channels.get(&channel_num) {
@@ -586,6 +587,7 @@ impl Session {
586587
.await
587588
}
588589
b"keepalive@openssh.com" => {
590+
self.activity = false;
589591
let wants_reply = r.read_byte().map_err(crate::Error::from)?;
590592
if wants_reply == 1 {
591593
if let Some(ref mut enc) = self.common.encrypted {
@@ -605,6 +607,7 @@ impl Session {
605607
Ok((client, self))
606608
}
607609
_ => {
610+
self.activity = false;
608611
let wants_reply = r.read_byte().map_err(crate::Error::from)?;
609612
if wants_reply == 1 {
610613
if let Some(ref mut enc) = self.common.encrypted {
@@ -704,6 +707,7 @@ impl Session {
704707
push_packet!(enc.write, enc.write.push(msg::REQUEST_FAILURE))
705708
}
706709
}
710+
self.activity = false;
707711
Ok((client, self))
708712
}
709713
Some(&msg::CHANNEL_SUCCESS) => {
@@ -816,7 +820,16 @@ impl Session {
816820
Err(crate::Error::Inconsistent.into())
817821
}
818822
}
823+
Some(&msg::REQUEST_SUCCESS | &msg::REQUEST_FAILURE)
824+
if self.server_alive_timeouts > 0 =>
825+
{
826+
self.activity = false;
827+
// TODO what other things might need to happen in response to these two opcodes?
828+
self.server_alive_timeouts = 0;
829+
Ok((client, self))
830+
}
819831
_ => {
832+
self.activity = false;
820833
info!("Unhandled packet: {:?}", buf);
821834
Ok((client, self))
822835
}

russh/src/client/mod.rs

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ use crate::session::{CommonSession, EncryptedState, Exchange, Kex, KexDhDone, Ke
105105
use crate::ssh_read::SshRead;
106106
use crate::sshbuffer::{SSHBuffer, SshId};
107107
use crate::{
108-
auth, msg, negotiation, strict_kex_violation, timeout, ChannelId, ChannelOpenFailure,
108+
auth, msg, negotiation, strict_kex_violation, ChannelId, ChannelOpenFailure,
109109
Disconnect, Limits, Sig,
110110
};
111111

@@ -128,6 +128,8 @@ pub struct Session {
128128
pending_len: u32,
129129
inbound_channel_sender: Sender<Msg>,
130130
inbound_channel_receiver: Receiver<Msg>,
131+
server_alive_timeouts: usize,
132+
activity: bool,
131133
}
132134

133135
const STRICT_KEX_MSG_ORDER: &[u8] = &[msg::KEXINIT, msg::KEX_ECDH_REPLY, msg::NEWKEYS];
@@ -728,6 +730,16 @@ async fn start_reading<R: AsyncRead + Unpin>(
728730
Ok((n, stream_read, buffer, cipher))
729731
}
730732

733+
fn future_or_pending<F: futures::Future, T>(
734+
val: Option<T>,
735+
f: impl FnOnce(T) -> F,
736+
) -> futures::future::Either<futures::future::Pending<<F as futures::Future>::Output>, F> {
737+
val.map_or(
738+
futures::future::Either::Left(futures::future::pending()),
739+
|x| futures::future::Either::Right(f(x)),
740+
)
741+
}
742+
731743
impl Session {
732744
fn new(
733745
target_window_size: u32,
@@ -746,6 +758,8 @@ impl Session {
746758
channels: HashMap::new(),
747759
pending_reads: Vec::new(),
748760
pending_len: 0,
761+
server_alive_timeouts: 0,
762+
activity: false,
749763
}
750764
}
751765

@@ -774,19 +788,32 @@ impl Session {
774788
let mut opening_cipher = Box::new(clear::Key) as Box<dyn OpeningKey + Send>;
775789
std::mem::swap(&mut opening_cipher, &mut self.common.cipher.remote_to_local);
776790

777-
let time_for_keepalive = tokio::time::sleep_until(self.common.config.keepalive_deadline());
791+
let keepalive_timer =
792+
future_or_pending(self.common.config.keepalive_interval, tokio::time::sleep);
793+
pin!(keepalive_timer);
794+
795+
let inactivity_timer =
796+
future_or_pending(self.common.config.inactivity_timeout, tokio::time::sleep);
797+
pin!(inactivity_timer);
798+
778799
let reading = start_reading(stream_read, buffer, opening_cipher);
779800
pin!(reading);
780-
pin!(time_for_keepalive);
781-
782-
let delay = self.common.config.inactivity_timeout;
783801

784802
#[allow(clippy::panic)] // false positive in select! macro
785803
while !self.common.disconnected {
804+
self.activity = false;
786805
tokio::select! {
787-
() = &mut time_for_keepalive => {
788-
time_for_keepalive.as_mut().reset(self.common.config.keepalive_deadline());
806+
() = &mut keepalive_timer => {
789807
self.send_keepalive(true);
808+
if self.common.config.keepalive_max != 0 && self.server_alive_timeouts > self.common.config.keepalive_max {
809+
debug!("Timeout, server not responding to keepalives");
810+
break
811+
}
812+
self.server_alive_timeouts = self.server_alive_timeouts.saturating_add(1);
813+
}
814+
() = &mut inactivity_timer => {
815+
debug!("timeout");
816+
break
790817
}
791818
r = &mut reading => {
792819
let (stream_read, mut buffer, mut opening_cipher) = match r {
@@ -819,6 +846,7 @@ impl Session {
819846
if buf[0] == crate::msg::DISCONNECT {
820847
break;
821848
} else {
849+
self.activity = true;
822850
let (h, s) = reply(self, handler, &mut encrypted_signal, &mut buffer.seqn, buf).await?;
823851
handler = h;
824852
self = s;
@@ -827,7 +855,6 @@ impl Session {
827855

828856
std::mem::swap(&mut opening_cipher, &mut self.common.cipher.remote_to_local);
829857
reading.set(start_reading(stream_read, buffer, opening_cipher));
830-
time_for_keepalive.as_mut().reset(self.common.config.keepalive_deadline());
831858
}
832859
msg = self.receiver.recv(), if !self.is_rekeying() => {
833860
match msg {
@@ -845,7 +872,6 @@ impl Session {
845872
Err(_) => break
846873
}
847874
}
848-
time_for_keepalive.as_mut().reset(self.common.config.keepalive_deadline());
849875
}
850876
msg = self.inbound_channel_receiver.recv(), if !self.is_rekeying() => {
851877
match msg {
@@ -861,17 +887,15 @@ impl Session {
861887
}
862888
}
863889
}
864-
_ = timeout(delay) => {
865-
debug!("timeout");
866-
break
867-
},
868-
}
890+
};
891+
869892
self.flush()?;
870893
if !self.common.write_buffer.buffer.is_empty() {
871894
trace!(
872895
"writing to stream: {:?} bytes",
873896
self.common.write_buffer.buffer.len()
874897
);
898+
self.activity = true;
875899
stream_write
876900
.write_all(&self.common.write_buffer.buffer)
877901
.await
@@ -885,6 +909,22 @@ impl Session {
885909
enc.state = EncryptedState::Authenticated;
886910
}
887911
}
912+
913+
if let (futures::future::Either::Right(ref mut sleep), Some(d)) = (
914+
keepalive_timer.as_mut().as_pin_mut(),
915+
self.common.config.keepalive_interval,
916+
) {
917+
sleep.as_mut().reset(tokio::time::Instant::now() + d);
918+
}
919+
920+
if self.activity {
921+
if let (futures::future::Either::Right(ref mut sleep), Some(d)) = (
922+
inactivity_timer.as_mut().as_pin_mut(),
923+
self.common.config.inactivity_timeout,
924+
) {
925+
sleep.as_mut().reset(tokio::time::Instant::now() + d);
926+
}
927+
}
888928
}
889929
debug!("disconnected");
890930
self.receiver.close();
@@ -1310,19 +1350,12 @@ pub struct Config {
13101350
pub inactivity_timeout: Option<std::time::Duration>,
13111351
/// If nothing is sent or received for this amount of time, send a keepalive message.
13121352
pub keepalive_interval: Option<std::time::Duration>,
1353+
/// If this many keepalives have been sent without reply, close the connection.
1354+
pub keepalive_max: usize,
13131355
/// Whether to expect and wait for an authentication call.
13141356
pub anonymous: bool,
13151357
}
13161358

1317-
impl Config {
1318-
fn keepalive_deadline(&self) -> tokio::time::Instant {
1319-
tokio::time::Instant::now()
1320-
+ self
1321-
.keepalive_interval
1322-
.unwrap_or(std::time::Duration::from_secs(86400 * 365))
1323-
}
1324-
}
1325-
13261359
impl Default for Config {
13271360
fn default() -> Config {
13281361
Config {
@@ -1337,6 +1370,7 @@ impl Default for Config {
13371370
preferred: Default::default(),
13381371
inactivity_timeout: None,
13391372
keepalive_interval: None,
1373+
keepalive_max: 3,
13401374
anonymous: false,
13411375
}
13421376
}

russh/src/client/session.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ impl Session {
292292
if let Some(ref mut enc) = self.common.encrypted {
293293
push_packet!(enc.write, {
294294
enc.write.push(msg::GLOBAL_REQUEST);
295-
enc.write.extend_ssh_string(b"keepalive@libssh2.org");
295+
enc.write.extend_ssh_string(b"keepalive@openssh.org");
296296
enc.write.push(want_reply as u8);
297297
});
298298
}

0 commit comments

Comments
 (0)