Skip to content

Commit

Permalink
Fix propagating tracing spans into subchains
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 24, 2022
1 parent 17f08ff commit 0659c8f
Showing 1 changed file with 69 additions and 67 deletions.
136 changes: 69 additions & 67 deletions shotover-proxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,75 +183,77 @@ impl<C: Codec + 'static> TcpCodecListener<C> {
self.listener = Some(create_listener(&self.listen_addr).await?);
}
}

// Accept a new socket. This will attempt to perform error handling.
// The `accept` method internally attempts to recover errors, so an
// error here is non-recoverable.
let socket = self.accept().await?;

debug!("got socket");
self.available_connections_gauge
.set(self.limit_connections.available_permits() as f64);

let peer = socket
.peer_addr()
.map(|p| format!("{}", p.ip()))
.unwrap_or_else(|_| "Unknown peer".to_string());

let conn_string = socket
.peer_addr()
.map(|p| format!("{}:{}", p.ip(), p.port()))
.unwrap_or_else(|_| "Unknown peer".to_string());

// Create the necessary per-connection handler state.
socket.set_nodelay(true)?;

let (pushed_messages_tx, pushed_messages_rx) =
tokio::sync::mpsc::unbounded_channel::<Messages>();

let mut handler = Handler {
chain: self
.chain
.clone_with_pushed_messages_tx(pushed_messages_tx.clone()),
client_details: peer,
conn_details: conn_string,
source_details: self.source_name.clone(),

// The connection state needs a handle to the max connections
// semaphore. When the handler is done processing the
// connection, a permit is added back to the semaphore.
codec: self.codec.clone(),
limit_connections: self.limit_connections.clone(),

// Receive shutdown notifications.
shutdown: Shutdown::new(self.trigger_shutdown_rx.clone()),

terminate_tasks: None,
tls: self.tls.clone(),
timeout: self.timeout,
};

self.connection_count = self.connection_count.wrapping_add(1);

// Spawn a new task to process the connections.
tokio::spawn(
async move {
tracing::debug!("New connection from {}", handler.conn_details);

// Process the connection. If an error is encountered, log it.
if let Err(err) = handler.run(socket, pushed_messages_rx).await {
error!(
"{:?}",
err.context("connection was unexpectedly terminated")
);
}
}
.instrument(tracing::error_span!(
"connection",
id = self.connection_count,
source = self.source_name.as_str()
)),
let span = tracing::error_span!(
"connection",
id = self.connection_count,
source = self.source_name.as_str(),
);
async {
// Accept a new socket. This will attempt to perform error handling.
// The `accept` method internally attempts to recover errors, so an
// error here is non-recoverable.
let socket = self.accept().await?;

debug!("got socket");
self.available_connections_gauge
.set(self.limit_connections.available_permits() as f64);

let peer = socket
.peer_addr()
.map(|p| format!("{}", p.ip()))
.unwrap_or_else(|_| "Unknown peer".to_string());

let conn_string = socket
.peer_addr()
.map(|p| format!("{}:{}", p.ip(), p.port()))
.unwrap_or_else(|_| "Unknown peer".to_string());

// Create the necessary per-connection handler state.
socket.set_nodelay(true)?;

let (pushed_messages_tx, pushed_messages_rx) =
tokio::sync::mpsc::unbounded_channel::<Messages>();

let mut handler = Handler {
chain: self.chain.clone_with_pushed_messages_tx(pushed_messages_tx),
client_details: peer,
conn_details: conn_string,
source_details: self.source_name.clone(),

// The connection state needs a handle to the max connections
// semaphore. When the handler is done processing the
// connection, a permit is added back to the semaphore.
codec: self.codec.clone(),
limit_connections: self.limit_connections.clone(),

// Receive shutdown notifications.
shutdown: Shutdown::new(self.trigger_shutdown_rx.clone()),

terminate_tasks: None,
tls: self.tls.clone(),
timeout: self.timeout,
};

// Spawn a new task to process the connections.
tokio::spawn(
async move {
tracing::debug!("New connection from {}", handler.conn_details);

// Process the connection. If an error is encountered, log it.
if let Err(err) = handler.run(socket, pushed_messages_rx).await {
error!(
"{:?}",
err.context("connection was unexpectedly terminated")
);
}
}
.in_current_span(),
);
Ok::<(), anyhow::Error>(())
}
.instrument(span)
.await?;
}
}

Expand Down

0 comments on commit 0659c8f

Please sign in to comment.