Skip to content

Commit

Permalink
Record TCP connection local socket address in metadata (#3286)
Browse files Browse the repository at this point in the history
## Motivation and Context
I want to use this field to uniquely identify TCP connection based on
their `local_addr` + `remote_addr`.

See awslabs/aws-sdk-rust#990 for additional
motivation for this change.

## Description
- Add a new optional `local_addr` field in the `ConnectionMetadata`
struct.
- Transfer the `local_addr` `SocketAddress` from the `hyper::HttpInfo`
to the `ConnectionMetadata` field.
- Add to the `trace-serialize` example program so that it will print out
the capture connection values.

## Testing
`cargo test` in `rust-runtime/aws-smithy-runtime-api` and
`aws-smithy-runtime`.

Also ran:
```
thedeck@c889f3b04fb0 examples % cargo run --example trace-serialize
    Finished dev [unoptimized + debuginfo] target(s) in 0.13s
     Running `/Users/thedeck/repos/github/declanvk/smithy-rs/target/debug/examples/trace-serialize`
2023-12-06T00:13:15.605555Z  INFO lazy_load_identity: aws_smithy_runtime::client::identity::cache::lazy: identity cache miss occurred; added new identity (took Ok(296µs))
2023-12-06T00:13:15.608344Z  INFO trace_serialize: Response received: response=Response { status: StatusCode(200), headers: Headers { headers: {"content-type": HeaderValue { _private: "application/json" }, "content-length": HeaderValue { _private: "17" }, "date": HeaderValue { _private: "Wed, 06 Dec 2023 00:13:15 GMT" }} }, body: SdkBody { inner: BoxBody, retryable: false }, extensions: Extensions }
2023-12-06T00:13:15.608388Z  INFO trace_serialize: Captured connection info remote_addr=Some(127.0.0.1:13734) local_addr=Some(127.0.0.1:50199)
2023-12-06T00:13:15.608511Z  INFO trace_serialize: Response received POKEMON_SERVICE_URL=http://localhost:13734 response=GetServerStatisticsOutput { calls_count: 0 }
```

You can see the log line with "Captured connection info" contains the
`remote_addr` and the `local_addr` fields.

## Checklist
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the AWS
SDK, generated SDK code, or SDK runtime crates

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._

Co-authored-by: Declan Kelly <thedeck@amazon.com>
  • Loading branch information
declanvk and Declan Kelly committed Dec 6, 2023
1 parent 8df5ac8 commit b78367c
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 9 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,9 @@ message = "Fix documentation and examples on HyperConnector and HyperClientBuild
references = ["smithy-rs#3282"]
meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client" }
author = "jdisanti"

[[smithy-rs]]
message = "Expose local socket address from ConnectionMetadata."
references = ["aws-sdk-rust#990"]
meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client" }
author = "declanvk"
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
use aws_smithy_runtime::client::http::connection_poisoning::CaptureSmithyConnection;
/// This example demonstrates how an interceptor can be written to trace what is being
/// serialized / deserialized on the wire.
///
Expand Down Expand Up @@ -59,7 +60,7 @@ impl Intercept for WireFormatInterceptor {
&self,
context: &BeforeDeserializationInterceptorContextRef<'_>,
_runtime_components: &RuntimeComponents,
_cfg: &mut ConfigBag,
cfg: &mut ConfigBag,
) -> Result<(), BoxError> {
// Get the response type from the context.
let response = context.response();
Expand All @@ -70,6 +71,18 @@ impl Intercept for WireFormatInterceptor {
tracing::error!(?response);
}

// Print the connection information
let captured_connection = cfg.load::<CaptureSmithyConnection>().cloned();
if let Some(captured_connection) = captured_connection.and_then(|conn| conn.get()) {
tracing::info!(
remote_addr = ?captured_connection.remote_addr(),
local_addr = ?captured_connection.local_addr(),
"Captured connection info"
);
} else {
tracing::warn!("Connection info is missing!");
}

Ok(())
}
}
Expand Down
204 changes: 204 additions & 0 deletions rust-runtime/aws-smithy-runtime-api/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::sync::Arc;
pub struct ConnectionMetadata {
is_proxied: bool,
remote_addr: Option<SocketAddr>,
local_addr: Option<SocketAddr>,
poison_fn: Arc<dyn Fn() + Send + Sync>,
}

Expand All @@ -25,6 +26,10 @@ impl ConnectionMetadata {
}

