Skip to content

Commit

Permalink
Check last heartbeat acknowledged in heartbeater
Browse files Browse the repository at this point in the history
When heartbeating, first ensure that the previous heartbeat was
acknowledged. If it wasn't, shutdown the sender and receiver so that
an auto-reconnect can take place.

When receiving a Heartbeat Acknowledgement, set the
`last_heartbeat_acknowledged` to `true` to prevent the auto-reconnect
process.
  • Loading branch information
Zeyla Hellyer committed Jun 2, 2017
1 parent afc571f commit ec9b1c7
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
10 changes: 10 additions & 0 deletions src/gateway/prep.rs
Expand Up @@ -77,6 +77,7 @@ pub fn build_gateway_url(base: &str) -> Result<RequestUrl> {

pub fn keepalive(interval: u64,
heartbeat_sent: Arc<Mutex<Instant>>,
last_ack: Arc<Mutex<bool>>,
mut sender: Sender<WebSocketStream>,
channel: &MpscReceiver<GatewayStatus>) {
let mut base_interval = Duration::milliseconds(interval as i64);
Expand Down Expand Up @@ -110,6 +111,14 @@ pub fn keepalive(interval: u64,
}

if time::get_time() >= next_tick {
// If the last heartbeat didn't receive an acknowledgement, then
// shutdown and auto-reconnect.
if !*last_ack.lock().unwrap() {
debug!("Last heartbeat not acknowledged; re-connecting");

break;
}

next_tick = next_tick + base_interval;

let map = json!({
Expand All @@ -124,6 +133,7 @@ pub fn keepalive(interval: u64,
let now = Instant::now();

*heartbeat_sent.lock().unwrap() = now;
*last_ack.lock().unwrap() = false;
},
Err(why) => {
match why {
Expand Down
28 changes: 24 additions & 4 deletions src/gateway/shard.rs
Expand Up @@ -62,17 +62,24 @@ type CurrentPresence = (Option<Game>, OnlineStatus, bool);
/// [`receive`]: #method.receive
/// [docs]: https://discordapp.com/developers/docs/topics/gateway#sharding
/// [module docs]: index.html#sharding
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct Shard {
current_presence: CurrentPresence,
/// A tuple of the last instant that a heartbeat was sent, and the last that
/// an acknowledgement was received.
/// A tuple of:
///
/// - the last instant that a heartbeat was sent
/// - the last instant that an acknowledgement was received
///
/// This can be used to calculate [`latency`].
///
/// [`latency`]: fn.latency.html
heartbeat_instants: (Arc<Mutex<Instant>>, Option<Instant>),
keepalive_channel: MpscSender<GatewayStatus>,
/// This is used by the keepalive thread to determine whether the last
/// heartbeat was sent without an acknowledgement, and whether to reconnect.
// This _must_ be set to `true` in `Shard::handle_event`'s
// `Ok(GatewayEvent::HeartbeatAck)` arm.
last_heartbeat_acknowledged: Arc<Mutex<bool>>,
seq: u64,
session_id: Option<String>,
shard_info: Option<[u64; 2]>,
Expand Down Expand Up @@ -142,10 +149,19 @@ impl Shard {
let heartbeat_sent = Arc::new(Mutex::new(Instant::now()));
let heartbeat_clone = heartbeat_sent.clone();

// Set this to true: when the keepalive thread sends a heartbeat, it
// will check if the value is `false`.
//
// If it is, it will reconnect. This enters the bot into a reconnect
// loop. Set this to `true` to give Discord the first heartbeat to
// acknowledge first.
let last_ack = Arc::new(Mutex::new(true));
let last_ack_clone = last_ack.clone();

ThreadBuilder::new()
.name(thread_name)
.spawn(move || {
prep::keepalive(heartbeat_interval, heartbeat_clone, sender, &rx)
prep::keepalive(heartbeat_interval, heartbeat_clone, last_ack_clone, sender, &rx)
})?;

// Parse READY
Expand All @@ -155,10 +171,12 @@ impl Shard {
&mut receiver,
identification)?;


Ok((feature_voice! {{
Shard {
current_presence: (None, OnlineStatus::Online, false),
heartbeat_instants: (heartbeat_sent, None),
last_heartbeat_acknowledged: last_ack,
keepalive_channel: tx.clone(),
seq: sequence,
token: token.to_owned(),
Expand All @@ -171,6 +189,7 @@ impl Shard {
Shard {
current_presence: (None, OnlineStatus::Online, false),
heartbeat_instants: (heartbeat_sent, None),
last_heartbeat_acknowledged: last_ack,
keepalive_channel: tx.clone(),
seq: sequence,
token: token.to_owned(),
Expand Down Expand Up @@ -356,6 +375,7 @@ impl Shard {
},
Ok(GatewayEvent::HeartbeatAck) => {
self.heartbeat_instants.1 = Some(Instant::now());
*self.last_heartbeat_acknowledged.lock().unwrap() = true;

Ok(None)
},
Expand Down

0 comments on commit ec9b1c7

Please sign in to comment.