Skip to content

Commit 5c60d30

Browse files
amtelekomEugeny
authored andcommitted
Actually process global request results
1 parent a3d3129 commit 5c60d30

File tree

8 files changed

+248
-54
lines changed

8 files changed

+248
-54
lines changed

russh/src/client/encrypted.rs

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::client::{Handler, Msg, Prompt, Reply, Session};
2525
use crate::key::PubKey;
2626
use crate::negotiation::{Named, Select};
2727
use crate::parsing::{ChannelOpenConfirmation, ChannelType, OpenChannelMessage};
28-
use crate::session::{Encrypted, EncryptedState, Kex, KexInit};
28+
use crate::session::{Encrypted, EncryptedState, GlobalRequestResponse, Kex, KexInit};
2929
use crate::{
3030
auth, msg, negotiation, strict_kex_violation, Channel, ChannelId, ChannelMsg,
3131
ChannelOpenFailure, ChannelParams, Sig,
@@ -817,18 +817,51 @@ impl Session {
817817
}
818818
Some(&msg::REQUEST_SUCCESS) => {
819819
trace!("Global Request Success");
820+
match self.open_global_requests.pop_front() {
821+
Some(GlobalRequestResponse::Keepalive) => {
822+
// ignore keepalives
823+
}
824+
Some(GlobalRequestResponse::TcpIpForward(return_channel)) => {
825+
let result = if buf.len() == 1 {
826+
// If a specific port was requested, the reply has no data
827+
Some(0)
828+
} else {
829+
let mut r = buf.reader(1);
830+
match r.read_u32() {
831+
Ok(port) => Some(port),
832+
Err(e) => {
833+
error!("Error parsing port for TcpIpForward request: {e:?}");
834+
None
835+
}
836+
}
837+
};
838+
let _ = return_channel.send(result);
839+
}
840+
Some(GlobalRequestResponse::CancelTcpIpForward(return_channel)) => {
841+
let _ = return_channel.send(true);
842+
}
843+
None => {
844+
error!("Received global request failure for unknown request!")
845+
}
846+
}
820847
Ok(())
821848
}
822849
Some(&msg::REQUEST_FAILURE) => {
823-
// Right now, the only global request we send with a request for reply is keepalive,
824-
// which just needs to be ignored.
825-
// If there are other global requests with reply implemented,
826-
// we'll need to build infrastructure to filter the expected request failures from the keepalive
827-
// The following works as long as only a single keepalive request was sent before a reply:
828-
// `if self.common.alive_timeouts > 0`
829-
// since any data received will reset alive_timeouts back to zero,
830-
// even if multiple keepalives will be processed due to TCP delivering all of them after connectivity was restored
831-
trace!("Global Request Failure");
850+
trace!("global request failure");
851+
match self.open_global_requests.pop_front() {
852+
Some(GlobalRequestResponse::Keepalive) => {
853+
// ignore keepalives
854+
}
855+
Some(GlobalRequestResponse::TcpIpForward(return_channel)) => {
856+
let _ = return_channel.send(None);
857+
}
858+
Some(GlobalRequestResponse::CancelTcpIpForward(return_channel)) => {
859+
let _ = return_channel.send(false);
860+
}
861+
None => {
862+
error!("Received global request failure for unknown request!")
863+
}
864+
}
832865
Ok(())
833866
}
834867
m => {

russh/src/client/mod.rs

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
//! [Session]: client::Session
3636
3737
use std::cell::RefCell;
38-
use std::collections::HashMap;
38+
use std::collections::{HashMap, VecDeque};
3939
use std::num::Wrapping;
4040
use std::pin::Pin;
4141
use std::sync::Arc;
@@ -55,12 +55,15 @@ use tokio::pin;
5555
use tokio::sync::mpsc::{
5656
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
5757
};
58-
use tokio::sync::Mutex;
58+
use tokio::sync::{oneshot, Mutex};
5959

6060
use crate::channels::{Channel, ChannelMsg, ChannelRef};
6161
use crate::cipher::{self, clear, CipherPair, OpeningKey};
6262
use crate::key::PubKey;
63-
use crate::session::{CommonSession, EncryptedState, Exchange, Kex, KexDhDone, KexInit, NewKeys};
63+
use crate::session::{
64+
CommonSession, EncryptedState, Exchange, GlobalRequestResponse, Kex, KexDhDone, KexInit,
65+
NewKeys,
66+
};
6467
use crate::ssh_read::SshRead;
6568
use crate::sshbuffer::{SSHBuffer, SshId};
6669
use crate::{
@@ -87,6 +90,7 @@ pub struct Session {
8790
pending_len: u32,
8891
inbound_channel_sender: Sender<Msg>,
8992
inbound_channel_receiver: Receiver<Msg>,
93+
open_global_requests: VecDeque<GlobalRequestResponse>,
9094
}
9195

9296
const STRICT_KEX_MSG_ORDER: &[u8] = &[msg::KEXINIT, msg::KEX_ECDH_REPLY, msg::NEWKEYS];
@@ -146,12 +150,14 @@ pub enum Msg {
146150
channel_ref: ChannelRef,
147151
},
148152
TcpIpForward {
149-
want_reply: bool,
153+
/// Provide a channel for the reply result to request a reply from the server
154+
reply_channel: Option<oneshot::Sender<Option<u32>>>,
150155
address: String,
151156
port: u32,
152157
},
153158
CancelTcpIpForward {
154-
want_reply: bool,
159+
/// Provide a channel for the reply result to request a reply from the server
160+
reply_channel: Option<oneshot::Sender<bool>>,
155161
address: String,
156162
port: u32,
157163
},
@@ -507,39 +513,57 @@ impl<H: Handler> Handle<H> {
507513
.await
508514
}
509515

516+
/// Requests the server to open a TCP/IP forward channel
517+
///
518+
/// If port == 0 the server will choose a port that will be returned, returns 0 otherwise
510519
pub async fn tcpip_forward<A: Into<String>>(
511520
&mut self,
512521
address: A,
513522
port: u32,
514-
) -> Result<bool, crate::Error> {
523+
) -> Result<u32, crate::Error> {
524+
let (reply_send, reply_recv) = oneshot::channel();
515525
self.sender
516526
.send(Msg::TcpIpForward {
517-
want_reply: true,
527+
reply_channel: Some(reply_send),
518528
address: address.into(),
519529
port,
520530
})
521531
.await
522532
.map_err(|_| crate::Error::SendError)?;
523-
if port == 0 {
524-
self.wait_recv_reply().await?;
533+
534+
match reply_recv.await {
535+
Ok(Some(port)) => Ok(port),
536+
Ok(None) => Err(crate::Error::RequestDenied),
537+
Err(e) => {
538+
error!("Unable to receive TcpIpForward result: {e:?}");
539+
Err(crate::Error::Disconnect)
540+
}
525541
}
526-
Ok(true)
527542
}
528543

529544
pub async fn cancel_tcpip_forward<A: Into<String>>(
530545
&self,
531546
address: A,
532547
port: u32,
533-
) -> Result<bool, crate::Error> {
548+
) -> Result<(), crate::Error> {
549+
let (reply_send, reply_recv) = oneshot::channel();
534550
self.sender
535551
.send(Msg::CancelTcpIpForward {
536-
want_reply: true,
552+
reply_channel: Some(reply_send),
537553
address: address.into(),
538554
port,
539555
})
540556
.await
541557
.map_err(|_| crate::Error::SendError)?;
542-
Ok(true)
558+
559+
match reply_recv.await {
560+
Ok(true) => Ok(()),
561+
Ok(false) => Err(crate::Error::RequestDenied),
562+
Err(e) => {
563+
error!("Unable to receive CancelTcpIpForward result: {e:?}");
564+
Err(crate::Error::Disconnect)
565+
}
566+
}
543567
}
544568

545569
/// Sends a disconnect message.
@@ -707,6 +731,7 @@ impl Session {
707731
channels: HashMap::new(),
708732
pending_reads: Vec::new(),
709733
pending_len: 0,
734+
open_global_requests: VecDeque::new(),
710735
}
711736
}
712737

@@ -931,15 +956,15 @@ impl Session {
931956
self.channels.insert(id, channel_ref);
932957
}
933958
Msg::TcpIpForward {
934-
want_reply,
959+
reply_channel,
935960
address,
936961
port,
937-
} => self.tcpip_forward(want_reply, &address, port),
962+
} => self.tcpip_forward(reply_channel, &address, port),
938963
Msg::CancelTcpIpForward {
939-
want_reply,
964+
reply_channel,
940965
address,
941966
port,
942-
} => self.cancel_tcpip_forward(want_reply, &address, port),
967+
} => self.cancel_tcpip_forward(reply_channel, &address, port),
943968
Msg::Disconnect {
944969
reason,
945970
description,

russh/src/client/session.rs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use log::error;
22
use russh_cryptovec::CryptoVec;
33
use russh_keys::encoding::Encoding;
4+
use tokio::sync::oneshot;
45

56
use crate::client::Session;
67
use crate::session::EncryptedState;
@@ -264,8 +265,23 @@ impl Session {
264265
}
265266
}
266267

267-
pub fn tcpip_forward(&mut self, want_reply: bool, address: &str, port: u32) {
268+
/// Requests a TCP/IP forwarding from the server
269+
///
270+
/// If `reply_channel` is not None, sets want_reply and returns the server's response via the channel,
271+
/// Some<u32> for a success message with port, or None for failure
272+
pub fn tcpip_forward(
273+
&mut self,
274+
reply_channel: Option<oneshot::Sender<Option<u32>>>,
275+
address: &str,
276+
port: u32,
277+
) {
268278
if let Some(ref mut enc) = self.common.encrypted {
279+
let want_reply = reply_channel.is_some();
280+
if let Some(reply_channel) = reply_channel {
281+
self.open_global_requests.push_back(
282+
crate::session::GlobalRequestResponse::TcpIpForward(reply_channel),
283+
);
284+
}
269285
push_packet!(enc.write, {
270286
enc.write.push(msg::GLOBAL_REQUEST);
271287
enc.write.extend_ssh_string(b"tcpip-forward");
@@ -276,8 +292,23 @@ impl Session {
276292
}
277293
}
278294

279-
pub fn cancel_tcpip_forward(&mut self, want_reply: bool, address: &str, port: u32) {
295+
/// Requests cancellation of TCP/IP forwarding from the server
296+
///
297+
/// If `want_reply` is `true`, returns a oneshot receiveing the server's reply:
298+
/// `true` for a success message, or `false` for failure
299+
pub fn cancel_tcpip_forward(
300+
&mut self,
301+
reply_channel: Option<oneshot::Sender<bool>>,
302+
address: &str,
303+
port: u32,
304+
) {
280305
if let Some(ref mut enc) = self.common.encrypted {
306+
let want_reply = reply_channel.is_some();
307+
if let Some(reply_channel) = reply_channel {
308+
self.open_global_requests.push_back(
309+
crate::session::GlobalRequestResponse::CancelTcpIpForward(reply_channel),
310+
);
311+
}
281312
push_packet!(enc.write, {
282313
enc.write.push(msg::GLOBAL_REQUEST);
283314
enc.write.extend_ssh_string(b"cancel-tcpip-forward");
@@ -289,6 +320,8 @@ impl Session {
289320
}
290321

291322
pub fn send_keepalive(&mut self, want_reply: bool) {
323+
self.open_global_requests
324+
.push_back(crate::session::GlobalRequestResponse::Keepalive);
292325
if let Some(ref mut enc) = self.common.encrypted {
293326
push_packet!(enc.write, {
294327
enc.write.push(msg::GLOBAL_REQUEST);

russh/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,9 @@ pub enum Error {
262262
#[error("Failed to decrypt a packet")]
263263
DecryptionError,
264264

265+
#[error("The request was rejected by the other party")]
266+
RequestDenied,
267+
265268
#[error(transparent)]
266269
Keys(#[from] russh_keys::Error),
267270

russh/src/server/encrypted.rs

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,18 +1061,51 @@ impl Session {
10611061
}
10621062
Some(&msg::REQUEST_SUCCESS) => {
10631063
trace!("Global Request Success");
1064+
match self.open_global_requests.pop_front() {
1065+
Some(GlobalRequestResponse::Keepalive) => {
1066+
// ignore keepalives
1067+
}
1068+
Some(GlobalRequestResponse::TcpIpForward(return_channel)) => {
1069+
let result = if buf.len() == 1 {
1070+
// If a specific port was requested, the reply has no data
1071+
Some(0)
1072+
} else {
1073+
let mut r = buf.reader(1);
1074+
match r.read_u32() {
1075+
Ok(port) => Some(port),
1076+
Err(e) => {
1077+
error!("Error parsing port for TcpIpForward request: {e:?}");
1078+
None
1079+
}
1080+
}
1081+
};
1082+
let _ = return_channel.send(result);
1083+
}
1084+
Some(GlobalRequestResponse::CancelTcpIpForward(return_channel)) => {
1085+
let _ = return_channel.send(true);
1086+
}
1087+
None => {
1088+
error!("Received global request failure for unknown request!")
1089+
}
1090+
}
10641091
Ok(())
10651092
}
10661093
Some(&msg::REQUEST_FAILURE) => {
1067-
// Right now, the only global request we send with a request for reply is keepalive,
1068-
// which just needs to be ignored.
1069-
// If there are other global requests with reply implemented,
1070-
// we'll need to build infrastructure to filter the expected request failures from the keepalive
1071-
// The following works as long as only a single keepalive request was sent before a reply:
1072-
// `if self.common.alive_timeouts > 0`
1073-
// since any data received will reset alive_timeouts back to zero,
1074-
// even if multiple keepalives will be processed due to TCP delivering all of them after connectivity was restored
1075-
trace!("Global Request Failure");
1094+
trace!("global request failure");
1095+
match self.open_global_requests.pop_front() {
1096+
Some(GlobalRequestResponse::Keepalive) => {
1097+
// ignore keepalives
1098+
}
1099+
Some(GlobalRequestResponse::TcpIpForward(return_channel)) => {
1100+
let _ = return_channel.send(None);
1101+
}
1102+
Some(GlobalRequestResponse::CancelTcpIpForward(return_channel)) => {
1103+
let _ = return_channel.send(false);
1104+
}
1105+
None => {
1106+
error!("Received global request failure for unknown request!")
1107+
}
1108+
}
10761109
Ok(())
10771110
}
10781111
m => {

russh/src/server/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
//! * Serving `ratatui` based TUI app to clients: [per-client](https://github.com/warp-tech/russh/blob/main/russh/examples/ratatui_app.rs), [shared](https://github.com/warp-tech/russh/blob/main/russh/examples/ratatui_shared_app.rs)
3030
3131
use std;
32-
use std::collections::HashMap;
32+
use std::collections::{HashMap, VecDeque};
3333
use std::num::Wrapping;
3434
use std::pin::Pin;
3535
use std::sync::Arc;
@@ -678,6 +678,7 @@ where
678678
pending_reads: Vec::new(),
679679
pending_len: 0,
680680
channels: HashMap::new(),
681+
open_global_requests: VecDeque::new(),
681682
};
682683
let join = tokio::spawn(session.run(stream, handler));
683684

0 commit comments

Comments
 (0)