Skip to content

Commit

Permalink
Upgrade rust-websocket, rust-openssl, and hyper
Browse files Browse the repository at this point in the history
Upgrade `rust-websocket` to v0.20, maintaining use of its sync client.

This indirectly switches from `rust-openssl` v0.7 - which required
openssl-1.0 on all platforms - to `native-tls`, which allows for use of
schannel on Windows, Secure Transport on OSX, and openssl-1.1 on other
platforms.

Additionally, since hyper is no longer even a dependency of
rust-websocket, we can safely and easily upgrade to `hyper` v0.10 and
`multipart` v0.12.

This commit is fairly experimental as it has not been tested on a
long-running bot.
  • Loading branch information
Zeyla Hellyer committed Jun 7, 2017
1 parent 1700a4a commit 8f8a059
Show file tree
Hide file tree
Showing 12 changed files with 316 additions and 263 deletions.
18 changes: 14 additions & 4 deletions Cargo.toml
Expand Up @@ -33,7 +33,11 @@ version = "0.2"

[dependencies.hyper]
optional = true
version = "~0.9"
version = "~0.10"

[dependencies.hyper-native-tls]
optional = true
version = "0.2.2"

[dependencies.lazy_static]
optional = true
Expand All @@ -43,7 +47,11 @@ version = "~0.2"
default-features = false
features = ["client", "hyper"]
optional = true
version = "0.8"
version = "0.12"

[dependencies.native-tls]
optional = true
version = "0.1"

[dependencies.opus]
optional = true
Expand All @@ -59,8 +67,10 @@ optional = true
version = "~0.3"

[dependencies.websocket]
default-features = false
features = ["sync-ssl"]
optional = true
version = "~0.17"
version = "~0.20"

