Skip to content

Commit

Permalink
Improve websockets error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Flavio Oliveira committed Jun 30, 2019
1 parent de0b97c commit fc4ebca
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 77 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "binance"
version = "0.5.0"
version = "0.6.0"
license = "MIT OR Apache-2.0"
authors = ["Flavio Oliveira <flavio@wisespace.io>"]

Expand All @@ -22,8 +22,8 @@ hex = "0.3"
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
error-chain = "0.12"
error-chain = { version = "0.12", default-features = false }
ring = "0.14"
reqwest = "0.9"
tungstenite = "0.6"
tungstenite = "0.8"
url = "1.7"
24 changes: 21 additions & 3 deletions README.md
Expand Up @@ -248,7 +248,13 @@ if let Ok(answer) = user_stream.start() {
});

web_socket.connect(&listen_key).unwrap(); // check error
web_socket.event_loop();
if let Err(e) = web_socket.event_loop() {
match e {
err => {
println!("Error: {}", err);
}
}
}
} else {
println!("Not able to start an User Stream (Check your API_KEY)");
}
Expand Down Expand Up @@ -278,7 +284,13 @@ let mut web_socket: WebSockets = WebSockets::new(|event: WebsocketEvent| {
});

web_socket.connect(&agg_trade).unwrap(); // check error
web_socket.event_loop();
if let Err(e) = web_socket.event_loop() {
match e {
err => {
println!("Error: {}", err);
}
}
}
```

### WEBSOCKETS - KLINE
Expand All @@ -303,7 +315,13 @@ let mut web_socket: WebSockets = WebSockets::new(|event: WebsocketEvent| {
});

web_socket.connect(&kline).unwrap(); // check error
web_socket.event_loop();
if let Err(e) = web_socket.event_loop() {
match e {
err => {
println!("Error: {}", err);
}
}
}
```

## Other Exchanges
Expand Down
20 changes: 0 additions & 20 deletions examples/Cargo.toml

This file was deleted.

6 changes: 3 additions & 3 deletions examples/README.md
Expand Up @@ -2,12 +2,12 @@

## Binance Endpoints

cargo run --release --bin "binance_endpoints"
cargo run --release --example "binance_endpoints"

## Binance Websockets

cargo run --release --bin "binance_websockets"
cargo run --release --example "binance_websockets"

## Binance Websockets - Save all trades to file

cargo run --release --bin "binance_save_all_trades"
cargo run --release --example "binance_save_all_trades"
File renamed without changes.
Expand Up @@ -49,5 +49,11 @@ fn save_all_trades_websocket() {
});

web_socket.connect(&agg_trade).unwrap(); // check error
web_socket.event_loop();
if let Err(e) = web_socket.event_loop() {
match e {
err => {
println!("Error: {}", err);
}
}
}
}
Expand Up @@ -5,11 +5,12 @@ use binance::userstream::*;
use binance::websockets::*;

fn main() {
user_stream();
user_stream_websocket();
market_websocket();
kline_websocket();
all_trades_websocket();
//user_stream();
//user_stream_websocket();
//market_websocket();
//kline_websocket();
//all_trades_websocket();
last_price();
}

fn user_stream() {
Expand Down Expand Up @@ -62,7 +63,13 @@ fn user_stream_websocket() {
});

web_socket.connect(&listen_key).unwrap(); // check error
web_socket.event_loop();
if let Err(e) = web_socket.event_loop() {
match e {
err => {
println!("Error: {}", err);
}
}
}
} else {
println!("Not able to start an User Stream (Check your API_KEY)");
}
Expand Down Expand Up @@ -95,7 +102,13 @@ fn market_websocket() {
});

web_socket.connect(&agg_trade).unwrap(); // check error
web_socket.event_loop();
if let Err(e) = web_socket.event_loop() {
match e {
err => {
println!("Error: {}", err);
}
}
}
}

fn all_trades_websocket() {
Expand All @@ -115,7 +128,13 @@ fn all_trades_websocket() {
});

web_socket.connect(&agg_trade).unwrap(); // check error
web_socket.event_loop();
if let Err(e) = web_socket.event_loop() {
match e {
err => {
println!("Error: {}", err);
}
}
}
}

fn kline_websocket() {
Expand All @@ -133,5 +152,41 @@ fn kline_websocket() {
});

web_socket.connect(&kline).unwrap(); // check error
web_socket.event_loop();
if let Err(e) = web_socket.event_loop() {
match e {
err => {
println!("Error: {}", err);
}
}
}
}

