From fc4ebca88f9334928e4e230d3fe967676a089351 Mon Sep 17 00:00:00 2001 From: Flavio Oliveira Date: Sun, 30 Jun 2019 11:52:22 +0200 Subject: [PATCH] Improve websockets error handling --- Cargo.toml | 6 +- README.md | 24 +++++- examples/Cargo.toml | 20 ----- examples/README.md | 6 +- examples/{src => }/binance_endpoints.rs | 0 examples/{src => }/binance_save_all_trades.rs | 8 +- examples/{src => }/binance_websockets.rs | 73 ++++++++++++++++--- src/errors.rs | 7 +- src/websockets.rs | 69 +++++++++--------- 9 files changed, 136 insertions(+), 77 deletions(-) delete mode 100644 examples/Cargo.toml rename examples/{src => }/binance_endpoints.rs (100%) rename examples/{src => }/binance_save_all_trades.rs (89%) rename examples/{src => }/binance_websockets.rs (70%) diff --git a/Cargo.toml b/Cargo.toml index db139c6e..af92d600 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "binance" -version = "0.5.0" +version = "0.6.0" license = "MIT OR Apache-2.0" authors = ["Flavio Oliveira "] @@ -22,8 +22,8 @@ hex = "0.3" serde = "1.0" serde_json = "1.0" serde_derive = "1.0" -error-chain = "0.12" +error-chain = { version = "0.12", default-features = false } ring = "0.14" reqwest = "0.9" -tungstenite = "0.6" +tungstenite = "0.8" url = "1.7" diff --git a/README.md b/README.md index 86bf9767..54c17ac0 100644 --- a/README.md +++ b/README.md @@ -248,7 +248,13 @@ if let Ok(answer) = user_stream.start() { }); web_socket.connect(&listen_key).unwrap(); // check error - web_socket.event_loop(); + if let Err(e) = web_socket.event_loop() { + match e { + err => { + println!("Error: {}", err); + } + } + } } else { println!("Not able to start an User Stream (Check your API_KEY)"); } @@ -278,7 +284,13 @@ let mut web_socket: WebSockets = WebSockets::new(|event: WebsocketEvent| { }); web_socket.connect(&agg_trade).unwrap(); // check error -web_socket.event_loop(); +if let Err(e) = web_socket.event_loop() { + match e { + err => { + println!("Error: {}", err); + } + } +} ``` ### WEBSOCKETS - KLINE @@ -303,7 +315,13 @@ let mut web_socket: WebSockets = WebSockets::new(|event: WebsocketEvent| { }); web_socket.connect(&kline).unwrap(); // check error -web_socket.event_loop(); +if let Err(e) = web_socket.event_loop() { + match e { + err => { + println!("Error: {}", err); + } + } +} ``` ## Other Exchanges diff --git a/examples/Cargo.toml b/examples/Cargo.toml deleted file mode 100644 index cbd8237d..00000000 --- a/examples/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "binance-app" -version = "0.5.0" -authors = ["Flavio Oliveira "] - -[[bin]] -name = "binance_endpoints" -path = "src/binance_endpoints.rs" - -[[bin]] -name = "binance_websockets" -path = "src/binance_websockets.rs" - -[[bin]] -name = "binance_save_all_trades" -path = "src/binance_save_all_trades.rs" - -[dependencies] -binance = { path = "../" } -csv = "1.0.0" \ No newline at end of file diff --git a/examples/README.md b/examples/README.md index 65d914e0..15026b72 100644 --- a/examples/README.md +++ b/examples/README.md @@ -2,12 +2,12 @@ ## Binance Endpoints -cargo run --release --bin "binance_endpoints" +cargo run --release --example "binance_endpoints" ## Binance Websockets -cargo run --release --bin "binance_websockets" +cargo run --release --example "binance_websockets" ## Binance Websockets - Save all trades to file -cargo run --release --bin "binance_save_all_trades" \ No newline at end of file +cargo run --release --example "binance_save_all_trades" diff --git a/examples/src/binance_endpoints.rs b/examples/binance_endpoints.rs similarity index 100% rename from examples/src/binance_endpoints.rs rename to examples/binance_endpoints.rs diff --git a/examples/src/binance_save_all_trades.rs b/examples/binance_save_all_trades.rs similarity index 89% rename from examples/src/binance_save_all_trades.rs rename to examples/binance_save_all_trades.rs index 795e53bb..e3b8ae3c 100644 --- a/examples/src/binance_save_all_trades.rs +++ b/examples/binance_save_all_trades.rs @@ -49,5 +49,11 @@ fn save_all_trades_websocket() { }); web_socket.connect(&agg_trade).unwrap(); // check error - web_socket.event_loop(); + if let Err(e) = web_socket.event_loop() { + match e { + err => { + println!("Error: {}", err); + } + } + } } \ No newline at end of file diff --git a/examples/src/binance_websockets.rs b/examples/binance_websockets.rs similarity index 70% rename from examples/src/binance_websockets.rs rename to examples/binance_websockets.rs index b74831b6..69e7905a 100644 --- a/examples/src/binance_websockets.rs +++ b/examples/binance_websockets.rs @@ -5,11 +5,12 @@ use binance::userstream::*; use binance::websockets::*; fn main() { - user_stream(); - user_stream_websocket(); - market_websocket(); - kline_websocket(); - all_trades_websocket(); + //user_stream(); + //user_stream_websocket(); + //market_websocket(); + //kline_websocket(); + //all_trades_websocket(); + last_price(); } fn user_stream() { @@ -62,7 +63,13 @@ fn user_stream_websocket() { }); web_socket.connect(&listen_key).unwrap(); // check error - web_socket.event_loop(); + if let Err(e) = web_socket.event_loop() { + match e { + err => { + println!("Error: {}", err); + } + } + } } else { println!("Not able to start an User Stream (Check your API_KEY)"); } @@ -95,7 +102,13 @@ fn market_websocket() { }); web_socket.connect(&agg_trade).unwrap(); // check error - web_socket.event_loop(); + if let Err(e) = web_socket.event_loop() { + match e { + err => { + println!("Error: {}", err); + } + } + } } fn all_trades_websocket() { @@ -115,7 +128,13 @@ fn all_trades_websocket() { }); web_socket.connect(&agg_trade).unwrap(); // check error - web_socket.event_loop(); + if let Err(e) = web_socket.event_loop() { + match e { + err => { + println!("Error: {}", err); + } + } + } } fn kline_websocket() { @@ -133,5 +152,41 @@ fn kline_websocket() { }); web_socket.connect(&kline).unwrap(); // check error - web_socket.event_loop(); + if let Err(e) = web_socket.event_loop() { + match e { + err => { + println!("Error: {}", err); + } + } + } } + +fn last_price() { + let agg_trade: String = format!("!ticker@arr"); + let mut btcusdt: f32 = "0".parse().unwrap(); + + let mut web_socket: WebSockets = WebSockets::new(|event: WebsocketEvent| { + match event { + WebsocketEvent::DayTicker(ticker_events) => { + for tick_event in ticker_events { + if tick_event.symbol == "BTCUSDT" { + btcusdt = tick_event.average_price.parse().unwrap(); + let btcusdt_close: f32 = tick_event.current_close.parse().unwrap(); + println!("{} - {}", btcusdt, btcusdt_close); + } + } + }, + _ => return, + } + }); + + web_socket.connect(&agg_trade).unwrap(); // check error + + if let Err(e) = web_socket.event_loop() { + match e { + err => { + println!("Error: {}", err); + } + } + } +} \ No newline at end of file diff --git a/src/errors.rs b/src/errors.rs index 1ae4493b..5306472c 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -2,12 +2,9 @@ use std; use reqwest; use url; use serde_json; +use tungstenite; error_chain! { - types { - Error, ErrorKind, ResultExt, Result; - } - errors { BinanceError(code: i16, msg: String, response: reqwest::Response) } @@ -19,7 +16,7 @@ error_chain! { ParseFloatError(std::num::ParseFloatError); UrlParserError(url::ParseError); Json(serde_json::Error); + Tungstenite(tungstenite::Error); TimestampError(std::time::SystemTimeError); } - } diff --git a/src/websockets.rs b/src/websockets.rs index b354b5d9..fbf197a9 100644 --- a/src/websockets.rs +++ b/src/websockets.rs @@ -3,7 +3,7 @@ use errors::*; use url::Url; use serde_json::from_str; -use tungstenite::connect; +use tungstenite::{connect, Message}; use tungstenite::protocol::WebSocket; use tungstenite::client::AutoStream; use tungstenite::handshake::client::Response; @@ -61,40 +61,43 @@ impl<'a> WebSockets<'a> { } } - pub fn event_loop(&mut self) { + pub fn event_loop(&mut self) -> Result<()> { loop { if let Some(ref mut socket) = self.socket { - let msg: String = socket.0.read_message().unwrap().into_text().unwrap(); - - if msg.find(OUTBOUND_ACCOUNT_INFO) != None { - let account_update: AccountUpdateEvent = from_str(msg.as_str()).unwrap(); - - (self.handler)(WebsocketEvent::AccountUpdate(account_update)); - } else if msg.find(EXECUTION_REPORT) != None { - let order_trade: OrderTradeEvent = from_str(msg.as_str()).unwrap(); - - (self.handler)(WebsocketEvent::OrderTrade(order_trade)); - } else if msg.find(AGGREGATED_TRADE) != None { - let trade: TradesEvent = from_str(msg.as_str()).unwrap(); - - (self.handler)(WebsocketEvent::Trade(trade)); - } else if msg.find(DAYTICKER) != None { - let trades: Vec = from_str(msg.as_str()).unwrap(); - - (self.handler)(WebsocketEvent::DayTicker(trades)); - } else if msg.find(KLINE) != None { - let kline: KlineEvent = from_str(msg.as_str()).unwrap(); - - (self.handler)(WebsocketEvent::Kline(kline)); - } else if msg.find(PARTIAL_ORDERBOOK) != None { - let partial_orderbook: OrderBook = from_str(msg.as_str()).unwrap(); - - (self.handler)(WebsocketEvent::OrderBook(partial_orderbook)); - } else if msg.find(DEPTH_ORDERBOOK) != None { - let depth_orderbook: DepthOrderBookEvent = from_str(msg.as_str()).unwrap(); - - (self.handler)(WebsocketEvent::DepthOrderBook(depth_orderbook)); - } + let message = socket.0.read_message()?; + + match message { + Message::Text(msg) => { + if msg.find(OUTBOUND_ACCOUNT_INFO) != None { + let account_update: AccountUpdateEvent = from_str(msg.as_str())?; + (self.handler)(WebsocketEvent::AccountUpdate(account_update)); + } else if msg.find(EXECUTION_REPORT) != None { + let order_trade: OrderTradeEvent = from_str(msg.as_str())?; + (self.handler)(WebsocketEvent::OrderTrade(order_trade)); + } else if msg.find(AGGREGATED_TRADE) != None { + let trade: TradesEvent = from_str(msg.as_str())?; + (self.handler)(WebsocketEvent::Trade(trade)); + } else if msg.find(DAYTICKER) != None { + let trades: Vec = from_str(msg.as_str())?; + (self.handler)(WebsocketEvent::DayTicker(trades)); + } else if msg.find(KLINE) != None { + let kline: KlineEvent = from_str(msg.as_str())?; + (self.handler)(WebsocketEvent::Kline(kline)); + } else if msg.find(PARTIAL_ORDERBOOK) != None { + let partial_orderbook: OrderBook = from_str(msg.as_str())?; + (self.handler)(WebsocketEvent::OrderBook(partial_orderbook)); + } else if msg.find(DEPTH_ORDERBOOK) != None { + let depth_orderbook: DepthOrderBookEvent = from_str(msg.as_str())?; + (self.handler)(WebsocketEvent::DepthOrderBook(depth_orderbook)); + } + } + Message::Ping(_) | + Message::Pong(_) | + Message::Binary(_) => {} + Message::Close(e) => { + bail!(format!("Disconnected {:?}", e)); + } + } } } }