-
Notifications
You must be signed in to change notification settings - Fork 565
/
prep.rs
137 lines (119 loc) · 4.55 KB
/
prep.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
use serde_json::builder::ObjectBuilder;
use serde_json::Value;
use std::net::Shutdown;
use std::sync::mpsc::{
TryRecvError,
Receiver as MpscReceiver,
Sender as MpscSender
};
use std::time::Duration as StdDuration;
use std::{env, thread};
use super::super::ClientError;
use super::{GatewayError, GatewayStatus};
use time::{self, Duration};
use websocket::client::request::Url as RequestUrl;
use websocket::client::{Receiver, Sender};
use websocket::stream::WebSocketStream;
use ::constants::{self, OpCode};
use ::error::{Error, Result};
use ::internal::ws_impl::{ReceiverExt, SenderExt};
use ::model::event::{Event, GatewayEvent, ReadyEvent};
#[inline]
pub fn parse_ready(event: GatewayEvent,
tx: &MpscSender<GatewayStatus>,
receiver: &mut Receiver<WebSocketStream>,
identification: Value)
-> Result<(ReadyEvent, u64)> {
match event {
GatewayEvent::Dispatch(seq, Event::Ready(event)) => {
Ok((event, seq))
},
GatewayEvent::InvalidateSession => {
debug!("Session invalidation");
let _ = tx.send(GatewayStatus::SendMessage(identification));
match receiver.recv_json(GatewayEvent::decode)? {
GatewayEvent::Dispatch(seq, Event::Ready(event)) => {
Ok((event, seq))
},
other => {
debug!("Unexpected event: {:?}", other);
Err(Error::Gateway(GatewayError::InvalidHandshake))
},
}
},
other => {
debug!("Unexpected event: {:?}", other);
Err(Error::Gateway(GatewayError::InvalidHandshake))
},
}
}
pub fn identify(token: &str, shard_info: Option<[u8; 2]>) -> Value {
ObjectBuilder::new()
.insert("op", OpCode::Identify.num())
.insert_object("d", |mut object| {
object = identify_compression(object)
.insert("large_threshold", 250) // max value
.insert_object("properties", |object| object
.insert("$browser", "Ergonomic and high-level Rust library")
.insert("$device", "serenity")
.insert("$os", env::consts::OS))
.insert("token", token)
.insert("v", constants::GATEWAY_VERSION);
if let Some(shard_info) = shard_info {
object = object.insert_array("shard", |a| a
.push(shard_info[0])
.push(shard_info[1]));
}
object
})
.build()
}
#[inline(always)]
pub fn identify_compression(object: ObjectBuilder) -> ObjectBuilder {
object.insert("compression", !cfg!(feature="debug"))
}
pub fn build_gateway_url(base: &str) -> Result<RequestUrl> {
RequestUrl::parse(&format!("{}?v={}", base, constants::GATEWAY_VERSION))
.map_err(|_| Error::Client(ClientError::Gateway))
}
pub fn keepalive(interval: u64,
mut sender: Sender<WebSocketStream>,
channel: MpscReceiver<GatewayStatus>) {
let mut base_interval = Duration::milliseconds(interval as i64);
let mut next_tick = time::get_time() + base_interval;
let mut last_sequence = 0;
'outer: loop {
thread::sleep(StdDuration::from_millis(100));
loop {
match channel.try_recv() {
Ok(GatewayStatus::ChangeInterval(interval)) => {
base_interval = Duration::milliseconds(interval as i64);
},
Ok(GatewayStatus::ChangeSender(new_sender)) => {
sender = new_sender;
},
Ok(GatewayStatus::SendMessage(val)) => {
if let Err(why) = sender.send_json(&val) {
warn!("Error sending message: {:?}", why);
}
},
Ok(GatewayStatus::Sequence(seq)) => {
last_sequence = seq;
},
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => break 'outer,
}
}
if time::get_time() >= next_tick {
next_tick = next_tick + base_interval;
let map = ObjectBuilder::new()
.insert("d", last_sequence)
.insert("op", OpCode::Heartbeat.num())
.build();
if let Err(why) = sender.send_json(&map) {
warn!("Error sending keepalive: {:?}", why);
}
}
}
let _ = sender.get_mut().shutdown(Shutdown::Both);
}