fn last_price() {
let agg_trade: String = format!("!ticker@arr");
let mut btcusdt: f32 = "0".parse().unwrap();

let mut web_socket: WebSockets = WebSockets::new(|event: WebsocketEvent| {
match event {
WebsocketEvent::DayTicker(ticker_events) => {
for tick_event in ticker_events {
if tick_event.symbol == "BTCUSDT" {
btcusdt = tick_event.average_price.parse().unwrap();
let btcusdt_close: f32 = tick_event.current_close.parse().unwrap();
println!("{} - {}", btcusdt, btcusdt_close);
}
}
},
_ => return,
}
});

web_socket.connect(&agg_trade).unwrap(); // check error

if let Err(e) = web_socket.event_loop() {
match e {
err => {
println!("Error: {}", err);
}
}
}
}
7 changes: 2 additions & 5 deletions src/errors.rs
Expand Up @@ -2,12 +2,9 @@ use std;
use reqwest;
use url;
use serde_json;
use tungstenite;

error_chain! {
types {
Error, ErrorKind, ResultExt, Result;
}

errors {
BinanceError(code: i16, msg: String, response: reqwest::Response)
}
Expand All @@ -19,7 +16,7 @@ error_chain! {
ParseFloatError(std::num::ParseFloatError);
UrlParserError(url::ParseError);
Json(serde_json::Error);
Tungstenite(tungstenite::Error);
TimestampError(std::time::SystemTimeError);
}

}
69 changes: 36 additions & 33 deletions src/websockets.rs
Expand Up @@ -3,7 +3,7 @@ use errors::*;
use url::Url;
use serde_json::from_str;

use tungstenite::connect;
use tungstenite::{connect, Message};
use tungstenite::protocol::WebSocket;
use tungstenite::client::AutoStream;
use tungstenite::handshake::client::Response;
Expand Down Expand Up @@ -61,40 +61,43 @@ impl<'a> WebSockets<'a> {
}
}

pub fn event_loop(&mut self) {
pub fn event_loop(&mut self) -> Result<()> {
loop {
if let Some(ref mut socket) = self.socket {
let msg: String = socket.0.read_message().unwrap().into_text().unwrap();

if msg.find(OUTBOUND_ACCOUNT_INFO) != None {
let account_update: AccountUpdateEvent = from_str(msg.as_str()).unwrap();

(self.handler)(WebsocketEvent::AccountUpdate(account_update));
} else if msg.find(EXECUTION_REPORT) != None {
let order_trade: OrderTradeEvent = from_str(msg.as_str()).unwrap();

(self.handler)(WebsocketEvent::OrderTrade(order_trade));
} else if msg.find(AGGREGATED_TRADE) != None {
let trade: TradesEvent = from_str(msg.as_str()).unwrap();

(self.handler)(WebsocketEvent::Trade(trade));
} else if msg.find(DAYTICKER) != None {
let trades: Vec<DayTickerEvent> = from_str(msg.as_str()).unwrap();

(self.handler)(WebsocketEvent::DayTicker(trades));
} else if msg.find(KLINE) != None {
let kline: KlineEvent = from_str(msg.as_str()).unwrap();

(self.handler)(WebsocketEvent::Kline(kline));
} else if msg.find(PARTIAL_ORDERBOOK) != None {
let partial_orderbook: OrderBook = from_str(msg.as_str()).unwrap();

(self.handler)(WebsocketEvent::OrderBook(partial_orderbook));
} else if msg.find(DEPTH_ORDERBOOK) != None {
let depth_orderbook: DepthOrderBookEvent = from_str(msg.as_str()).unwrap();

(self.handler)(WebsocketEvent::DepthOrderBook(depth_orderbook));
}
let message = socket.0.read_message()?;

match message {
Message::Text(msg) => {
if msg.find(OUTBOUND_ACCOUNT_INFO) != None {
let account_update: AccountUpdateEvent = from_str(msg.as_str())?;
(self.handler)(WebsocketEvent::AccountUpdate(account_update));
} else if msg.find(EXECUTION_REPORT) != None {
let order_trade: OrderTradeEvent = from_str(msg.as_str())?;
(self.handler)(WebsocketEvent::OrderTrade(order_trade));
} else if msg.find(AGGREGATED_TRADE) != None {
let trade: TradesEvent = from_str(msg.as_str())?;
(self.handler)(WebsocketEvent::Trade(trade));
} else if msg.find(DAYTICKER) != None {
let trades: Vec<DayTickerEvent> = from_str(msg.as_str())?;
(self.handler)(WebsocketEvent::DayTicker(trades));
} else if msg.find(KLINE) != None {
let kline: KlineEvent = from_str(msg.as_str())?;
(self.handler)(WebsocketEvent::Kline(kline));
} else if msg.find(PARTIAL_ORDERBOOK) != None {
let partial_orderbook: OrderBook = from_str(msg.as_str())?;
(self.handler)(WebsocketEvent::OrderBook(partial_orderbook));
} else if msg.find(DEPTH_ORDERBOOK) != None {
let depth_orderbook: DepthOrderBookEvent = from_str(msg.as_str())?;
(self.handler)(WebsocketEvent::DepthOrderBook(depth_orderbook));
}
}
Message::Ping(_) |
Message::Pong(_) |
Message::Binary(_) => {}
Message::Close(e) => {
bail!(format!("Disconnected {:?}", e));
}
}
}
}
}
Expand Down

0 comments on commit fc4ebca

Please sign in to comment.