Skip to content

Commit

Permalink
Fixed issue connecting to ephemeral container when target is in diffe…
Browse files Browse the repository at this point in the history
…rent namespace (#1846)

* Fixed issue connecting to ephemeral container when target is in different namespace

* ..

* ..

* ..
  • Loading branch information
aviramha committed Aug 24, 2023
1 parent 0e3ceef commit 6c4fea4
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 128 deletions.
1 change: 1 addition & 0 deletions changelog.d/+ephemeral-namespace-different.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed issue connecting to ephemeral container when target is in different namespace
16 changes: 0 additions & 16 deletions mirrord-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,6 @@
}
]
},
"connect_agent_name": {
"description": "<!--${internal}-->\n\n## connect_agent_name {#root-connect_agent_name}\n\nAgent name that already exists that we can connect to.\n\nKeep in mind that the intention here is to allow reusing a long living mirrord-agent pod, and **not** to connect multiple (simultaneos) mirrord instances to a single mirrord-agent, as the later is not properly supported without the use of [mirrord-operator](https://metalbear.co/#waitlist-form).\n\n```json { \"connect_agent_name\": \"mirrord-agent-still-alive\" } ```",
"type": [
"string",
"null"
]
},
"connect_agent_port": {
"description": "<!--${internal}-->\n\n## connect_agent_port {#root-connect_agent_port}\n\nAgent listen port that already exists that we can connect to.\n\n```json { \"connect_agent_port\": \"8888\" } ```",
"type": [
"integer",
"null"
],
"format": "uint16",
"minimum": 0.0
},
"connect_tcp": {
"title": "connect_tcp {#root-connect_tpc}",
"description": "IP:PORT to connect to instead of using k8s api, for testing purposes.\n\n```json { \"connect_tcp\": \"10.10.0.100:7777\" } ```",
Expand Down
33 changes: 28 additions & 5 deletions mirrord/cli/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,44 @@
use std::time::Duration;

use mirrord_config::{feature::network::outgoing::OutgoingFilterConfig, LayerConfig};
use mirrord_kube::api::{kubernetes::KubernetesAPI, AgentManagment};
use mirrord_kube::api::{
kubernetes::{AgentKubernetesConnectInfo, KubernetesAPI},
AgentManagment,
};
use mirrord_operator::client::{OperatorApi, OperatorApiError, OperatorSessionInformation};
use mirrord_progress::Progress;
use mirrord_protocol::{ClientMessage, DaemonMessage};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tracing::trace;

use crate::{CliError, Result};

#[derive(Debug, Serialize, Deserialize)]
pub(crate) enum AgentConnectInfo {
Operator(OperatorSessionInformation),
/// Connect directly to an agent by name and port using k8s port forward.
DirectKubernetes(String, u16),
DirectKubernetes(AgentKubernetesConnectInfo),
}

const AGENT_CONNECT_INFO_KEY: &str = "MIRRORD_AGENT_CONNECT_INFO";

impl AgentConnectInfo {
/// Returns environment variable holding the information
pub const fn env_key() -> &'static str {
AGENT_CONNECT_INFO_KEY
}

/// Loads the information from environment variables
pub fn from_env() -> Result<Option<Self>> {
std::env::var(Self::env_key())
.ok()
.map(|val| {
serde_json::from_str(&val).map_err(|e| CliError::ConnectInfoLoadFailed(val, e))
})
.transpose()
}
}
pub(crate) struct AgentConnection {
pub sender: mpsc::Sender<ClientMessage>,
pub receiver: mpsc::Receiver<DaemonMessage>,
Expand Down Expand Up @@ -108,7 +131,7 @@ where
detect_openshift_task.warning("couldn't determine OpenShift");
};

