From e7d15d03b30ef99058937636b0cccf4e9ab2aa1d Mon Sep 17 00:00:00 2001
From: James Wilson
Date: Mon, 10 Oct 2022 13:12:07 +0100
Subject: [PATCH] Tweak logs and attempt to avoid races around removing nodes
 (#504)

* Tweak logs and attempt to avoid races around removing nodes

* wrapping_add in assign_id
---
 backend/common/src/assign_id.rs            |  9 +++--
 .../src/aggregator/inner_loop.rs           | 17 +++++----
 backend/telemetry_core/src/main.rs         | 34 +++++--------
 backend/telemetry_shard/src/aggregator.rs  | 22 ++++++++----
 backend/telemetry_shard/src/main.rs        | 23 +++++++------
 5 files changed, 53 insertions(+), 52 deletions(-)

diff --git a/backend/common/src/assign_id.rs b/backend/common/src/assign_id.rs
index dfcbb20b..82c25532 100644
--- a/backend/common/src/assign_id.rs
+++ b/backend/common/src/assign_id.rs
@@ -47,7 +47,9 @@ where
     pub fn assign_id(&mut self, details: Details) -> Id {
         let this_id = self.current_id;
-        self.current_id += 1;
+        // It's very unlikely we'll ever overflow the ID limit, but in case we do,
+        // a wrapping_add will almost certainly be fine:
+        self.current_id = self.current_id.wrapping_add(1);
         self.mapping.insert(this_id, details);
         this_id.into()
     }
 
@@ -73,7 +75,10 @@ where
     }
 
     pub fn clear(&mut self) {
-        *self = AssignId::new();
+        // Leave the `current_id` as-is. Why? To avoid reusing IDs and risking
+        // race conditions where old messages can accidentally screw with new nodes
+        // that have been assigned the same ID.
+        self.mapping = BiMap::new();
     }
 
     pub fn iter(&self) -> impl Iterator {
diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs
index 0b42082a..b72af01a 100644
--- a/backend/telemetry_core/src/aggregator/inner_loop.rs
+++ b/backend/telemetry_core/src/aggregator/inner_loop.rs
@@ -248,7 +248,7 @@ impl InnerLoop {
             }
 
             if let Err(e) = metered_tx.send(msg) {
-                log::error!("Cannot send message into aggregator: {}", e);
+                log::error!("Cannot send message into aggregator: {e}");
                 break;
             }
         }
@@ -386,10 +386,11 @@ impl InnerLoop {
         let node_id = match self.node_ids.remove_by_right(&(shard_conn_id, local_id)) {
             Some((node_id, _)) => node_id,
             None => {
-                log::error!(
-                    "Cannot find ID for node with shard/connectionId of {:?}/{:?}",
-                    shard_conn_id,
-                    local_id
+                // It's possible that some race between removing and disconnecting shards might lead to
+                // more than one remove message for the same node. This isn't really a problem, but we
+                // hope it won't happen, so make a note if it does:
+                log::debug!(
+                    "Remove: Cannot find ID for node with shard/connectionId of {shard_conn_id:?}/{local_id:?}"
                 );
                 return;
             }
@@ -401,9 +402,7 @@ impl InnerLoop {
             Some(id) => *id,
             None => {
                 log::error!(
-                    "Cannot find ID for node with shard/connectionId of {:?}/{:?}",
-                    shard_conn_id,
-                    local_id
+                    "Update: Cannot find ID for node with shard/connectionId of {shard_conn_id:?}/{local_id:?}"
                 );
                 return;
             }
@@ -606,7 +605,7 @@ impl InnerLoop {
         let removed_details = match self.node_state.remove_node(node_id) {
             Some(remove_details) => remove_details,
             None => {
-                log::error!("Could not find node {:?}", node_id);
+                log::error!("Could not find node {node_id:?}");
                 return;
             }
         };
diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs
index 051bcb0b..79c1a719 100644
--- a/backend/telemetry_core/src/main.rs
+++ b/backend/telemetry_core/src/main.rs
@@ -251,10 +251,7 @@ where
                 break;
             }
             if let Err(e) = msg_info {
-                log::error!(
-                    "Shutting down websocket connection: Failed to receive data: {}",
-                    e
-                );
+                log::error!("Shutting down websocket connection: Failed to receive data: {e}");
                 break;
             }
 
@@ -262,10 +259,7 @@ where
                 match bincode::options().deserialize(&bytes) {
                     Ok(msg) => msg,
                     Err(e) => {
-                        log::error!(
-                            "Failed to deserialize message from shard; booting it: {}",
-                            e
-                        );
+                        log::error!("Failed to deserialize message from shard; booting it: {e}");
                         break;
                     }
                 };
@@ -292,7 +286,7 @@ where
             };
 
             if let Err(e) = tx_to_aggregator.send(aggregator_msg).await {
-                log::error!("Failed to send message to aggregator; closing shard: {}", e);
+                log::error!("Failed to send message to aggregator; closing shard: {e}");
                 break;
             }
         }
@@ -325,13 +319,10 @@ where
                 .expect("message to shard should serialize");
 
             if let Err(e) = ws_send.send_binary(bytes).await {
-                log::error!("Failed to send message to aggregator; closing shard: {}", e)
+                log::error!("Failed to send message to aggregator; closing shard: {e}")
             }
             if let Err(e) = ws_send.flush().await {
-                log::error!(
-                    "Failed to flush message to aggregator; closing shard: {}",
-                    e
-                )
+                log::error!("Failed to flush message to aggregator; closing shard: {e}")
             }
         }
 
@@ -374,7 +365,7 @@ where
             channel: tx_to_feed_conn,
         };
         if let Err(e) = tx_to_aggregator.send(init_msg).await {
-            log::error!("Error sending message to aggregator: {}", e);
+            log::error!("Error sending message to aggregator: {e}");
             return (tx_to_aggregator, ws_send);
         }
 
@@ -399,10 +390,7 @@ where
                 break;
             }
             if let Err(e) = msg_info {
-                log::error!(
-                    "Shutting down websocket connection: Failed to receive data: {}",
-                    e
-                );
+                log::error!("Shutting down websocket connection: Failed to receive data: {e}");
                 break;
             }
 
@@ -416,16 +404,12 @@ where
             let cmd = match FromFeedWebsocket::from_str(&text) {
                 Ok(cmd) => cmd,
                 Err(e) => {
-                    log::warn!(
-                        "Ignoring invalid command '{}' from the frontend: {}",
-                        text,
-                        e
-                    );
+                    log::warn!("Ignoring invalid command '{text}' from the frontend: {e}");
                     continue;
                 }
             };
             if let Err(e) = tx_to_aggregator.send(cmd).await {
-                log::error!("Failed to send message to aggregator; closing feed: {}", e);
+                log::error!("Failed to send message to aggregator; closing feed: {e}");
                 break;
             }
         }
diff --git a/backend/telemetry_shard/src/aggregator.rs b/backend/telemetry_shard/src/aggregator.rs
index d40807fe..ef3254bf 100644
--- a/backend/telemetry_shard/src/aggregator.rs
+++ b/backend/telemetry_shard/src/aggregator.rs
@@ -261,9 +261,14 @@ impl Aggregator {
                     // Remove references to this single node:
                     to_local_id.remove_by_id(local_id);
                     muted.remove(&local_id);
-                    let _ = tx_to_telemetry_core
-                        .send_async(FromShardAggregator::RemoveNode { local_id })
-                        .await;
+
+                    // If we're not connected to the core, don't buffer up remove messages. The core will remove
+                    // all nodes associated with this shard anyway, so the remove message would be redundant.
+                    if connected_to_telemetry_core {
+                        let _ = tx_to_telemetry_core
+                            .send_async(FromShardAggregator::RemoveNode { local_id })
+                            .await;
+                    }
                 }
                 ToAggregator::FromWebsocket(disconnected_conn_id, FromWebsocket::Disconnected) => {
                     // Find all of the local IDs corresponding to the disconnected connection ID and
@@ -280,9 +285,14 @@ impl Aggregator {
                     for local_id in local_ids_disconnected {
                         to_local_id.remove_by_id(local_id);
                         muted.remove(&local_id);
-                        let _ = tx_to_telemetry_core
-                            .send_async(FromShardAggregator::RemoveNode { local_id })
-                            .await;
+
+                        // If we're not connected to the core, don't buffer up remove messages. The core will remove
+                        // all nodes associated with this shard anyway, so the remove message would be redundant.
+                        if connected_to_telemetry_core {
+                            let _ = tx_to_telemetry_core
+                                .send_async(FromShardAggregator::RemoveNode { local_id })
+                                .await;
+                        }
                     }
                 }
                 ToAggregator::FromTelemetryCore(FromTelemetryCore::Mute {
diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs
index a95142a3..45aa1041 100644
--- a/backend/telemetry_shard/src/main.rs
+++ b/backend/telemetry_shard/src/main.rs
@@ -239,7 +239,7 @@ where
         close_connection: close_connection_tx.clone(),
     };
     if let Err(e) = tx_to_aggregator.send(init_msg).await {
-        log::error!("Error sending message to aggregator: {}", e);
+        log::error!("Shutting down websocket connection from {real_addr:?}: Error sending message to aggregator: {e}");
        return (tx_to_aggregator, ws_send);
     }
 
@@ -254,7 +254,7 @@ where
             // The close channel has fired, so end the loop. `ws_recv.receive_data` is
             // *not* cancel safe, but since we're closing the connection we don't care.
             _ = close_connection_rx.recv_async() => {
-                log::info!("connection to {:?} being closed", real_addr);
+                log::info!("connection to {real_addr:?} being closed");
                 break
             },
             // Receive data and relay it on to our main select loop below.
@@ -263,7 +263,7 @@ where
                     break;
                 }
                 if let Err(e) = msg_info {
-                    log::error!("Shutting down websocket connection: Failed to receive data: {}", e);
+                    log::error!("Shutting down websocket connection from {real_addr:?}: Failed to receive data: {e}");
                     break;
                 }
                 if ws_tx_atomic.unbounded_send(bytes).is_err() {
@@ -292,14 +292,14 @@ where
                     .collect();
 
                 for &message_id in &stale_ids {
-                    log::info!("Removing stale node with message ID {} from {:?}", message_id, real_addr);
+                    log::info!("Removing stale node with message ID {message_id} from {real_addr:?}");
                     allowed_message_ids.remove(&message_id);
                     let _ = tx_to_aggregator.send(FromWebsocket::Remove { message_id } ).await;
                 }
 
                 if !stale_ids.is_empty() && allowed_message_ids.is_empty() {
                     // End the entire connection if no recent messages came in for any ID.
-                    log::info!("Closing stale connection from {:?}", real_addr);
+                    log::info!("Closing stale connection from {real_addr:?}");
                     break;
                 }
             },
@@ -316,7 +316,7 @@ where
                 let this_bytes_per_second = rolling_total_bytes.total() / 10;
                 if this_bytes_per_second > bytes_per_second {
                     block_list.block_addr(real_addr, "Too much traffic");
-                    log::error!("Shutting down websocket connection: Too much traffic ({}bps averaged over last 10s)", this_bytes_per_second);
+                    log::error!("Shutting down websocket connection: Too much traffic ({this_bytes_per_second}bps averaged over last 10s)");
                     break;
                 }
 
@@ -327,7 +327,7 @@ where
                     Err(e) => {
                         let bytes: &[u8] = bytes.get(..512).unwrap_or_else(|| &bytes);
                         let msg_start = std::str::from_utf8(bytes).unwrap_or_else(|_| "INVALID UTF8");
-                        log::warn!("Failed to parse node message ({}): {}", msg_start, e);
+                        log::warn!("Failed to parse node message ({msg_start}): {e}");
                         continue;
                     },
                     #[cfg(not(debug))]
@@ -347,7 +347,7 @@ where
                 if let node_message::Payload::SystemConnected(info) = payload {
                     // Too many nodes seen on this connection? Ignore this one.
                     if allowed_message_ids.len() >= max_nodes_per_connection {
-                        log::info!("Ignoring new node from {:?} (we've hit the max of {} nodes per connection)", real_addr, max_nodes_per_connection);
+                        log::info!("Ignoring new node with ID {message_id} from {real_addr:?} (we've hit the max of {max_nodes_per_connection} nodes per connection)");
                         continue;
                     }
 
@@ -355,7 +355,7 @@ where
                     allowed_message_ids.insert(message_id, Instant::now());
 
                     // Tell the aggregator loop about the new node.
-                    log::info!("Adding node with message ID {} from {:?}", message_id, real_addr);
+                    log::info!("Adding node with message ID {message_id} from {real_addr:?}");
                     let _ = tx_to_aggregator.send(FromWebsocket::Add {
                         message_id,
                         ip: real_addr,
@@ -369,9 +369,12 @@ where
                 if let Some(last_seen) = allowed_message_ids.get_mut(&message_id) {
                     *last_seen = Instant::now();
                     if let Err(e) = tx_to_aggregator.send(FromWebsocket::Update { message_id, payload } ).await {
-                        log::error!("Failed to send node message to aggregator: {}", e);
+                        log::error!("Failed to send node message to aggregator: {e}");
                         continue;
                     }
+                } else {
+                    log::info!("Ignoring message with ID {message_id} from {real_addr:?} (we've hit the max of {max_nodes_per_connection} nodes per connection)");
+                    continue;
                 }
             }
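The race-avoidance in this patch ultimately rests on the two assign_id.rs changes: clear() no longer resets current_id, so a late RemoveNode aimed at an old ID can never hit a freshly-assigned node, and the counter wraps instead of panicking if it ever overflows. Below is a minimal sketch of those two behaviours. It is not the real AssignId (which is generic over Details and backed by the crate's BiMap); the HashMap storage and String details here are stand-ins chosen for brevity:

use std::collections::HashMap;

// Hypothetical stand-in for backend/common's AssignId; illustration only.
struct AssignId {
    current_id: usize,
    mapping: HashMap<usize, String>,
}

impl AssignId {
    fn new() -> Self {
        Self { current_id: 0, mapping: HashMap::new() }
    }

    fn assign_id(&mut self, details: String) -> usize {
        let this_id = self.current_id;
        // Wrap rather than panic if the ID space is ever exhausted:
        self.current_id = self.current_id.wrapping_add(1);
        self.mapping.insert(this_id, details);
        this_id
    }

    fn clear(&mut self) {
        // Keep `current_id` so IDs are never handed out twice; a stale
        // "remove node 1" message can then never target a new node 1.
        self.mapping = HashMap::new();
    }
}

fn main() {
    let mut ids = AssignId::new();
    assert_eq!(ids.assign_id("node-a".into()), 0);
    assert_eq!(ids.assign_id("node-b".into()), 1);
    ids.clear();
    // After clearing, the next ID is 2, not a reused 0:
    assert_eq!(ids.assign_id("node-c".into()), 2);
}

The same instinct drives the shard-side change: once the connection to the core drops, the core forgets every node from that shard anyway, so buffering RemoveNode messages while disconnected would only replay redundant removals on reconnect.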