/// Create a new [`ConnectionMetadata`].
#[deprecated(
since = "1.1.0",
note = "`ConnectionMetadata::new` is deprecated in favour of `ConnectionMetadata::builder`."
)]
pub fn new(
is_proxied: bool,
remote_addr: Option<SocketAddr>,
Expand All @@ -33,21 +38,220 @@ impl ConnectionMetadata {
Self {
is_proxied,
remote_addr,
// need to use builder to set this field
local_addr: None,
poison_fn: Arc::new(poison),
}
}

/// Builder for this connection metadata
pub fn builder() -> ConnectionMetadataBuilder {
ConnectionMetadataBuilder::new()
}

/// Get the remote address for this connection, if one is set.
pub fn remote_addr(&self) -> Option<SocketAddr> {
self.remote_addr
}

/// Get the local address for this connection, if one is set.
pub fn local_addr(&self) -> Option<SocketAddr> {
self.local_addr
}
}

impl Debug for ConnectionMetadata {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SmithyConnection")
.field("is_proxied", &self.is_proxied)
.field("remote_addr", &self.remote_addr)
.field("local_addr", &self.local_addr)
.finish()
}
}

/// Builder type that is used to construct a [`ConnectionMetadata`] value.
#[derive(Default)]
pub struct ConnectionMetadataBuilder {
is_proxied: Option<bool>,
remote_addr: Option<SocketAddr>,
local_addr: Option<SocketAddr>,
poison_fn: Option<Arc<dyn Fn() + Send + Sync>>,
}

impl Debug for ConnectionMetadataBuilder {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ConnectionMetadataBuilder")
.field("is_proxied", &self.is_proxied)
.field("remote_addr", &self.remote_addr)
.field("local_addr", &self.local_addr)
.finish()
}
}

impl ConnectionMetadataBuilder {
/// Creates a new builder.
pub fn new() -> Self {
Self::default()
}

/// Set whether or not the associated connection is to an HTTP proxy.
pub fn proxied(mut self, proxied: bool) -> Self {
self.set_proxied(Some(proxied));
self
}

/// Set whether or not the associated connection is to an HTTP proxy.
pub fn set_proxied(&mut self, proxied: Option<bool>) -> &mut Self {
self.is_proxied = proxied;
self
}

/// Set the remote address of the connection used.
pub fn remote_addr(mut self, remote_addr: SocketAddr) -> Self {
self.set_remote_addr(Some(remote_addr));
self
}

/// Set the remote address of the connection used.
pub fn set_remote_addr(&mut self, remote_addr: Option<SocketAddr>) -> &mut Self {
self.remote_addr = remote_addr;
self
}

/// Set the local address of the connection used.
pub fn local_addr(mut self, local_addr: SocketAddr) -> Self {
self.set_local_addr(Some(local_addr));
self
}

/// Set the local address of the connection used.
pub fn set_local_addr(&mut self, local_addr: Option<SocketAddr>) -> &mut Self {
self.local_addr = local_addr;
self
}

/// Set a closure which will poison the associated connection.
///
/// A poisoned connection will not be reused for subsequent requests by the pool
pub fn poison_fn(mut self, poison_fn: impl Fn() + Send + Sync + 'static) -> Self {
self.set_poison_fn(Some(poison_fn));
self
}

/// Set a closure which will poison the associated connection.
///
/// A poisoned connection will not be reused for subsequent requests by the pool
pub fn set_poison_fn(
&mut self,
poison_fn: Option<impl Fn() + Send + Sync + 'static>,
) -> &mut Self {
self.poison_fn =
poison_fn.map(|poison_fn| Arc::new(poison_fn) as Arc<dyn Fn() + Send + Sync>);
self
}

/// Build a [`ConnectionMetadata`] value.
///
/// # Panics
///
/// If either the `is_proxied` or `poison_fn` has not been set, then this method will panic
pub fn build(self) -> ConnectionMetadata {
ConnectionMetadata {
is_proxied: self
.is_proxied
.expect("is_proxied should be set for ConnectionMetadata"),
remote_addr: self.remote_addr,
local_addr: self.local_addr,
poison_fn: self
.poison_fn
.expect("poison_fn should be set for ConnectionMetadata"),
}
}
}

#[cfg(test)]
mod tests {
use std::{
net::{IpAddr, Ipv6Addr},
sync::Mutex,
};

use super::*;

const TEST_SOCKET_ADDR: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 100);