let (pod_agent_name, agent_port) = tokio::time::timeout(
let agent_connect_info = tokio::time::timeout(
Duration::from_secs(config.agent.startup_timeout),
k8s_api.create_agent(progress),
)
Expand All @@ -117,12 +140,12 @@ where
.map_err(CliError::CreateAgentFailed)?;

let (sender, receiver) = k8s_api
.create_connection((pod_agent_name.clone(), agent_port))
.create_connection(agent_connect_info.clone())
.await
.map_err(CliError::AgentConnectionFailed)?;

Ok((
AgentConnectInfo::DirectKubernetes(pod_agent_name, agent_port),
AgentConnectInfo::DirectKubernetes(agent_connect_info),
AgentConnection { sender, receiver },
))
}
Expand Down
5 changes: 5 additions & 0 deletions mirrord/cli/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,9 @@ pub(crate) enum CliError {
#[error("Waitlist registration failed.")]
#[diagnostic(help("Please check the email provided and internet connection.{GENERAL_HELP}"))]
WaitlistError(reqwest::Error),
#[error("Connection info deserialization failed: please report it. value: `{0}` err: `{1}`")]
#[diagnostic(help(
r#"This is a bug. Please report it in our Discord or GitHub repository. {GENERAL_HELP}"#
))]
ConnectInfoLoadFailed(String, serde_json::Error),
}
15 changes: 2 additions & 13 deletions mirrord/cli/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::{
};

