diff --git a/src/client/error.rs b/src/client/error.rs index 14818d9f2e4..39755761635 100644 --- a/src/client/error.rs +++ b/src/client/error.rs @@ -122,6 +122,7 @@ pub enum Error { /// /// [`Context::edit_role`]: struct.Context.html#method.edit_role RecordNotFound, + ShardBootFailure, /// When the shard being retrieved from within the Client could not be /// found after being inserted into the Client's internal vector of /// [`Shard`]s. diff --git a/src/client/gateway/prep.rs b/src/client/gateway/prep.rs index b1c08bb12b7..4aa551989dd 100644 --- a/src/client/gateway/prep.rs +++ b/src/client/gateway/prep.rs @@ -1,6 +1,5 @@ use serde_json::builder::ObjectBuilder; use serde_json::Value; -use std::net::Shutdown; use std::sync::mpsc::{ Receiver as MpscReceiver, Sender as MpscSender, @@ -96,6 +95,7 @@ pub fn keepalive(interval: u64, let mut next_tick = time::get_time() + base_interval; let mut last_sequence = 0; + let mut last_successful = false; 'outer: loop { thread::sleep(StdDuration::from_millis(100)); @@ -137,12 +137,25 @@ pub fn keepalive(interval: u64, *heartbeat_sent.lock().unwrap() = now; }, - Err(why) => warn!("Error sending keepalive: {:?}", why), + Err(why) => { + warn!("Error sending keepalive: {:?}", why); + + if last_successful { + debug!("If next keepalive fails, closing"); + } else { + break; + } + + last_successful = false; + }, } } } debug!("Closing keepalive"); - let _ = sender.get_mut().shutdown(Shutdown::Both); + match sender.shutdown_all() { + Ok(_) => debug!("Successfully shutdown sender/receiver"), + Err(why) => warn!("Failed to shutdown sender/receiver: {:?}", why), + } } diff --git a/src/client/mod.rs b/src/client/mod.rs index f4f113537e0..3d712a56e24 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -43,17 +43,18 @@ use std::thread; use std::time::Duration; use typemap::ShareMap; use websocket::client::Receiver; +use websocket::result::WebSocketError; use websocket::stream::WebSocketStream; use ::internal::prelude::{Error, Result, Value}; use ::internal::ws_impl::ReceiverExt; use ::model::event::{ ChannelPinsAckEvent, ChannelPinsUpdateEvent, - Event, GatewayEvent, GuildSyncEvent, MessageUpdateEvent, PresenceUpdateEvent, + ReadyEvent, ResumedEvent, TypingStartEvent, VoiceServerUpdateEvent, @@ -177,7 +178,6 @@ pub struct Client { #[cfg(feature="framework")] framework: Arc>, login_type: LoginType, - shards: Vec>>, token: String, } @@ -780,74 +780,65 @@ impl Client { .update_current_user(user.id, user.bot); } - let gateway_url = rest::get_gateway()?.url; + let gateway_url = Arc::new(Mutex::new(rest::get_gateway()?.url)); - for i in shard_data.map_or(0, |x| x[0])..shard_data.map_or(1, |x| x[1] + 1) { - let shard = Shard::new(&gateway_url, - &self.token, - shard_data.map(|s| [i, s[2]]), - self.login_type); - match shard { - Ok((shard, ready, receiver)) => { - self.shards.push(Arc::new(Mutex::new(shard))); + let shards_index = shard_data.map_or(0, |x| x[0]); + let shards_total = shard_data.map_or(1, |x| x[1] + 1); + for shard_number in shards_index..shards_total { + let shard_info = shard_data.map(|s| [shard_number, s[2]]); + + let boot = boot_shard(BootInfo { + gateway_url: gateway_url.clone(), + login_type: self.login_type, + shard_info: shard_info, + token: self.token.clone(), + }); + + match boot { + Ok((shard, _ready, receiver)) => { #[cfg(feature="cache")] { CACHE.write() .unwrap() - .update_with_ready(&ready); + .update_with_ready(&_ready); } - match self.shards.last() { - Some(shard) => { - feature_framework! {{ - dispatch(Event::Ready(ready), - shard.clone(), - self.framework.clone(), - self.data.clone(), - self.login_type, - self.event_store.clone()); - } else { - dispatch(Event::Ready(ready), - shard.clone(), - self.data.clone(), - self.login_type, - self.event_store.clone()); - }} - - let data_clone = self.data.clone(); - let event_store = self.event_store.clone(); - let login_type = self.login_type; - let shard_clone = shard.clone(); - - feature_framework! {{ - let framework = self.framework.clone(); - - thread::spawn(move || { - handle_shard(shard_clone, - framework, - data_clone, - login_type, - event_store, - receiver) - }); - } else { - thread::spawn(move || { - handle_shard(shard_clone, - data_clone, - login_type, - event_store, - receiver) - }); - }} - }, - None => return Err(Error::Client(ClientError::ShardUnknown)), - } + let monitor_info = feature_framework! {{ + MonitorInfo { + data: self.data.clone(), + event_store: self.event_store.clone(), + framework: self.framework.clone(), + gateway_url: gateway_url.clone(), + login_type: self.login_type, + receiver: receiver, + shard: Arc::new(Mutex::new(shard)), + shard_info: shard_info, + token: self.token.clone(), + } + } else { + MonitorInfo { + data: self.data.clone(), + event_store: self.event_store.clone(), + gateway_url: gateway_url.clone(), + login_type: self.login_type, + receiver: receiver, + shard: Arc::new(Mutex::new(shard)), + shard_info: shard_info, + token: self.token.clone(), + } + }}; + + thread::spawn(move || { + monitor_shard(monitor_info); + }); }, - Err(why) => return Err(why), + Err(why) => warn!("Error starting shard {:?}: {:?}", shard_info, why), } // Wait 5 seconds between shard boots. + // + // We need to wait at least 5 seconds between READYs. thread::sleep(Duration::from_secs(5)); } @@ -1161,61 +1152,152 @@ impl Client { } } +pub struct BootInfo { + gateway_url: Arc>, + login_type: LoginType, + shard_info: Option<[u64; 2]>, + token: String, +} + #[cfg(feature="framework")] -fn handle_shard(shard: Arc>, - framework: Arc>, - data: Arc>, - login_type: LoginType, - event_store: Arc>, - mut receiver: Receiver) { - loop { - let event = receiver.recv_json(GatewayEvent::decode); +pub struct MonitorInfo { + data: Arc>, + event_store: Arc>, + framework: Arc>, + gateway_url: Arc>, + login_type: LoginType, + receiver: Receiver, + shard: Arc>, + shard_info: Option<[u64; 2]>, + token: String, +} - trace!("Received event on shard handler: {:?}", event); +#[cfg(not(feature="framework"))] +pub struct MonitorInfo { + data: Arc>, + event_store: Arc>, + gateway_url: Arc>, + login_type: LoginType, + receiver: Receiver, + shard: Arc>, + shard_info: Option<[u64; 2]>, + token: String, +} - // This will only lock when _updating_ the shard, resuming, etc. Most - // of the time, this won't be locked (i.e. when receiving an event over - // the receiver, separate from the shard itself). - let event = match shard.lock().unwrap().handle_event(event, &mut receiver) { - Ok(Some((event, Some(new_receiver)))) => { - receiver = new_receiver; +fn boot_shard(info: BootInfo) -> Result<(Shard, ReadyEvent, Receiver)> { + // Make ten attempts to boot the shard, exponentially backing off; if it + // still doesn't boot after that, accept it as a failure. + // + // After three attempts, start re-retrieving the gateway URL. Before that, + // use the cached one. + for attempt_number in 1..11u64 { + // If we've tried over 3 times so far, get a new gateway URL. + // + // If doing so fails, count this as a boot attempt. + if attempt_number > 3 { + match rest::get_gateway() { + Ok(g) => *info.gateway_url.lock().unwrap() = g.url, + Err(why) => { + warn!("Failed to retrieve gateway URL: {:?}", why); + + // Failed -- start over. + continue; + }, + } + } - event - }, - Ok(Some((event, None))) => event, - Ok(None) => continue, - Err(why) => { - // This is potentially causing problems -- let's see. - error!("Shard handler received err: {:?}", why); + let attempt = Shard::new(&info.gateway_url.lock().unwrap(), + &info.token, + info.shard_info, + info.login_type); - continue; + match attempt { + Ok((shard, ready, receiver)) => { + #[cfg(feature="cache")] + { + CACHE.write() + .unwrap() + .update_with_ready(&ready); + } + + info!("Successfully booted shard: {:?}", info.shard_info); + + return Ok((shard, ready, receiver)); }, - }; + Err(why) => warn!("Failed to boot shard: {:?}", why), + } + } + + // Hopefully _never_ happens? + Err(Error::Client(ClientError::ShardBootFailure)) +} + +fn monitor_shard(mut info: MonitorInfo) { + loop { + let mut boot_successful = false; + + for _ in 0..3 { + let boot = boot_shard(BootInfo { + gateway_url: info.gateway_url.clone(), + login_type: info.login_type, + shard_info: info.shard_info, + token: info.token.clone(), + }); + + match boot { + Ok((new_shard, _ready, new_receiver)) => { + #[cfg(feature="cache")] + { + CACHE.write().unwrap().update_with_ready(&_ready); + } + + *info.shard.lock().unwrap() = new_shard; + info.receiver = new_receiver; - dispatch(event, - shard.clone(), - framework.clone(), - data.clone(), - login_type, - event_store.clone()); + boot_successful = true; + + break; + }, + Err(why) => warn!("Failed to boot shard: {:?}", why), + } + } + + if boot_successful { + handle_shard(&mut info); + } else { + break; + } + + // The shard died: redo the cycle. } + + error!("Completely failed to reboot shard"); } -#[cfg(not(feature="framework"))] -// aaaaaaaaaaaaaaaaaaaaaaaaaaaaa -fn handle_shard(shard: Arc>, - data: Arc>, - login_type: LoginType, - event_store: Arc>, - mut receiver: Receiver) { +fn handle_shard(info: &mut MonitorInfo) { loop { - let event = receiver.recv_json(GatewayEvent::decode); + let event = match info.receiver.recv_json(GatewayEvent::decode) { + Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => { + debug!("Attempting to shutdown receiver/sender"); + + match info.receiver.shutdown_all() { + Ok(_) => debug!("Successfully shutdown receiver/sender"), + Err(why) => warn!("Err shutting down receiver/sender: {:?}", why), + } + + return; + }, + other => other, + }; trace!("Received event on shard handler: {:?}", event); - let event = match shard.lock().unwrap().handle_event(event, &mut receiver) { + // This will only lock when _updating_ the shard, resuming, etc. Most + // of the time, this won't be locked (i.e. when receiving an event over + // the receiver, separate from the shard itself). + let event = match info.shard.lock().unwrap().handle_event(event, &mut info.receiver) { Ok(Some((event, Some(new_receiver)))) => { - receiver = new_receiver; + info.receiver = new_receiver; event }, @@ -1228,11 +1310,20 @@ fn handle_shard(shard: Arc>, }, }; - dispatch(event, - shard.clone(), - data.clone(), - login_type, - event_store.clone()); + feature_framework! {{ + dispatch(event, + info.shard.clone(), + info.framework.clone(), + info.data.clone(), + info.login_type, + info.event_store.clone()); + } else { + dispatch(event, + info.shard.clone(), + info.data.clone(), + info.login_type, + info.event_store.clone()); + }} } } @@ -1247,7 +1338,6 @@ fn login(token: &str, login_type: LoginType) -> Client { event_store: Arc::new(RwLock::new(EventStore::default())), framework: Arc::new(Mutex::new(Framework::default())), login_type: login_type, - shards: Vec::default(), token: token.to_owned(), } } else { @@ -1255,7 +1345,6 @@ fn login(token: &str, login_type: LoginType) -> Client { data: Arc::new(Mutex::new(ShareMap::custom())), event_store: Arc::new(RwLock::new(EventStore::default())), login_type: login_type, - shards: Vec::default(), token: token.to_owned(), } }}