Skip to content

Commit

Permalink
Rework shard logic and shard handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Zeyla Hellyer committed Jun 17, 2017
1 parent 9a43507 commit 601704a
Show file tree
Hide file tree
Showing 5 changed files with 447 additions and 547 deletions.
176 changes: 49 additions & 127 deletions src/client/mod.rs
Expand Up @@ -35,7 +35,6 @@ pub use ::http as rest;
#[cfg(feature="cache")]
pub use ::CACHE;

use chrono::UTC;
use self::dispatch::dispatch;
use self::event_store::EventStore;
use std::collections::HashMap;
Expand Down Expand Up @@ -170,7 +169,7 @@ pub struct Client {
event_store: Arc<RwLock<EventStore>>,
#[cfg(feature="framework")]
framework: Arc<Mutex<Framework>>,
token: String,
token: Arc<Mutex<String>>,
}

#[allow(type_complexity)]
Expand Down Expand Up @@ -307,7 +306,7 @@ impl Client {
///
/// [gateway docs]: gateway/index.html#sharding
pub fn start(&mut self) -> Result<()> {
self.start_connection(None, http::get_gateway()?.url)
self.start_connection([0, 0, 1], http::get_gateway()?.url)
}

/// Establish the connection(s) and start listening for events.
Expand Down Expand Up @@ -362,7 +361,7 @@ impl Client {

drop(res);

self.start_connection(Some([0, x, y]), url)
self.start_connection([0, x, y], url)
}

/// Establish a sharded connection and start listening for events.
Expand Down Expand Up @@ -434,7 +433,7 @@ impl Client {
/// [`start_autosharded`]: #method.start_autosharded
/// [gateway docs]: gateway/index.html#sharding
pub fn start_shard(&mut self, shard: u64, shards: u64) -> Result<()> {
self.start_connection(Some([shard, shard, shards]), http::get_gateway()?.url)
self.start_connection([shard, shard, shards], http::get_gateway()?.url)
}

/// Establish sharded connections and start listening for events.
Expand Down Expand Up @@ -483,7 +482,7 @@ impl Client {
/// [`start_shard_range`]: #method.start_shard_range
/// [Gateway docs]: gateway/index.html#sharding
pub fn start_shards(&mut self, total_shards: u64) -> Result<()> {
self.start_connection(Some([0, total_shards - 1, total_shards]), http::get_gateway()?.url)
self.start_connection([0, total_shards - 1, total_shards], http::get_gateway()?.url)
}

/// Establish a range of sharded connections and start listening for events.
Expand Down Expand Up @@ -544,7 +543,7 @@ impl Client {
/// [`start_shards`]: #method.start_shards
/// [Gateway docs]: gateway/index.html#sharding
pub fn start_shard_range(&mut self, range: [u64; 2], total_shards: u64) -> Result<()> {
self.start_connection(Some([range[0], range[1], total_shards]), http::get_gateway()?.url)
self.start_connection([range[0], range[1], total_shards], http::get_gateway()?.url)
}

/// Attaches a handler for when a [`ChannelCreate`] is received.
Expand Down Expand Up @@ -950,7 +949,7 @@ impl Client {
// an error.
//
// [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown
fn start_connection(&mut self, shard_data: Option<[u64; 3]>, url: String)
fn start_connection(&mut self, shard_data: [u64; 3], url: String)
-> Result<()> {
// Update the framework's current user if the feature is enabled.
//
Expand All @@ -966,13 +965,13 @@ impl Client {

let gateway_url = Arc::new(Mutex::new(url));

let shards_index = shard_data.map_or(0, |x| x[0]);
let shards_total = shard_data.map_or(1, |x| x[1] + 1);
let shards_index = shard_data[0];
let shards_total = shard_data[1] + 1;

let mut threads = vec![];

for shard_number in shards_index..shards_total {
let shard_info = shard_data.map(|s| [shard_number, s[2]]);
let shard_info = [shard_number, shard_data[2]];

let boot = boot_shard(&BootInfo {
gateway_url: gateway_url.clone(),
Expand All @@ -981,29 +980,9 @@ impl Client {
});

match boot {
Ok((shard, ready)) => {
#[cfg(feature="cache")]
{
CACHE.write()
.unwrap()
.update_with_ready(&ready);
}

Ok(shard) => {
let shard = Arc::new(Mutex::new(shard));

feature_framework! {{
dispatch(Event::Ready(ready),
&shard,
&self.framework,
&self.data,
&self.event_store);
} else {
dispatch(Event::Ready(ready),
&shard,
&self.data,
&self.event_store);
}}

let monitor_info = feature_framework! {{
MonitorInfo {
data: self.data.clone(),
Expand Down Expand Up @@ -1241,8 +1220,8 @@ impl Client {

struct BootInfo {
gateway_url: Arc<Mutex<String>>,
shard_info: Option<[u64; 2]>,
token: String,
shard_info: [u64; 2],
token: Arc<Mutex<String>>,
}

#[cfg(feature="framework")]
Expand All @@ -1252,8 +1231,8 @@ struct MonitorInfo {
framework: Arc<Mutex<Framework>>,
gateway_url: Arc<Mutex<String>>,
shard: Arc<Mutex<Shard>>,
shard_info: Option<[u64; 2]>,
token: String,
shard_info: [u64; 2],
token: Arc<Mutex<String>>,
}

#[cfg(not(feature="framework"))]
Expand All @@ -1262,11 +1241,11 @@ struct MonitorInfo {
event_store: Arc<RwLock<EventStore>>,
gateway_url: Arc<Mutex<String>>,
shard: Arc<Mutex<Shard>>,
shard_info: Option<[u64; 2]>,
shard_info: [u64; 2],
token: String,
}

fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent)> {
fn boot_shard(info: &BootInfo) -> Result<Shard> {
// Make ten attempts to boot the shard, exponentially backing off; if it
// still doesn't boot after that, accept it as a failure.
//
Expand All @@ -1288,22 +1267,15 @@ fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent)> {
}
}

let attempt = Shard::new(&info.gateway_url.lock().unwrap(),
&info.token,
let attempt = Shard::new(info.gateway_url.clone(),
info.token.clone(),
info.shard_info);

match attempt {
Ok((shard, ready)) => {
#[cfg(feature="cache")]
{
CACHE.write()
.unwrap()
.update_with_ready(&ready);
}

Ok(shard) => {
info!("Successfully booted shard: {:?}", info.shard_info);

return Ok((shard, ready));
return Ok(shard);
},
Err(why) => warn!("Failed to boot shard: {:?}", why),
}
Expand All @@ -1327,29 +1299,11 @@ fn monitor_shard(mut info: MonitorInfo) {
});

match boot {
Ok((new_shard, ready)) => {
#[cfg(feature="cache")]
{
CACHE.write().unwrap().update_with_ready(&ready);
}

Ok(new_shard) => {
*info.shard.lock().unwrap() = new_shard;

boot_successful = true;

feature_framework! {{
dispatch(Event::Ready(ready),
&info.shard,
&info.framework,
&info.data,
&info.event_store);
} else {
dispatch(Event::Ready(ready),
&info.shard,
&info.data,
&info.event_store);
}}

break;
},
Err(why) => warn!("Failed to boot shard: {:?}", why),
Expand All @@ -1370,43 +1324,11 @@ fn monitor_shard(mut info: MonitorInfo) {

fn handle_shard(info: &mut MonitorInfo) {
// This is currently all ducktape. Redo this.
let mut last_ack_time = UTC::now().timestamp();
let mut last_heartbeat_sent = UTC::now().timestamp();

loop {
let in_secs = {
let shard = info.shard.lock().unwrap();

shard.heartbeat_interval() / 1000
};

if UTC::now().timestamp() - last_heartbeat_sent > in_secs {
{
let mut shard = info.shard.lock().unwrap();

// If the last heartbeat didn't receive an acknowledgement, then
// shutdown and auto-reconnect.
if !shard.last_heartbeat_acknowledged() {
debug!("Last heartbeat not acknowledged; re-connecting");

match shard.resume() {
Ok(_) => {
debug!("Successfully resumed shard");

last_ack_time = UTC::now().timestamp();
last_heartbeat_sent = UTC::now().timestamp();

continue;
},
Err(why) => {
warn!("Err resuming shard: {:?}", why);

return;
},
}
}

let _ = shard.heartbeat();
last_heartbeat_sent = UTC::now().timestamp();
shard.check_heartbeat();
}

#[cfg(feature="voice")]
Expand All @@ -1420,40 +1342,39 @@ fn handle_shard(info: &mut MonitorInfo) {
let mut shard = info.shard.lock().unwrap();

let event = match shard.client.recv_json(GatewayEvent::decode) {
Ok(GatewayEvent::HeartbeatAck) => {
last_ack_time = UTC::now().timestamp();

Ok(GatewayEvent::HeartbeatAck)
},
Err(Error::WebSocket(WebSocketError::IoError(_))) => {
if shard.last_heartbeat_acknowledged() || UTC::now().timestamp() - 90 < last_ack_time {
// Check that an amount of time at least double the
// heartbeat_interval has passed.
//
// If not, continue on trying to receive messages.
//
// If it has, attempt to auto-reconnect.
let last = shard.last_heartbeat_ack();
let interval = shard.heartbeat_interval();

if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) {
let seconds_passed = last_heartbeat_ack.elapsed().as_secs();
let interval_in_secs = interval / 1000;

if seconds_passed <= interval_in_secs * 2 {
continue;
}
} else {
continue;
}

debug!("Attempting to shutdown receiver/sender");

match shard.resume() {
Ok(_) => {
debug!("Successfully resumed shard");
debug!("Attempting to auto-reconnect");

last_ack_time = UTC::now().timestamp();
last_heartbeat_sent = UTC::now().timestamp();

continue;
},
Err(why) => {
warn!("Err resuming shard: {:?}", why);

return;
},
if let Err(why) = shard.autoreconnect() {
error!("Failed to auto-reconnect: {:?}", why);
}

continue;
},
Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => continue,
other => other,
};

trace!("Received event on shard handler: {:?}", event);

match shard.handle_event(event) {
Ok(Some(event)) => event,
Ok(None) => continue,
Expand Down Expand Up @@ -1482,19 +1403,20 @@ fn handle_shard(info: &mut MonitorInfo) {

fn init_client(token: String) -> Client {
http::set_token(&token);
let locked = Arc::new(Mutex::new(token));

feature_framework! {{
Client {
data: Arc::new(Mutex::new(ShareMap::custom())),
event_store: Arc::new(RwLock::new(EventStore::default())),
framework: Arc::new(Mutex::new(Framework::default())),
token: token,
token: locked,
}
} else {
Client {
data: Arc::new(Mutex::new(ShareMap::custom())),
event_store: Arc::new(RwLock::new(EventStore::default())),
token: token,
token: locked,
}
}}
}
Expand Down

0 comments on commit 601704a

Please sign in to comment.