use mirrord_config::LayerConfig;
use mirrord_operator::client::OperatorSessionInformation;
use mirrord_progress::Progress;
use mirrord_protocol::{ClientMessage, DaemonMessage, EnvVars, GetEnvVarsRequest};
#[cfg(target_os = "macos")]
Expand Down Expand Up @@ -122,18 +121,8 @@ impl MirrordExecution {
.stderr(std::process::Stdio::null())
.stdin(std::process::Stdio::null());

match &connect_info {
AgentConnectInfo::DirectKubernetes(name, port) => {
proxy_command.env("MIRRORD_CONNECT_AGENT", name);
proxy_command.env("MIRRORD_CONNECT_PORT", port.to_string());
}
AgentConnectInfo::Operator(session) => {
proxy_command.env(
OperatorSessionInformation::env_key(),
serde_json::to_string(&session)?,
);
}
};
let connect_info = serde_json::to_string(&connect_info)?;
proxy_command.env(AgentConnectInfo::env_key(), connect_info);

let mut proxy_process = proxy_command
.spawn()
Expand Down
42 changes: 23 additions & 19 deletions mirrord/cli/src/internal_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ use tokio::{
use tokio_util::sync::CancellationToken;
use tracing::{error, info, log::trace, warn};

use crate::error::{InternalProxyError, Result};
use crate::{
connection::AgentConnectInfo,
error::{InternalProxyError, Result},
};

const PING_INTERVAL_DURATION: Duration = Duration::from_secs(30);

Expand Down Expand Up @@ -400,25 +403,26 @@ async fn connect(
(mpsc::Sender<ClientMessage>, mpsc::Receiver<DaemonMessage>),
Option<OperatorSessionInformation>,
)> {
if let Some(operator_session_information) = OperatorSessionInformation::from_env()? {
Ok((
let agent_connect_info = AgentConnectInfo::from_env()?;
match agent_connect_info {
Some(AgentConnectInfo::Operator(operator_session_information)) => Ok((
OperatorApi::connect(config, &operator_session_information).await?,
Some(operator_session_information),
))
} else if let Some(address) = &config.connect_tcp {
let stream = TcpStream::connect(address)
.await
.map_err(InternalProxyError::TcpConnectError)?;
Ok((wrap_raw_connection(stream), None))
} else if let (Some(agent_name), Some(port)) =
(&config.connect_agent_name, config.connect_agent_port)
{
let k8s_api = KubernetesAPI::create(config).await?;
let connection = k8s_api
.create_connection((agent_name.clone(), port))
.await?;
Ok((connection, None))
} else {
Err(InternalProxyError::NoConnectionMethod.into())
)),
Some(AgentConnectInfo::DirectKubernetes(connect_info)) => {
let k8s_api = KubernetesAPI::create(config).await?;
let connection = k8s_api.create_connection(connect_info).await?;
Ok((connection, None))
}
None => {
if let Some(address) = &config.connect_tcp {
let stream = TcpStream::connect(address)
.await
.map_err(InternalProxyError::TcpConnectError)?;
Ok((wrap_raw_connection(stream), None))
} else {
Err(InternalProxyError::NoConnectionMethod.into())
}
}
}
}
35 changes: 0 additions & 35 deletions mirrord/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,39 +201,6 @@ pub struct LayerConfig {
#[config(env = "MIRRORD_CONNECT_TCP")]
pub connect_tcp: Option<String>,

/// <!--${internal}-->
///
/// ## connect_agent_name {#root-connect_agent_name}
///
/// Agent name that already exists that we can connect to.
///
/// Keep in mind that the intention here is to allow reusing a long living mirrord-agent pod,
/// and **not** to connect multiple (simultaneos) mirrord instances to a single
/// mirrord-agent, as the later is not properly supported without the use of
/// [mirrord-operator](https://metalbear.co/#waitlist-form).
///
/// ```json
/// {
/// "connect_agent_name": "mirrord-agent-still-alive"
/// }
/// ```
#[config(env = "MIRRORD_CONNECT_AGENT")]
pub connect_agent_name: Option<String>,

/// <!--${internal}-->
///
/// ## connect_agent_port {#root-connect_agent_port}
///
/// Agent listen port that already exists that we can connect to.
///
/// ```json
/// {
/// "connect_agent_port": "8888"
/// }
/// ```
#[config(env = "MIRRORD_CONNECT_PORT")]
pub connect_agent_port: Option<u16>,

/// ## operator {#root-operator}
///
/// Allow to lookup if operator is installed on cluster and use it.
Expand Down Expand Up @@ -624,8 +591,6 @@ mod tests {
pause: Some(false),
kubeconfig: None,
telemetry: None,
connect_agent_name: None,
connect_agent_port: None,
target: Some(TargetFileConfig::Advanced {
path: Some(Target::Pod(PodTarget {
pod: "test-service-abcdefg-abcd".to_owned(),
Expand Down
19 changes: 14 additions & 5 deletions mirrord/kube/src/api/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use serde_json::json;
use tokio::pin;
use tracing::{debug, warn};

use super::kubernetes::AgentKubernetesConnectInfo;
use crate::{
api::{
get_k8s_resource_api,
Expand Down Expand Up @@ -52,7 +53,7 @@ pub trait ContainerApi {
connection_port: u16,
progress: &P,
agent_gid: u16,
) -> Result<String>
) -> Result<AgentKubernetesConnectInfo>
where
P: Progress + Send + Sync;
}
Expand Down Expand Up @@ -174,7 +175,7 @@ impl ContainerApi for JobContainer {
connection_port: u16,
progress: &P,
agent_gid: u16,
) -> Result<String>
) -> Result<AgentKubernetesConnectInfo>
where
P: Progress + Send + Sync,
{
Expand Down Expand Up @@ -378,7 +379,11 @@ impl ContainerApi for JobContainer {

pod_progress.success(Some("pod is ready"));

Ok(pod_name)
Ok(AgentKubernetesConnectInfo {
pod_name,
agent_port: connection_port,
namespace: agent.namespace.clone(),
})
}
}

Expand All @@ -393,7 +398,7 @@ impl ContainerApi for EphemeralContainer {
connection_port: u16,
progress: &P,
agent_gid: u16,
) -> Result<String>
) -> Result<AgentKubernetesConnectInfo>
where
P: Progress + Send + Sync,
{
Expand Down Expand Up @@ -524,7 +529,11 @@ impl ContainerApi for EphemeralContainer {
container_progress.success(Some("container is ready"));

debug!("container is ready");
Ok(runtime_data.pod_name.to_string())
Ok(AgentKubernetesConnectInfo {
pod_name: runtime_data.pod_name.to_string(),
agent_port: connection_port,
namespace: runtime_data.pod_namespace.clone(),
})
}
}

Expand Down
Loading

0 comments on commit 6c4fea4

Please sign in to comment.