Skip to content

Commit

Permalink
Simplify the way shard_info is displayed in logs
Browse files Browse the repository at this point in the history
  • Loading branch information
elkowar committed Feb 2, 2024
1 parent 6478e9f commit a66bc81
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 54 deletions.
2 changes: 1 addition & 1 deletion src/gateway/bridge/shard_queuer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl ShardQueuer {

spawn_named("shard_queuer::stop", async move {
drop(runner.run().await);
debug!("[ShardRunner {:?}] Stopping", runner.shard.shard_info());
debug!("[ShardRunner {}] Stopping", runner.shard.shard_info());
});

self.runners.lock().await.insert(shard_id, runner_info);
Expand Down
23 changes: 10 additions & 13 deletions src/gateway/bridge/shard_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,17 @@ impl ShardRunner {
/// [`ShardManager`]: super::ShardManager
#[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip(self)))]
pub async fn run(&mut self) -> Result<()> {
info!("[ShardRunner {:?}] Running", self.shard.shard_info());
info!("[ShardRunner {}] Running", self.shard.shard_info());

loop {
trace!("[ShardRunner {:?}] loop iteration started.", self.shard.shard_info());
trace!("[ShardRunner {}] loop iteration started.", self.shard.shard_info());
if !self.recv().await {
return Ok(());
}

// check heartbeat
if !self.shard.do_heartbeat().await {
warn!("[ShardRunner {:?}] Error heartbeating", self.shard.shard_info(),);
warn!("[ShardRunner {}] Error heartbeating", self.shard.shard_info(),);

self.request_restart().await;
return Ok(());
Expand Down Expand Up @@ -141,7 +141,7 @@ impl ShardRunner {
Some(other) => {
if let Err(e) = self.action(&other).await {
debug!(
"[ShardRunner {:?}] Reconnecting due to error performing {:?}: {:?}",
"[ShardRunner {}] Reconnecting due to error performing {:?}: {:?}",
self.shard.shard_info(),
other,
e
Expand All @@ -154,7 +154,7 @@ impl ShardRunner {
ReconnectType::Resume => {
if let Err(why) = self.shard.resume().await {
warn!(
"[ShardRunner {:?}] Resume failed, reidentifying: {:?}",
"[ShardRunner {}] Resume failed, reidentifying: {:?}",
self.shard.shard_info(),
why
);
Expand All @@ -175,7 +175,7 @@ impl ShardRunner {

debug!(
event.name = %event.name().unwrap_or_default(),
"[ShardRunner {:?}] Dispatching event", self.shard.shard_info(),
"[ShardRunner {}] Dispatching event", self.shard.shard_info(),
);

dispatch_model(
Expand All @@ -192,7 +192,7 @@ impl ShardRunner {
self.request_restart().await;
return Ok(());
}
trace!("[ShardRunner {:?}] loop iteration reached the end.", self.shard.shard_info());
trace!("[ShardRunner {}] loop iteration reached the end.", self.shard.shard_info());
}
}

Expand Down Expand Up @@ -254,7 +254,7 @@ impl ShardRunner {
Some(Ok(tungstenite::Message::Close(_))) => break,
Some(Err(_)) => {
warn!(
"[ShardRunner {:?}] Received an error awaiting close frame",
"[ShardRunner {}] Received an error awaiting close frame",
self.shard.shard_info(),
);
break;
Expand Down Expand Up @@ -379,10 +379,7 @@ impl ShardRunner {
}
},
Ok(None) => {
warn!(
"[ShardRunner {:?}] Sending half DC; restarting",
self.shard.shard_info(),
);
warn!("[ShardRunner {}] Sending half DC; restarting", self.shard.shard_info(),);

self.request_restart().await;
return false;
Expand Down Expand Up @@ -468,7 +465,7 @@ impl ShardRunner {

#[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip(self)))]
async fn request_restart(&mut self) {
debug!("[ShardRunner {:?}] Requesting restart", self.shard.shard_info());
debug!("[ShardRunner {}] Requesting restart", self.shard.shard_info());

self.update_manager().await;

Expand Down
70 changes: 35 additions & 35 deletions src/gateway/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,13 @@ impl Shard {
if let Error::Tungstenite(err) = &why {
if let TungsteniteError::Io(err) = &**err {
if err.raw_os_error() != Some(32) {
debug!("[{:?}] Err heartbeating: {:?}", self.shard_info, err);
debug!("[{}] Err heartbeating: {:?}", self.shard_info, err);
return Err(Error::Gateway(GatewayError::HeartbeatFailed));
}
}
}

warn!("[{:?}] Other err w/ keepalive: {:?}", self.shard_info, why);
warn!("[{}] Other err w/ keepalive: {:?}", self.shard_info, why);
Err(Error::Gateway(GatewayError::HeartbeatFailed))
},
}
Expand Down Expand Up @@ -271,12 +271,12 @@ impl Shard {
))]
fn handle_gateway_dispatch(&mut self, seq: u64, event: &Event) -> Option<ShardAction> {
if seq > self.seq + 1 {
warn!("[{:?}] Sequence off; them: {}, us: {}", self.shard_info, seq, self.seq);
warn!("[{}] Sequence off; them: {}, us: {}", self.shard_info, seq, self.seq);
}

match &event {
Event::Ready(ready) => {
debug!("[{:?}] Received Ready", self.shard_info);
debug!("[{}] Received Ready", self.shard_info);

self.session_id = Some(ready.ready.session_id.clone());
self.stage = ConnectionStage::Connected;
Expand All @@ -286,7 +286,7 @@ impl Shard {
}
},
Event::Resumed(_) => {
info!("[{:?}] Resumed", self.shard_info);
info!("[{}] Resumed", self.shard_info);

self.stage = ConnectionStage::Connected;
self.last_heartbeat_acknowledged = true;
Expand All @@ -303,12 +303,12 @@ impl Shard {

#[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip(self)))]
fn handle_heartbeat_event(&mut self, s: u64) -> ShardAction {
info!("[{:?}] Received shard heartbeat", self.shard_info);
info!("[{}] Received shard heartbeat", self.shard_info);

// Received seq is off -- attempt to resume.
if s > self.seq + 1 {
info!(
"[{:?}] Received off sequence (them: {}; us: {}); resuming",
"[{}] Received off sequence (them: {}; us: {}); resuming",
self.shard_info, s, self.seq
);

Expand All @@ -317,7 +317,7 @@ impl Shard {

return ShardAction::Identify;
}
warn!("[{:?}] Heartbeat during non-Handshake; auto-reconnecting", self.shard_info);
warn!("[{}] Heartbeat during non-Handshake; auto-reconnecting", self.shard_info);

return ShardAction::Reconnect(self.reconnection_type());
}
Expand All @@ -335,63 +335,63 @@ impl Shard {

match num {
Some(close_codes::UNKNOWN_OPCODE) => {
warn!("[{:?}] Sent invalid opcode.", self.shard_info);
warn!("[{}] Sent invalid opcode.", self.shard_info);
},
Some(close_codes::DECODE_ERROR) => {
warn!("[{:?}] Sent invalid message.", self.shard_info);
warn!("[{}] Sent invalid message.", self.shard_info);
},
Some(close_codes::NOT_AUTHENTICATED) => {
warn!("[{:?}] Sent no authentication.", self.shard_info);
warn!("[{}] Sent no authentication.", self.shard_info);

return Err(Error::Gateway(GatewayError::NoAuthentication));
},
Some(close_codes::AUTHENTICATION_FAILED) => {
error!(
"[{:?}] Sent invalid authentication, please check the token.",
"[{}] Sent invalid authentication, please check the token.",
self.shard_info
);

return Err(Error::Gateway(GatewayError::InvalidAuthentication));
},
Some(close_codes::ALREADY_AUTHENTICATED) => {
warn!("[{:?}] Already authenticated.", self.shard_info);
warn!("[{}] Already authenticated.", self.shard_info);
},
Some(close_codes::INVALID_SEQUENCE) => {
warn!("[{:?}] Sent invalid seq: {}.", self.shard_info, self.seq);
warn!("[{}] Sent invalid seq: {}.", self.shard_info, self.seq);

self.seq = 0;
},
Some(close_codes::RATE_LIMITED) => {
warn!("[{:?}] Gateway ratelimited.", self.shard_info);
warn!("[{}] Gateway ratelimited.", self.shard_info);
},
Some(close_codes::INVALID_SHARD) => {
warn!("[{:?}] Sent invalid shard data.", self.shard_info);
warn!("[{}] Sent invalid shard data.", self.shard_info);

return Err(Error::Gateway(GatewayError::InvalidShardData));
},
Some(close_codes::SHARDING_REQUIRED) => {
error!("[{:?}] Shard has too many guilds.", self.shard_info);
error!("[{}] Shard has too many guilds.", self.shard_info);

return Err(Error::Gateway(GatewayError::OverloadedShard));
},
Some(4006 | close_codes::SESSION_TIMEOUT) => {
info!("[{:?}] Invalid session.", self.shard_info);
info!("[{}] Invalid session.", self.shard_info);

self.session_id = None;
},
Some(close_codes::INVALID_GATEWAY_INTENTS) => {
error!("[{:?}] Invalid gateway intents have been provided.", self.shard_info);
error!("[{}] Invalid gateway intents have been provided.", self.shard_info);

return Err(Error::Gateway(GatewayError::InvalidGatewayIntents));
},
Some(close_codes::DISALLOWED_GATEWAY_INTENTS) => {
error!("[{:?}] Disallowed gateway intents have been provided.", self.shard_info);
error!("[{}] Disallowed gateway intents have been provided.", self.shard_info);

return Err(Error::Gateway(GatewayError::DisallowedGatewayIntents));
},
Some(other) if !clean => {
warn!(
"[{:?}] Unknown unclean close {}: {:?}",
"[{}] Unknown unclean close {}: {:?}",
self.shard_info,
other,
data.map(|d| &d.reason),
Expand Down Expand Up @@ -439,12 +439,12 @@ impl Shard {
self.last_heartbeat_ack = Some(Instant::now());
self.last_heartbeat_acknowledged = true;

trace!("[{:?}] Received heartbeat ack", self.shard_info);
trace!("[{}] Received heartbeat ack", self.shard_info);

Ok(None)
},
&Ok(GatewayEvent::Hello(interval)) => {
debug!("[{:?}] Received a Hello; interval: {}", self.shard_info, interval);
debug!("[{}] Received a Hello; interval: {}", self.shard_info, interval);

if self.stage == ConnectionStage::Resuming {
return Ok(None);
Expand All @@ -455,13 +455,13 @@ impl Shard {
Ok(Some(if self.stage == ConnectionStage::Handshake {
ShardAction::Identify
} else {
debug!("[{:?}] Received late Hello; autoreconnecting", self.shard_info);
debug!("[{}] Received late Hello; autoreconnecting", self.shard_info);

ShardAction::Reconnect(self.reconnection_type())
}))
},
&Ok(GatewayEvent::InvalidateSession(resumable)) => {
info!("[{:?}] Received session invalidation", self.shard_info);
info!("[{}] Received session invalidation", self.shard_info);

Ok(Some(if resumable {
ShardAction::Reconnect(ReconnectType::Resume)
Expand All @@ -474,16 +474,16 @@ impl Shard {
self.handle_gateway_closed(data.as_ref())
},
Err(Error::Tungstenite(why)) => {
warn!("[{:?}] Websocket error: {:?}", self.shard_info, why);
info!("[{:?}] Will attempt to auto-reconnect", self.shard_info);
warn!("[{}] Websocket error: {:?}", self.shard_info, why);
info!("[{}] Will attempt to auto-reconnect", self.shard_info);

Ok(Some(ShardAction::Reconnect(self.reconnection_type())))
},
Err(why) => {
if let Error::Json(_) = why {
// Deserialization errors already get logged when the event is first received
} else {
warn!("[{:?}] Unhandled error: {:?}", self.shard_info, why);
warn!("[{}] Unhandled error: {:?}", self.shard_info, why);
}

Ok(None)
Expand Down Expand Up @@ -519,18 +519,18 @@ impl Shard {

// If the last heartbeat didn't receive an acknowledgement, then auto-reconnect.
if !self.last_heartbeat_acknowledged {
debug!("[{:?}] Last heartbeat not acknowledged", self.shard_info,);
debug!("[{}] Last heartbeat not acknowledged", self.shard_info,);

return false;
}

// Otherwise, we're good to heartbeat.
if let Err(why) = self.heartbeat().await {
warn!("[{:?}] Err heartbeating: {:?}", self.shard_info, why);
warn!("[{}] Err heartbeating: {:?}", self.shard_info, why);

false
} else {
trace!("[{:?}] Heartbeat", self.shard_info);
trace!("[{}] Heartbeat", self.shard_info);

true
}
Expand Down Expand Up @@ -645,7 +645,7 @@ impl Shard {
filter: ChunkGuildFilter,
nonce: Option<&str>,
) -> Result<()> {
debug!("[{:?}] Requesting member chunks", self.shard_info);
debug!("[{}] Requesting member chunks", self.shard_info);

self.client
.send_chunk_guild(guild_id, &self.shard_info, limit, presences, filter, nonce)
Expand Down Expand Up @@ -683,7 +683,7 @@ impl Shard {
/// Errors if unable to establish a websocket connection.
#[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip(self)))]
pub async fn initialize(&mut self) -> Result<WsClient> {
debug!("[{:?}] Initializing.", self.shard_info);
debug!("[{}] Initializing.", self.shard_info);

// We need to do two, sort of three things here:
// - set the stage of the shard as opening the websocket connection
Expand Down Expand Up @@ -716,7 +716,7 @@ impl Shard {
/// Errors if unable to re-establish a websocket connection.
#[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip(self)))]
pub async fn resume(&mut self) -> Result<()> {
debug!("[{:?}] Attempting to resume", self.shard_info);
debug!("[{}] Attempting to resume", self.shard_info);

self.client = self.initialize().await?;
self.stage = ConnectionStage::Resuming;
Expand All @@ -736,7 +736,7 @@ impl Shard {
/// Errors if unable to re-establish a websocket connection.
#[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip(self)))]
pub async fn reconnect(&mut self) -> Result<()> {
info!("[{:?}] Attempting to reconnect", self.shard_info());
info!("[{}] Attempting to reconnect", self.shard_info());

self.reset();
self.client = self.initialize().await?;
Expand Down
10 changes: 5 additions & 5 deletions src/gateway/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl WsClient {
filter: ChunkGuildFilter,
nonce: Option<&str>,
) -> Result<()> {
debug!("[{:?}] Requesting member chunks", shard_info);
debug!("[{}] Requesting member chunks", shard_info);

let (query, user_ids) = match filter {
ChunkGuildFilter::None => (Some(String::new()), None),
Expand All @@ -207,7 +207,7 @@ impl WsClient {
/// Errors if there is a problem with the WS connection.
#[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip(self)))]
pub async fn send_heartbeat(&mut self, shard_info: &ShardInfo, seq: Option<u64>) -> Result<()> {
trace!("[{:?}] Sending heartbeat d: {:?}", shard_info, seq);
trace!("[{}] Sending heartbeat d: {:?}", shard_info, seq);

self.send_json(&WebSocketMessage {
op: Opcode::Heartbeat,
Expand All @@ -230,7 +230,7 @@ impl WsClient {
let now = SystemTime::now();
let activities = presence.activity.as_ref().map(std::slice::from_ref).unwrap_or_default();

debug!("[{:?}] Identifying", shard);
debug!("[{}] Identifying", shard);

let msg = WebSocketMessage {
op: Opcode::Identify,
Expand Down Expand Up @@ -269,7 +269,7 @@ impl WsClient {
let now = SystemTime::now();
let activities = presence.activity.as_ref().map(std::slice::from_ref).unwrap_or_default();

debug!("[{shard_info:?}] Sending presence update");
debug!("[{shard_info}] Sending presence update");

self.send_json(&WebSocketMessage {
op: Opcode::PresenceUpdate,
Expand All @@ -294,7 +294,7 @@ impl WsClient {
seq: u64,
token: &str,
) -> Result<()> {
debug!("[{:?}] Sending resume; seq: {}", shard_info, seq);
debug!("[{}] Sending resume; seq: {}", shard_info, seq);

self.send_json(&WebSocketMessage {
op: Opcode::Resume,
Expand Down

0 comments on commit a66bc81

Please sign in to comment.