Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cli: add more details to the status command #190212

Merged
merged 1 commit into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions cli/src/commands/tunnels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::{
code_server::CodeServerArgs,
create_service_manager,
dev_tunnels::{self, DevTunnels},
local_forwarding, legal,
legal, local_forwarding,
paths::get_all_servers,
protocol, serve_stream,
shutdown_signal::ShutdownRequest,
Expand Down Expand Up @@ -326,12 +326,12 @@ pub async fn kill(ctx: CommandContext) -> Result<i32, AnyError> {

#[derive(Serialize)]
pub struct StatusOutput {
pub tunnel: Option<protocol::singleton::TunnelState>,
pub tunnel: Option<protocol::singleton::StatusWithTunnelName>,
pub service_installed: bool,
}

pub async fn status(ctx: CommandContext) -> Result<i32, AnyError> {
let tunnel_status = do_single_rpc_call::<_, protocol::singleton::Status>(
let tunnel = do_single_rpc_call::<_, protocol::singleton::StatusWithTunnelName>(
&ctx.paths.tunnel_lockfile(),
ctx.log.clone(),
protocol::singleton::METHOD_STATUS,
Expand All @@ -347,8 +347,8 @@ pub async fn status(ctx: CommandContext) -> Result<i32, AnyError> {
ctx.log.result(
serde_json::to_string(&StatusOutput {
service_installed,
tunnel: match tunnel_status {
Ok(s) => Some(s.tunnel),
tunnel: match tunnel {
Ok(s) => Some(s),
Err(CodeError::NoRunningTunnel) => None,
Err(e) => return Err(e.into()),
},
Expand Down
49 changes: 47 additions & 2 deletions cli/src/tunnels/dev_tunnels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use tunnels::management::{
NO_REQUEST_OPTIONS,
};

use super::protocol::PortPrivacy;
use super::protocol::{self, PortPrivacy};
use super::wsl_detect::is_wsl_installed;

static TUNNEL_COUNT_LIMIT_NAME: &str = "TunnelsPerUserPerLocation";
Expand Down Expand Up @@ -203,6 +203,11 @@ impl ActiveTunnel {
self.get_port_format()
.map(|f| f.replace(PORT_TOKEN, &port.to_string()))
}

/// Gets an object to read the current tunnel status.
pub fn status(&self) -> StatusLock {
self.manager.get_status()
}
}

const VSCODE_CLI_TUNNEL_TAG: &str = "vscode-server-launcher";
Expand Down Expand Up @@ -843,10 +848,36 @@ impl DevTunnels {
}
}

#[derive(Clone, Default)]
pub struct StatusLock(Arc<std::sync::Mutex<protocol::singleton::Status>>);

impl StatusLock {
fn succeed(&self) {
let mut status = self.0.lock().unwrap();
status.tunnel = protocol::singleton::TunnelState::Connected;
status.last_connected_at = Some(chrono::Utc::now());
}

fn fail(&self, reason: String) {
let mut status = self.0.lock().unwrap();
if let protocol::singleton::TunnelState::Connected = status.tunnel {
status.last_disconnected_at = Some(chrono::Utc::now());
status.tunnel = protocol::singleton::TunnelState::Disconnected;
}
status.last_fail_reason = Some(reason);
}

pub fn read(&self) -> protocol::singleton::Status {
let status = self.0.lock().unwrap();
status.clone()
}
}

struct ActiveTunnelManager {
close_tx: Option<mpsc::Sender<()>>,
endpoint_rx: watch::Receiver<Option<Result<TunnelRelayTunnelEndpoint, WrappedError>>>,
relay: Arc<tokio::sync::Mutex<RelayTunnelHost>>,
status: StatusLock,
}

impl ActiveTunnelManager {
Expand All @@ -862,13 +893,17 @@ impl ActiveTunnelManager {
let relay = Arc::new(tokio::sync::Mutex::new(RelayTunnelHost::new(locator, mgmt)));
let relay_spawned = relay.clone();

let status = StatusLock::default();

let status_spawned = status.clone();
tokio::spawn(async move {
ActiveTunnelManager::spawn_tunnel(
log,
relay_spawned,
close_rx,
endpoint_tx,
access_token,
status_spawned,
)
.await;
});
Expand All @@ -877,9 +912,15 @@ impl ActiveTunnelManager {
endpoint_rx,
relay,
close_tx: Some(close_tx),
status,
}
}

/// Gets a copy of the current tunnel status information
pub fn get_status(&self) -> StatusLock {
self.status.clone()
}

/// Adds a port for TCP/IP forwarding.
#[allow(dead_code)] // todo: port forwarding
pub async fn add_port_tcp(
Expand Down Expand Up @@ -967,12 +1008,15 @@ impl ActiveTunnelManager {
mut close_rx: mpsc::Receiver<()>,
endpoint_tx: watch::Sender<Option<Result<TunnelRelayTunnelEndpoint, WrappedError>>>,
access_token_provider: impl AccessTokenProvider + 'static,
status: StatusLock,
) {
let mut backoff = Backoff::new(Duration::from_secs(5), Duration::from_secs(120));

macro_rules! fail {
($e: expr, $msg: expr) => {
warning!(log, "{}: {}", $msg, $e);
let fmt = format!("{}: {}", $msg, $e);
warning!(log, &fmt);
status.fail(fmt);
endpoint_tx.send(Some(Err($e))).ok();
backoff.delay().await;
};
Expand Down Expand Up @@ -1008,6 +1052,7 @@ impl ActiveTunnelManager {
};

backoff.reset();
status.succeed();
endpoint_tx.send(Some(Ok(handle.endpoint().clone()))).ok();

tokio::select! {
Expand Down
31 changes: 28 additions & 3 deletions cli/src/tunnels/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ pub mod forward_singleton {

pub mod singleton {
use crate::log;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

pub const METHOD_RESTART: &str = "restart";
Expand All @@ -271,17 +272,41 @@ pub mod singleton {
pub message: String,
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct StatusWithTunnelName {
pub name: Option<String>,
#[serde(flatten)]
pub status: Status,
}

#[derive(Serialize, Deserialize, Clone)]
pub struct Status {
pub started_at: DateTime<Utc>,
pub tunnel: TunnelState,
pub last_connected_at: Option<DateTime<Utc>>,
pub last_disconnected_at: Option<DateTime<Utc>>,
pub last_fail_reason: Option<String>,
}

impl Default for Status {
fn default() -> Self {
Self {
started_at: Utc::now(),
tunnel: TunnelState::Disconnected,
last_connected_at: None,
last_disconnected_at: None,
last_fail_reason: None,
}
}
}

#[derive(Deserialize, Serialize, Debug)]
pub struct LogReplayFinished {}

#[derive(Deserialize, Serialize, Debug)]
#[derive(Deserialize, Serialize, Debug, Default, Clone)]
pub enum TunnelState {
#[default]
Disconnected,
Connected { name: String },
Connected,
}
}
12 changes: 7 additions & 5 deletions cli/src/tunnels/singleton_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,18 @@ pub async fn start_singleton_client(args: SingletonClientArgs) -> bool {
CONTROL_INSTRUCTIONS_COMMON
});

let res = c.caller.call::<_, _, protocol::singleton::Status>(
protocol::singleton::METHOD_STATUS,
protocol::EmptyObject {},
);
let res = c
.caller
.call::<_, _, protocol::singleton::StatusWithTunnelName>(
protocol::singleton::METHOD_STATUS,
protocol::EmptyObject {},
);

// we want to ensure the "listening" string always gets printed for
// consumers (i.e. VS Code). Ask for it. If the tunnel is not currently
// connected though, it will be soon, and that'll be in the log replays.
if let Ok(Ok(s)) = res.await {
if let protocol::singleton::TunnelState::Connected { name } = s.tunnel {
if let Some(name) = s.name {
print_listening(&c.log, &name);
}
}
Expand Down
44 changes: 30 additions & 14 deletions cli/src/tunnels/singleton_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
use super::{
code_server::CodeServerArgs,
control_server::ServerTermination,
dev_tunnels::ActiveTunnel,
dev_tunnels::{ActiveTunnel, StatusLock},
protocol,
shutdown_signal::{ShutdownRequest, ShutdownSignal},
};
Expand Down Expand Up @@ -48,18 +48,28 @@ pub struct SingletonServerArgs<'a> {
pub log_broadcast: &'a BroadcastLogSink,
}

struct StatusInfo {
name: String,
lock: StatusLock,
}

#[derive(Clone)]
struct SingletonServerContext {
log: log::Logger,
shutdown_tx: broadcast::Sender<ShutdownSignal>,
broadcast_tx: broadcast::Sender<Vec<u8>>,
current_name: Arc<Mutex<Option<String>>>,
// ugly: a lock in a lock. current_status needs to be provided only
// after we set up the tunnel, however the tunnel is created after the
// singleton server starts to avoid a gap in singleton availability.
// However, this should be safe, as the lock is only used for immediate
// data reads (in the `status` method).
current_status: Arc<Mutex<Option<StatusInfo>>>,
}

pub struct RpcServer {
fut: JoinHandle<Result<(), CodeError>>,
shutdown_broadcast: broadcast::Sender<ShutdownSignal>,
current_name: Arc<Mutex<Option<String>>>,
current_status: Arc<Mutex<Option<StatusInfo>>>,
}

pub fn make_singleton_server(
Expand All @@ -71,12 +81,12 @@ pub fn make_singleton_server(
let (shutdown_broadcast, _) = broadcast::channel(4);
let rpc = new_json_rpc();

let current_name = Arc::new(Mutex::new(None));
let current_status = Arc::new(Mutex::default());
let mut rpc = rpc.methods(SingletonServerContext {
log: log.clone(),
shutdown_tx: shutdown_broadcast.clone(),
broadcast_tx: log_broadcast.get_brocaster(),
current_name: current_name.clone(),
current_status: current_status.clone(),
});

rpc.register_sync(
Expand All @@ -91,12 +101,15 @@ pub fn make_singleton_server(
rpc.register_sync(
protocol::singleton::METHOD_STATUS,
|_: protocol::EmptyObject, c| {
Ok(protocol::singleton::Status {
tunnel: match c.current_name.lock().unwrap().clone() {
Some(name) => protocol::singleton::TunnelState::Connected { name },
None => protocol::singleton::TunnelState::Disconnected,
},
})
Ok(c.current_status
.lock()
.unwrap()
.as_ref()
.map(|s| protocol::singleton::StatusWithTunnelName {
name: Some(s.name.clone()),
status: s.lock.read(),
})
.unwrap_or_default())
},
);

Expand Down Expand Up @@ -124,7 +137,7 @@ pub fn make_singleton_server(
});
RpcServer {
shutdown_broadcast,
current_name,
current_status,
fut,
}
}
Expand All @@ -139,8 +152,11 @@ pub async fn start_singleton_server<'a>(

{
print_listening(&args.log, &args.tunnel.name);
let mut name = args.server.current_name.lock().unwrap();
*name = Some(args.tunnel.name.clone())
let mut status = args.server.current_status.lock().unwrap();
*status = Some(StatusInfo {
name: args.tunnel.name.clone(),
lock: args.tunnel.status(),
})
}

let serve_fut = super::serve(
Expand Down