-
Notifications
You must be signed in to change notification settings - Fork 285
/
websockets.rs
104 lines (92 loc) · 3.95 KB
/
websockets.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
use model::*;
use errors::*;
use url::Url;
use serde_json::from_str;
use tungstenite::{connect, Message};
use tungstenite::protocol::WebSocket;
use tungstenite::client::AutoStream;
use tungstenite::handshake::client::Response;
/// Base URL of the stream endpoint; `WebSockets::connect` appends the
/// caller-supplied endpoint to it.
static WEBSOCKET_URL: &str = "wss://stream.binance.com:9443/ws/";

// Marker substrings searched for in the raw text of each incoming message in
// order to decide which event type the payload should be decoded into.
static OUTBOUND_ACCOUNT_INFO: &str = "outboundAccountInfo";
static EXECUTION_REPORT: &str = "executionReport";

static KLINE: &str = "kline";
static AGGREGATED_TRADE: &str = "aggTrade";
static DEPTH_ORDERBOOK: &str = "depthUpdate";
static PARTIAL_ORDERBOOK: &str = "lastUpdateId";
static DAYTICKER: &str = "24hrTicker";
/// A decoded stream message, handed to the callback registered on `WebSockets`.
///
/// Each variant wraps the payload type that `event_loop` deserializes a text
/// message into after spotting the corresponding marker substring.
pub enum WebsocketEvent {
/// Decoded from a message containing `"outboundAccountInfo"`.
AccountUpdate(AccountUpdateEvent),
/// Decoded from a message containing `"executionReport"`.
OrderTrade(OrderTradeEvent),
/// Decoded from a message containing `"aggTrade"`.
Trade(TradesEvent),
/// Decoded from a message containing `"lastUpdateId"`.
OrderBook(OrderBook),
/// Decoded (as a list) from a message containing `"24hrTicker"`.
DayTicker(Vec<DayTickerEvent>),
/// Decoded from a message containing `"kline"`.
Kline(KlineEvent),
/// Decoded from a message containing `"depthUpdate"`.
DepthOrderBook(DepthOrderBookEvent),
}
pub struct WebSockets<'a> {
socket: Option<(WebSocket<AutoStream>, Response)>,
handler: Box<FnMut(WebsocketEvent) + 'a>,
}
impl<'a> WebSockets<'a> {
pub fn new<Callback>(handler: Callback) -> WebSockets<'a>
where
Callback: FnMut(WebsocketEvent) + 'a
{
WebSockets {
socket: None,
handler: Box::new(handler),
}
}
pub fn connect(&mut self, endpoint: &str) -> Result<()> {
let wss: String = format!("{}{}", WEBSOCKET_URL, endpoint);
let url = Url::parse(&wss)?;
match connect(url) {
Ok(answer) => {
self.socket = Some(answer);
Ok(())
}
Err(e) => {
bail!(format!("Error during handshake {}", e));
}
}
}
pub fn event_loop(&mut self) -> Result<()> {
loop {
if let Some(ref mut socket) = self.socket {
let message = socket.0.read_message()?;
match message {
Message::Text(msg) => {
if msg.find(OUTBOUND_ACCOUNT_INFO) != None {
let account_update: AccountUpdateEvent = from_str(msg.as_str())?;
(self.handler)(WebsocketEvent::AccountUpdate(account_update));
} else if msg.find(EXECUTION_REPORT) != None {
let order_trade: OrderTradeEvent = from_str(msg.as_str())?;
(self.handler)(WebsocketEvent::OrderTrade(order_trade));
} else if msg.find(AGGREGATED_TRADE) != None {
let trade: TradesEvent = from_str(msg.as_str())?;
(self.handler)(WebsocketEvent::Trade(trade));
} else if msg.find(DAYTICKER) != None {
let trades: Vec<DayTickerEvent> = from_str(msg.as_str())?;
(self.handler)(WebsocketEvent::DayTicker(trades));
} else if msg.find(KLINE) != None {
let kline: KlineEvent = from_str(msg.as_str())?;
(self.handler)(WebsocketEvent::Kline(kline));
} else if msg.find(PARTIAL_ORDERBOOK) != None {
let partial_orderbook: OrderBook = from_str(msg.as_str())?;
(self.handler)(WebsocketEvent::OrderBook(partial_orderbook));
} else if msg.find(DEPTH_ORDERBOOK) != None {
let depth_orderbook: DepthOrderBookEvent = from_str(msg.as_str())?;
(self.handler)(WebsocketEvent::DepthOrderBook(depth_orderbook));
}
}
Message::Ping(_) |
Message::Pong(_) |
Message::Binary(_) => {}
Message::Close(e) => {
bail!(format!("Disconnected {:?}", e));
}
}
}
}
}
}