[features]
default = [
Expand All @@ -79,7 +89,7 @@ client = ["gateway", "lazy_static", "http", "typemap"]
extras = []
framework = ["client", "model", "utils"]
gateway = ["http", "websocket"]
http = ["hyper", "lazy_static", "multipart"]
http = ["hyper", "hyper-native-tls", "lazy_static", "multipart", "native-tls"]
model = ["builder", "http"]
utils = []
voice = ["byteorder", "gateway", "opus", "sodiumoxide"]
32 changes: 16 additions & 16 deletions src/client/context.rs
Expand Up @@ -134,7 +134,8 @@ impl Context {
///
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
pub fn online(&self) {
self.shard.lock().unwrap().set_status(OnlineStatus::Online);
let mut shard = self.shard.lock().unwrap();
shard.set_status(OnlineStatus::Online);
}

/// Sets the current user as being [`Idle`]. This maintains the current
Expand All @@ -157,7 +158,8 @@ impl Context {
///
/// [`Idle`]: ../model/enum.OnlineStatus.html#variant.Idle
pub fn idle(&self) {
self.shard.lock().unwrap().set_status(OnlineStatus::Idle);
let mut shard = self.shard.lock().unwrap();
shard.set_status(OnlineStatus::Idle);
}

/// Sets the current user as being [`DoNotDisturb`]. This maintains the
Expand All @@ -180,7 +182,8 @@ impl Context {
///
/// [`DoNotDisturb`]: ../model/enum.OnlineStatus.html#variant.DoNotDisturb
pub fn dnd(&self) {
self.shard.lock().unwrap().set_status(OnlineStatus::DoNotDisturb);
let mut shard = self.shard.lock().unwrap();
shard.set_status(OnlineStatus::DoNotDisturb);
}

/// Sets the current user as being [`Invisible`]. This maintains the current
Expand All @@ -203,7 +206,8 @@ impl Context {
/// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready
/// [`Invisible`]: ../model/enum.OnlineStatus.html#variant.Invisible
pub fn invisible(&self) {
self.shard.lock().unwrap().set_status(OnlineStatus::Invisible);
let mut shard = self.shard.lock().unwrap();
shard.set_status(OnlineStatus::Invisible);
}

/// "Resets" the current user's presence, by setting the game to `None` and
Expand All @@ -228,9 +232,8 @@ impl Context {
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
/// [`set_presence`]: #method.set_presence
pub fn reset_presence(&self) {
self.shard.lock()
.unwrap()
.set_presence(None, OnlineStatus::Online, false)
let mut shard = self.shard.lock().unwrap();
shard.set_presence(None, OnlineStatus::Online, false)
}

/// Sets the current game, defaulting to an online status of [`Online`].
Expand Down Expand Up @@ -260,9 +263,8 @@ impl Context {
///
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
pub fn set_game(&self, game: Game) {
self.shard.lock()
.unwrap()
.set_presence(Some(game), OnlineStatus::Online, false);
let mut shard = self.shard.lock().unwrap();
shard.set_presence(Some(game), OnlineStatus::Online, false);
}

/// Sets the current game, passing in only its name. This will automatically
Expand Down Expand Up @@ -302,9 +304,8 @@ impl Context {
url: None,
};

self.shard.lock()
.unwrap()
.set_presence(Some(game), OnlineStatus::Online, false);
let mut shard = self.shard.lock().unwrap();
shard.set_presence(Some(game), OnlineStatus::Online, false);
}

/// Sets the current user's presence, providing all fields to be passed.
Expand Down Expand Up @@ -351,8 +352,7 @@ impl Context {
game: Option<Game>,
status: OnlineStatus,
afk: bool) {
self.shard.lock()
.unwrap()
.set_presence(game, status, afk)
let mut shard = self.shard.lock().unwrap();
shard.set_presence(game, status, afk)
}
}
81 changes: 53 additions & 28 deletions src/client/mod.rs
Expand Up @@ -35,6 +35,7 @@ pub use ::http as rest;
#[cfg(feature="cache")]
pub use ::CACHE;

use chrono::UTC;
use self::dispatch::dispatch;
use self::event_store::EventStore;
use std::collections::HashMap;
Expand All @@ -43,9 +44,7 @@ use std::time::Duration;
use std::{mem, thread};
use super::gateway::Shard;
use typemap::ShareMap;
use websocket::client::Receiver;
use websocket::result::WebSocketError;
use websocket::stream::WebSocketStream;
use ::http;
use ::internal::prelude::*;
use ::internal::ws_impl::ReceiverExt;
Expand Down Expand Up @@ -982,7 +981,7 @@ impl Client {
});

match boot {
Ok((shard, ready, receiver)) => {
Ok((shard, ready)) => {
#[cfg(feature="cache")]
{
CACHE.write()
Expand Down Expand Up @@ -1011,7 +1010,6 @@ impl Client {
event_store: self.event_store.clone(),
framework: self.framework.clone(),
gateway_url: gateway_url.clone(),
receiver: receiver,
shard: shard,
shard_info: shard_info,
token: self.token.clone(),
Expand All @@ -1021,7 +1019,6 @@ impl Client {
data: self.data.clone(),
event_store: self.event_store.clone(),
gateway_url: gateway_url.clone(),
receiver: receiver,
shard: shard,
shard_info: shard_info,
token: self.token.clone(),
Expand Down Expand Up @@ -1254,7 +1251,6 @@ struct MonitorInfo {
event_store: Arc<RwLock<EventStore>>,
framework: Arc<Mutex<Framework>>,
gateway_url: Arc<Mutex<String>>,
receiver: Receiver<WebSocketStream>,
shard: Arc<Mutex<Shard>>,
shard_info: Option<[u64; 2]>,
token: String,
Expand All @@ -1265,13 +1261,12 @@ struct MonitorInfo {
data: Arc<Mutex<ShareMap>>,
event_store: Arc<RwLock<EventStore>>,
gateway_url: Arc<Mutex<String>>,
receiver: Receiver<WebSocketStream>,
shard: Arc<Mutex<Shard>>,
shard_info: Option<[u64; 2]>,
token: String,
}

fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent, Receiver<WebSocketStream>)> {
fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent)> {
// Make ten attempts to boot the shard, exponentially backing off; if it
// still doesn't boot after that, accept it as a failure.
//
Expand All @@ -1298,7 +1293,7 @@ fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent, Receiver<WebSocketS
info.shard_info);

match attempt {
Ok((shard, ready, receiver)) => {
Ok((shard, ready)) => {
#[cfg(feature="cache")]
{
CACHE.write()
Expand All @@ -1308,7 +1303,7 @@ fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent, Receiver<WebSocketS

info!("Successfully booted shard: {:?}", info.shard_info);

return Ok((shard, ready, receiver));
return Ok((shard, ready));
},
Err(why) => warn!("Failed to boot shard: {:?}", why),
}
Expand All @@ -1332,14 +1327,13 @@ fn monitor_shard(mut info: MonitorInfo) {
});

match boot {
Ok((new_shard, ready, new_receiver)) => {
Ok((new_shard, ready)) => {
#[cfg(feature="cache")]
{
CACHE.write().unwrap().update_with_ready(&ready);
}

*info.shard.lock().unwrap() = new_shard;
info.receiver = new_receiver;

boot_successful = true;

Expand Down Expand Up @@ -1375,16 +1369,54 @@ fn monitor_shard(mut info: MonitorInfo) {
}

fn handle_shard(info: &mut MonitorInfo) {
// This is currently all ducktape. Redo this.
let mut last_ack_time = UTC::now().timestamp();
let mut last_heartbeat_sent = UTC::now().timestamp();

loop {
let event = match info.receiver.recv_json(GatewayEvent::decode) {
Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => {
debug!("Attempting to shutdown receiver/sender");
let mut shard = info.shard.lock().unwrap();
let in_secs = shard.heartbeat_interval() / 1000;

match info.shard.lock().unwrap().resume(&mut info.receiver) {
Ok((_, receiver)) => {
if UTC::now().timestamp() - last_heartbeat_sent > in_secs {
// If the last heartbeat didn't receive an acknowledgement, then
// shutdown and auto-reconnect.
if !shard.last_heartbeat_acknowledged() {
debug!("Last heartbeat not acknowledged; re-connecting");

match shard.resume() {
Ok(_) => {
debug!("Successfully resumed shard");

info.receiver = receiver;
continue;
},
Err(why) => {
warn!("Err resuming shard: {:?}", why);

return;
},
}
}

let _ = shard.heartbeat();
last_heartbeat_sent = UTC::now().timestamp();
}

let event = match shard.client.recv_json(GatewayEvent::decode) {
Ok(GatewayEvent::HeartbeatAck) => {
last_ack_time = UTC::now().timestamp();

Ok(GatewayEvent::HeartbeatAck)
},
Err(Error::WebSocket(WebSocketError::IoError(_))) => {
if shard.last_heartbeat_acknowledged() || UTC::now().timestamp() - 90 < last_ack_time {
continue;
}

debug!("Attempting to shutdown receiver/sender");

match shard.resume() {
Ok(_) => {
debug!("Successfully resumed shard");

continue;
},
Expand All @@ -1395,21 +1427,14 @@ fn handle_shard(info: &mut MonitorInfo) {
},
}
},
Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => continue,
other => other,
};

trace!("Received event on shard handler: {:?}", event);

// This will only lock when _updating_ the shard, resuming, etc. Most
// of the time, this won't be locked (i.e. when receiving an event over
// the receiver, separate from the shard itself).
let event = match info.shard.lock().unwrap().handle_event(event, &mut info.receiver) {
Ok(Some((event, Some(new_receiver)))) => {
info.receiver = new_receiver;

event
},
Ok(Some((event, None))) => event,
let event = match shard.handle_event(event) {
Ok(Some(event)) => event,
Ok(None) => continue,
Err(why) => {
error!("Shard handler received err: {:?}", why);
Expand Down
14 changes: 14 additions & 0 deletions src/error.rs
Expand Up @@ -8,6 +8,8 @@ use ::model::ModelError;

#[cfg(feature="hyper")]
use hyper::Error as HyperError;
#[cfg(feature="native-tls")]
use native_tls::Error as TlsError;
#[cfg(feature="voice")]
use opus::Error as OpusError;
#[cfg(feature="websocket")]
Expand Down Expand Up @@ -83,6 +85,9 @@ pub enum Error {
/// An error from the `hyper` crate.
#[cfg(feature="hyper")]
Hyper(HyperError),
/// An error from the `native-tls` crate.
#[cfg(feature="native-tls")]
Tls(TlsError),
/// An error from the `rust-websocket` crate.
#[cfg(feature="gateway")]
WebSocket(WebSocketError),
Expand Down Expand Up @@ -141,6 +146,13 @@ impl From<OpusError> for Error {
}
}

#[cfg(feature="native-tls")]
impl From<TlsError> for Error {
fn from(e: TlsError) -> Error {
Error::Tls(e)
}
}

#[cfg(feature="gateway")]
impl From<WebSocketError> for Error {
fn from(e: WebSocketError) -> Error {
Expand Down Expand Up @@ -184,6 +196,8 @@ impl StdError for Error {
Error::Hyper(ref inner) => inner.description(),
#[cfg(feature="voice")]
Error::Opus(ref inner) => inner.description(),
#[cfg(feature="native-tls")]
Error::Tls(ref inner) => inner.description(),
#[cfg(feature="voice")]
Error::Voice(_) => "Voice error",
#[cfg(feature="gateway")]
Expand Down
8 changes: 6 additions & 2 deletions src/gateway/error.rs
@@ -1,5 +1,6 @@
use std::error::Error as StdError;
use std::fmt::{self, Display};
use websocket::message::CloseData;

/// An error that occurred while attempting to deal with the gateway.
///
Expand All @@ -10,9 +11,11 @@ pub enum Error {
/// There was an error building a URL.
BuildingUrl,
/// The connection closed, potentially uncleanly.
Closed(Option<u16>, String),
Closed(Option<CloseData>),
/// Expected a Hello during a handshake
ExpectedHello,
/// When there was an error sending a heartbeat.
HeartbeatFailed,
/// Expected a Ready or an InvalidateSession
InvalidHandshake,
/// An indicator that an unknown opcode was received from the gateway.
Expand All @@ -33,8 +36,9 @@ impl StdError for Error {
fn description(&self) -> &str {
match *self {
Error::BuildingUrl => "Error building url",
Error::Closed(_, _) => "Connection closed",
Error::Closed(_) => "Connection closed",
Error::ExpectedHello => "Expected a Hello",
Error::HeartbeatFailed => "Failed sending a heartbeat",
Error::InvalidHandshake => "Expected a valid Handshake",
Error::InvalidOpCode => "Invalid OpCode",
Error::NoSessionId => "No Session Id present when required",
Expand Down

0 comments on commit 8f8a059

Please sign in to comment.