#[test]
#[should_panic]
fn builder_panic_missing_proxied() {
ConnectionMetadataBuilder::new()
.poison_fn(|| {})
.local_addr(TEST_SOCKET_ADDR)
.remote_addr(TEST_SOCKET_ADDR)
.build();
}

#[test]
#[should_panic]
fn builder_panic_missing_poison_fn() {
ConnectionMetadataBuilder::new()
.proxied(true)
.local_addr(TEST_SOCKET_ADDR)
.remote_addr(TEST_SOCKET_ADDR)
.build();
}

#[test]
fn builder_all_fields_successful() {
let mutable_flag = Arc::new(Mutex::new(false));

let connection_metadata = ConnectionMetadataBuilder::new()
.proxied(true)
.local_addr(TEST_SOCKET_ADDR)
.remote_addr(TEST_SOCKET_ADDR)
.poison_fn({
let mutable_flag = Arc::clone(&mutable_flag);
move || {
let mut guard = mutable_flag.lock().unwrap();
*guard = !*guard;
}
})
.build();

assert_eq!(connection_metadata.is_proxied, true);
assert_eq!(connection_metadata.remote_addr(), Some(TEST_SOCKET_ADDR));
assert_eq!(connection_metadata.local_addr(), Some(TEST_SOCKET_ADDR));
assert_eq!(*mutable_flag.lock().unwrap(), false);
connection_metadata.poison();
assert_eq!(*mutable_flag.lock().unwrap(), true);
}

#[test]
fn builder_optional_fields_translate() {
let metadata1 = ConnectionMetadataBuilder::new()
.proxied(true)
.poison_fn(|| {})
.build();

assert_eq!(metadata1.local_addr(), None);
assert_eq!(metadata1.remote_addr(), None);

let metadata2 = ConnectionMetadataBuilder::new()
.proxied(true)
.poison_fn(|| {})
.local_addr(TEST_SOCKET_ADDR)
.build();

assert_eq!(metadata2.local_addr(), Some(TEST_SOCKET_ADDR));
assert_eq!(metadata2.remote_addr(), None);

let metadata3 = ConnectionMetadataBuilder::new()
.proxied(true)
.poison_fn(|| {})
.remote_addr(TEST_SOCKET_ADDR)
.build();

assert_eq!(metadata3.local_addr(), None);
assert_eq!(metadata3.remote_addr(), Some(TEST_SOCKET_ADDR));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ impl Intercept for ConnectionPoisoningInterceptor {
type LoaderFn = dyn Fn() -> Option<ConnectionMetadata> + Send + Sync;

/// State for a middleware that will monitor and manage connections.
#[allow(missing_debug_implementations)]
#[derive(Clone, Default)]
pub struct CaptureSmithyConnection {
loader: Arc<Mutex<Option<Box<LoaderFn>>>>,
Expand Down Expand Up @@ -154,7 +153,14 @@ mod test {
let retriever = CaptureSmithyConnection::new();
let retriever_clone = retriever.clone();
assert!(retriever.get().is_none());
retriever.set_connection_retriever(|| Some(ConnectionMetadata::new(true, None, || {})));
retriever.set_connection_retriever(|| {
Some(
ConnectionMetadata::builder()
.proxied(true)
.poison_fn(|| {})
.build(),
)
});

assert!(retriever.get().is_some());
assert!(retriever_clone.get().is_some());
Expand Down
17 changes: 11 additions & 6 deletions rust-runtime/aws-smithy-runtime/src/client/http/hyper_014.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,14 +287,19 @@ fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<Connect
let mut extensions = Extensions::new();
conn.get_extras(&mut extensions);
let http_info = extensions.get::<HttpInfo>();
let smithy_connection = ConnectionMetadata::new(
conn.is_proxied(),
http_info.map(|info| info.remote_addr()),
move || match capture_conn.connection_metadata().as_ref() {
let mut builder = ConnectionMetadata::builder()
.proxied(conn.is_proxied())
.poison_fn(move || match capture_conn.connection_metadata().as_ref() {
Some(conn) => conn.poison(),
None => tracing::trace!("no connection existed to poison"),
},
);
});

builder
.set_local_addr(http_info.map(|info| info.local_addr()))
.set_remote_addr(http_info.map(|info| info.remote_addr()));

let smithy_connection = builder.build();

Some(smithy_connection)
} else {
None
Expand Down

0 comments on commit b78367c

Please sign in to comment.