Skip to content

Commit

Permalink
Changed keepalive to happen from internal proxy (#1841)
Browse files Browse the repository at this point in the history
* Changed keepalive to happen from internal proxy
Fixes cases where users set a breakpoint and agent dies meanwhile.

* ..
  • Loading branch information
aviramha committed Aug 23, 2023
1 parent 9dd5e5c commit f271475
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 43 deletions.
1 change: 1 addition & 0 deletions changelog.d/1839.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Changed keep alive to happen from internal proxy to support cases where layer process is stuck [breakpoint/etc]
28 changes: 26 additions & 2 deletions mirrord/cli/src/internal_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ use tokio::{
time::timeout,
};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, log::trace};
use tracing::{error, info, log::trace, warn};

use crate::error::{InternalProxyError, Result};

const PING_INTERVAL_DURATION: Duration = Duration::from_secs(30);

unsafe fn redirect_fd_to_dev_null(fd: libc::c_int) {
let devnull_fd = libc::open(b"/dev/null\0" as *const [u8; 10] as _, libc::O_RDWR);
libc::dup2(devnull_fd, fd);
Expand Down Expand Up @@ -73,6 +75,9 @@ fn print_port(listener: &TcpListener) -> Result<()> {

/// Supposed to run as an async detached task, proxying the connection.
/// We parse the protocol so we might add some logic here in the future?
/// We also handle pings here, meaning that if layer is too quiet (for example if it has a
/// breakpoint hit and someone is holding it) It will keep the agent alive and send pings on its
/// behalf.
async fn connection_task(config: LayerConfig, stream: TcpStream) {
let agent_connection = match connect_and_ping(&config).await {
Ok((agent_connection, _)) => agent_connection,
Expand All @@ -83,9 +88,13 @@ async fn connection_task(config: LayerConfig, stream: TcpStream) {
};
let mut layer_connection = actix_codec::Framed::new(stream, DaemonCodec::new());
let (agent_sender, mut agent_receiver) = agent_connection;
let mut ping = false;
let mut ping_interval = tokio::time::interval(PING_INTERVAL_DURATION);
ping_interval.tick().await;
loop {
select! {
layer_message = layer_connection.next() => {
ping_interval.reset();
match layer_message {
Some(Ok(layer_message)) => {
if let Err(err) = agent_sender.send(layer_message).await {
Expand All @@ -109,17 +118,32 @@ async fn connection_task(config: LayerConfig, stream: TcpStream) {
},
agent_message = agent_receiver.recv() => {
match agent_message {
Some(DaemonMessage::Pong) => {
ping = false;
},
Some(agent_message) => {
if let Err(err) = layer_connection.send(agent_message).await {
trace!("Error sending agent message to layer: {err:#?}");
break;
}
},
}
None => {
trace!("agent connection closed");
break;
}
}
},
_ = ping_interval.tick() => {
if !ping {
if let Err(err) = agent_sender.send(ClientMessage::Ping).await {
trace!("Error sending ping to agent: {err:#?}");
break;
}
ping = true;
} else {
warn!("Unmatched ping, timeout!");
break;
}
}
}
}
Expand Down
3 changes: 0 additions & 3 deletions mirrord/layer/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,6 @@ pub(crate) enum LayerError {
)]
NewConnectionAfterSocketClose(ConnectionId),

#[error("mirrord-layer: Unmatched pong!")]
UnmatchedPong,

#[error("mirrord-layer: JSON convert error")]
JSONConvertError(#[from] serde_json::Error),

Expand Down
41 changes: 3 additions & 38 deletions mirrord/layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ use tokio::{
runtime::Runtime,
select,
sync::mpsc::{channel, Receiver, Sender},
time::{Duration, Interval},
};
use tracing::{debug, error, info, trace, warn};
use tracing_subscriber::{fmt::format::FmtSpan, prelude::*};
Expand Down Expand Up @@ -569,15 +568,6 @@ struct Layer {
/// loop, where we receive the remote [`DaemonMessage`]s, and send them through it.
rx: Receiver<DaemonMessage>,

/// Part of the heartbeat mechanism of mirrord.
///
/// When it's been too long since we've last received a message (60 seconds), and this is
/// `false`, we send a [`ClientMessage::Ping`], set this to `true`, and expect to receive a
/// [`DaemonMessage::Pong`], setting it to `false` and completing the hearbeat detection.
ping: bool,

ping_interval: Interval,

/// Handler to the TCP mirror operations, see [`TcpMirrorHandler`].
tcp_mirror_handler: TcpMirrorHandler,

Expand Down Expand Up @@ -639,8 +629,6 @@ impl Layer {
Self {
tx,
rx,
ping: false,
ping_interval: tokio::time::interval(Duration::from_secs(30)),
tcp_mirror_handler: TcpMirrorHandler::new(port_mapping.clone()),
tcp_outgoing_handler: TcpOutgoingHandler::default(),
udp_outgoing_handler: Default::default(),
Expand All @@ -659,7 +647,6 @@ impl Layer {
/// Sends a [`ClientMessage`] through `Layer::tx` to the [`Receiver`] in
/// [`wrap_raw_connection`](mirrord_kube::api::wrap_raw_connection).
async fn send(&mut self, msg: ClientMessage) -> Result<(), ClientMessage> {
self.ping_interval.reset();
self.tx.send(msg).await.map_err(|err| err.0)
}

Expand Down Expand Up @@ -718,11 +705,6 @@ impl Layer {
///
/// ## Special case
///
/// ### [`DaemonMessage::Pong`]
///
/// This message has no dedicated handler, and is thus handled directly here, changing the
/// [`Self::ping`] state.
///
/// ### [`DaemonMessage::GetEnvVarsResponse`] and [`DaemonMessage::PauseTarget`]
///
/// Handled during mirrord-layer initialization, this message should never make it this far.
Expand Down Expand Up @@ -757,13 +739,9 @@ impl Layer {
.await
}
DaemonMessage::Pong => {
if self.ping {
self.ping = false;
trace!("Daemon sent pong!");
} else {
Err(LayerError::UnmatchedPong)?;
}

warn!(
"Received pong from agent - internal proxy should handle it, please report it."
);
Ok(())
}
DaemonMessage::GetEnvVarsResponse(_) => {
Expand Down Expand Up @@ -814,9 +792,6 @@ impl Layer {
///
/// - Read from the [`Layer`] feature handlers [`Receiver`]s, to turn outgoing messages into
/// [`ClientMessage`]s, sending them with `Layer::send`;
///
/// - Handle the heartbeat mechanism (Ping/Pong feature), sending a [`ClientMessage::Ping`] if all
/// the other channels received nothing for a while (30 seconds);
async fn thread_loop(
mut receiver: Receiver<HookMessage>,
tx: Sender<ClientMessage>,
Expand All @@ -832,7 +807,6 @@ async fn thread_loop(
..
} = config;
let mut layer = Layer::new(tx, rx, incoming);
layer.ping_interval.tick().await;

if let Err(err) = layer
.tx
Expand Down Expand Up @@ -898,15 +872,6 @@ async fn thread_loop(

layer.send(message).await.unwrap();
}
_ = layer.ping_interval.tick() => {
if !layer.ping {
layer.send(ClientMessage::Ping).await.unwrap();
trace!("sent ping to daemon");
layer.ping = true;
} else {
panic!("Client: unmatched ping");
}
}
}
}
graceful_exit!("mirrord has encountered an error and is now exiting.");
Expand Down

0 comments on commit f271475

Please sign in to comment.