Skip to content

Commit

Permalink
Fix outgoing connect on bound sockets (#2440)
Browse files Browse the repository at this point in the history
* Integration test

* Fix case 1

* Fix case 2

* Changelog entry

* Fix CI workflow

* Doc + comment
  • Loading branch information
Razz4780 authored May 14, 2024
1 parent 3cefc44 commit 0b42ec9
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 29 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,9 @@ jobs:
- run: |
cd mirrord/layer/tests/apps/issue2001
cargo build
- run: |
cd mirrord/layer/tests/apps/issue2438
cargo build
- run: ./scripts/build_c_apps.sh
- run: cargo test --target x86_64-unknown-linux-gnu -p mirrord-layer
- name: mirrord protocol UT
Expand Down Expand Up @@ -408,6 +411,9 @@ jobs:
- run: |
cd mirrord/layer/tests/apps/issue2001
cargo build
- run: |
cd mirrord/layer/tests/apps/issue2438
cargo build
- run: ./scripts/build_c_apps.sh
# For the `java_temurin_sip` test.
- uses: sdkman/sdkman-action@b1f9b696c79148b66d3d3a06f7ea801820318d0f
Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ members = [
"mirrord/layer/tests/apps/issue1776portnot53",
"mirrord/layer/tests/apps/issue1899",
"mirrord/layer/tests/apps/issue2001",
"mirrord/layer/tests/apps/issue2438",
"sample/rust",
"medschool",
"tests",
Expand Down
1 change: 1 addition & 0 deletions changelog.d/2438.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a bug where outgoing connections where not intercepted from bound sockets.
3 changes: 0 additions & 3 deletions mirrord/layer/src/detour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,6 @@ pub(crate) enum Bypass {
/// is not exposed by a service, so bind locally.
BindWhenTargetless,

/// Hooked connect from a bound mirror socket.
MirrorConnect,

/// Hooked a `connect` to a target that is disabled in the configuration.
DisabledOutgoing,

Expand Down
51 changes: 26 additions & 25 deletions mirrord/layer/src/socket/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use mirrord_protocol::{
dns::{GetAddrInfoRequest, LookupRecord},
file::{OpenFileResponse, OpenOptionsInternal, ReadFileResponse},
};
use nix::sys::socket::{sockopt, SockaddrLike, SockaddrStorage};
use socket2::SockAddr;
use tracing::{error, trace};

Expand Down Expand Up @@ -202,8 +203,9 @@ fn is_ignored_tcp_port(addr: &SocketAddr, config: &IncomingConfig) -> bool {
is_ignored_port(addr) || (not_stolen_with_filter && not_whitelisted)
}

/// Check if the socket is managed by us, if it's managed by us and it's not an ignored port,
/// update the socket state.
/// If the socket is not found in [`SOCKETS`], bypass.
/// Otherwise, if it's not an ignored port, bind (possibly with a fallback to random port) and
/// update socket state in [`SOCKETS`]. If it's an ignored port, remove the socket from [`SOCKETS`].
#[mirrord_layer_macro::instrument(level = "trace", ret, skip(raw_address))]
pub(super) fn bind(
sockfd: c_int,
Expand Down Expand Up @@ -538,10 +540,27 @@ pub(super) fn connect(

trace!("in connect {:#?}", SOCKETS);

let (_, user_socket_info) = {
SOCKETS
.remove(&sockfd)
.ok_or(Bypass::LocalFdNotFound(sockfd))?
let user_socket_info = match SOCKETS.remove(&sockfd) {
Some((_, socket)) => socket,
None => {
// Socket was probably removed from `SOCKETS` in `bind` detour (as not interesting in
// terms of `incoming` feature).
// Here we just recreate `UserSocket` using domain and type fetched from the descriptor
// we have.
let domain = nix::sys::socket::getsockname::<SockaddrStorage>(sockfd)
.map_err(io::Error::from)?
.family()
.map(|family| family as i32)
.unwrap_or(-1);
if domain != libc::AF_INET && domain != libc::AF_UNIX {
return Detour::Bypass(Bypass::Domain(domain));
}
let type_ = nix::sys::socket::getsockopt(sockfd, sockopt::SockType)
.map_err(io::Error::from)? as i32;
let kind = SocketKind::try_from(type_)?;

Arc::new(UserSocket::new(domain, type_, 0, Default::default(), kind))
}
};

if let Some(ip_address) = optional_ip_address {
Expand Down Expand Up @@ -598,7 +617,7 @@ pub(super) fn connect(
),

NetProtocol::Stream => match user_socket_info.state {
SocketState::Initialized
SocketState::Initialized | SocketState::Bound(..)
if (optional_ip_address.is_some() && enabled_tcp_outgoing)
|| (remote_address.is_unix() && !unix_streams.is_empty()) =>
{
Expand All @@ -610,24 +629,6 @@ pub(super) fn connect(
)
}

SocketState::Bound(Bound { address, .. }) => {
trace!("connect -> SocketState::Bound {:#?}", user_socket_info);

let address = SockAddr::from(address);
let bind_result = unsafe { FN_BIND(sockfd, address.as_ptr(), address.len()) };

if bind_result != 0 {
error!(
"connect -> Failed to bind socket result {:?}, address: {:?}, sockfd: {:?}!",
bind_result, address, sockfd
);

Err(io::Error::last_os_error())?
} else {
Detour::Bypass(Bypass::MirrorConnect)
}
}

_ => Detour::Bypass(Bypass::DisabledOutgoing),
},

Expand Down
7 changes: 7 additions & 0 deletions mirrord/layer/tests/apps/issue2438/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[package]
name = "issue2438"
version = "0.1.0"
edition = "2021"

[dependencies]
nix.workspace = true
49 changes: 49 additions & 0 deletions mirrord/layer/tests/apps/issue2438/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//! Attempts to connect to in-cluster peer `1.1.1.1:4567` from bound TCP sockets. Tests 2 cases:
//! 1. Socket is first bound to an address `0.0.0.0:0`, that triggers a bypass in `bind` detour
//! (socket **could** later be used for port subscription)
//! 2. Socket is first bound to an address `0.0.0.0:80`, that goes through `bind` detour completely
//! (socket **could not** later be used for port subscription)

use std::{
io::{Read, Write},
net::{SocketAddr, TcpStream},
os::fd::FromRawFd,
};

use nix::sys::socket::{self, AddressFamily, SockFlag, SockType, SockaddrStorage};
fn main() {
let bind_addresses: [SocketAddr; 2] =
["0.0.0.0:0".parse().unwrap(), "0.0.0.0:80".parse().unwrap()];

let peer_address = "1.1.1.1:4567".parse::<SocketAddr>().unwrap();

for bind_address in bind_addresses {
let sockfd = socket::socket(
AddressFamily::Inet,
SockType::Stream,
SockFlag::empty(),
None,
)
.unwrap();
println!("SOCKET CREATED: {sockfd}");

socket::bind(sockfd, &SockaddrStorage::from(bind_address)).unwrap();
println!("SOCKET BOUND TO {bind_address}");

socket::connect(sockfd, &SockaddrStorage::from(peer_address)).unwrap();
println!("SOCKET CONNECTED TO {peer_address}");

let mut stream = unsafe { TcpStream::from_raw_fd(sockfd) };
assert_eq!(stream.peer_addr().unwrap(), peer_address);
println!("`TcpStream::peer_addr()` RESULT AS EXPECTED");

let message = b"hello there";
let bytes_written = stream.write(message).unwrap();
assert_eq!(bytes_written, message.len(), "partial write");

let mut buf = vec![];
stream.read_to_end(&mut buf).unwrap();
assert_eq!(buf.as_slice(), message);
println!("RECEIVED ECHO");
}
}
12 changes: 11 additions & 1 deletion mirrord/layer/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,7 @@ pub enum Application {
Realpath,
NodeIssue2283,
RustIssue2204,
RustIssue2438,
// For running applications with the executable and arguments determined at runtime.
DynamicApp(String, Vec<String>),
}
Expand Down Expand Up @@ -781,6 +782,13 @@ impl Application {
"../../target/debug/issue1899"
)
}
Application::RustIssue2438 => {
format!(
"{}/{}",
env!("CARGO_MANIFEST_DIR"),
"../../target/debug/issue2438"
)
}
Application::RustIssue2001 => {
format!(
"{}/{}",
Expand Down Expand Up @@ -902,7 +910,8 @@ impl Application {
| Application::OpenFile
| Application::CIssue2055
| Application::CIssue2178
| Application::RustIssue2204 => vec![],
| Application::RustIssue2204
| Application::RustIssue2438 => vec![],
Application::RustOutgoingUdp => ["--udp", RUST_OUTGOING_LOCAL, RUST_OUTGOING_PEERS]
.into_iter()
.map(Into::into)
Expand Down Expand Up @@ -973,6 +982,7 @@ impl Application {
| Application::CIssue2178
| Application::NodeIssue2283
| Application::RustIssue2204
| Application::RustIssue2438
| Application::DynamicApp(..) => unimplemented!("shouldn't get here"),
Application::PythonSelfConnect => 1337,
Application::RustIssue2058 => 1234,
Expand Down
59 changes: 59 additions & 0 deletions mirrord/layer/tests/outgoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,62 @@ async fn outgoing_tcp_from_the_local_app_broken(
) {
outgoing_tcp_logic(with_config, dylib_path, config_dir).await;
}

/// Tests that outgoing connections are properly handled on sockets that were bound by the user
/// application.
#[rstest]
#[tokio::test]
#[timeout(Duration::from_secs(10))]
async fn outgoing_tcp_bound_socket(dylib_path: &PathBuf) {
let (mut test_process, mut intproxy) = Application::RustIssue2438
.start_process_with_layer(dylib_path, vec![], None)
.await;

let expected_peer_address = "1.1.1.1:4567".parse::<SocketAddr>().unwrap();

// Test apps runs logic twice for 2 bind addresses.
for _ in 0..2 {
let msg = intproxy.recv().await;
let ClientMessage::TcpOutgoing(LayerTcpOutgoing::Connect(LayerConnect {
remote_address: SocketAddress::Ip(addr),
})) = msg
else {
panic!("Invalid message received from layer: {msg:?}");
};
assert_eq!(addr, expected_peer_address);

intproxy
.send(DaemonMessage::TcpOutgoing(DaemonTcpOutgoing::Connect(Ok(
DaemonConnect {
connection_id: 0,
remote_address: addr.into(),
local_address: "1.2.3.4:6000".parse::<SocketAddr>().unwrap().into(),
},
))))
.await;

let msg = intproxy.recv().await;
let ClientMessage::TcpOutgoing(LayerTcpOutgoing::Write(LayerWrite {
connection_id: 0,
bytes,
})) = msg
else {
panic!("Invalid message received from layer: {msg:?}");
};

intproxy
.send(DaemonMessage::TcpOutgoing(DaemonTcpOutgoing::Read(Ok(
DaemonRead {
connection_id: 0,
bytes,
},
))))
.await;

intproxy
.send(DaemonMessage::TcpOutgoing(DaemonTcpOutgoing::Close(0)))
.await;
}

test_process.wait_assert_success().await;
}

0 comments on commit 0b42ec9

Please sign in to comment.