Skip to content

Commit

Permalink
[hitbtc] Add raw marketdata websocket service
Browse files Browse the repository at this point in the history
  • Loading branch information
baffo32 committed Nov 16, 2015
1 parent 54378c5 commit 00857bc
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 8 deletions.
Expand Up @@ -52,12 +52,18 @@ public void applySpecification(ExchangeSpecification exchangeSpecification) {
if (exchangeSpecification.getSslUri() == null) {
exchangeSpecification.setSslUri(defaultSpecification.getSslUri());
}
if (exchangeSpecification.getSslUriStreaming() == null) {
exchangeSpecification.setSslUriStreaming(defaultSpecification.getSslUriStreaming());
}
if (exchangeSpecification.getHost() == null) {
exchangeSpecification.setHost(defaultSpecification.getHost());
}
if (exchangeSpecification.getPlainTextUri() == null) {
exchangeSpecification.setPlainTextUri(defaultSpecification.getPlainTextUri());
}
if (exchangeSpecification.getPlainTextUriStreaming() == null) {
exchangeSpecification.setPlainTextUriStreaming(defaultSpecification.getPlainTextUriStreaming());
}
if (exchangeSpecification.getExchangeSpecificParameters() == null) {
exchangeSpecification.setExchangeSpecificParameters(defaultSpecification.getExchangeSpecificParameters());
} else {
Expand Down
Expand Up @@ -71,7 +71,7 @@ public enum ExchangeEventType {
SUBSCRIBE_DEPTH,

/**
* A message with a Market Depth update payload
* A message with a market depth OrderBook payload
*/
DEPTH,

Expand Down
Expand Up @@ -10,13 +10,19 @@
import com.xeiam.xchange.dto.marketdata.Ticker;
import com.xeiam.xchange.dto.marketdata.Trades;
import com.xeiam.xchange.examples.hitbtc.HitbtcExampleUtils;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcIncrementalRefresh;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcOrderBook;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcSnapshotFullRefresh;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcSymbols;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcTicker;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcTime;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcTrades;
import com.xeiam.xchange.hitbtc.service.polling.HitbtcMarketDataServiceRaw;
import com.xeiam.xchange.hitbtc.service.streaming.HitbtcStreamingMarketDataConfiguration;
import com.xeiam.xchange.hitbtc.service.streaming.HitbtcStreamingMarketDataServiceRaw;
import com.xeiam.xchange.service.polling.marketdata.PollingMarketDataService;
import com.xeiam.xchange.service.streaming.ExchangeEvent;
import com.xeiam.xchange.service.streaming.StreamingExchangeService;

public class HitbtcMarketDataDemo {

Expand All @@ -31,6 +37,8 @@ public static void main(String[] args) throws Exception {

generic(marketDataService);
raw((HitbtcMarketDataServiceRaw) marketDataService);

rawStreaming(new HitbtcStreamingMarketDataServiceRaw(hitbtcExchange, new HitbtcStreamingMarketDataConfiguration()));
}

private static void generic(PollingMarketDataService marketDataService) throws IOException {
Expand Down Expand Up @@ -97,4 +105,35 @@ private static void raw(HitbtcMarketDataServiceRaw marketDataService) throws IOE
System.out.println("Trades, last minute, Size= " + trades.getHitbtcTrades().size());
System.out.println(trades.toString());
}

private static void rawStreaming(StreamingExchangeService marketDataService) throws Exception {

marketDataService.connect();

boolean gotFullRefresh = false;
while (!gotFullRefresh) {

ExchangeEvent exchangeEvent = marketDataService.getNextEvent();
Object payload = exchangeEvent.getPayload();

if (payload == null) {

System.out.println("Websocket: " + exchangeEvent.getData());

} else if (payload.getClass() == HitbtcIncrementalRefresh.class) {

System.out.println("Websocket: " + payload.toString());

} else { // payload.getClass() == HitbtcSnapshotFullRefresh.class

HitbtcSnapshotFullRefresh snapshot = (HitbtcSnapshotFullRefresh)payload;
gotFullRefresh = true;

System.out.println("Websocket: " + snapshot.getSymbol() + " snapshot, seqNo=" + snapshot.getSnapshotSeqNo() + ", exchangeStatus=" + snapshot.getExchangeStatus());

}
}

marketDataService.disconnect();
}
}
Expand Up @@ -13,8 +13,11 @@
import com.xeiam.xchange.dto.Order;
import com.xeiam.xchange.dto.Order.OrderType;
import com.xeiam.xchange.dto.account.AccountInfo;
import com.xeiam.xchange.dto.marketdata.*;
import com.xeiam.xchange.dto.marketdata.Trades.TradeSortType;
import com.xeiam.xchange.dto.marketdata.OrderBook;
import com.xeiam.xchange.dto.marketdata.OrderBookUpdate;
import com.xeiam.xchange.dto.marketdata.Ticker;
import com.xeiam.xchange.dto.marketdata.Trade;
import com.xeiam.xchange.dto.marketdata.Trades;
import com.xeiam.xchange.dto.meta.ExchangeMetaData;
import com.xeiam.xchange.dto.meta.MarketMetaData;
import com.xeiam.xchange.dto.trade.LimitOrder;
Expand All @@ -23,7 +26,17 @@
import com.xeiam.xchange.dto.trade.UserTrades;
import com.xeiam.xchange.dto.trade.Wallet;
import com.xeiam.xchange.hitbtc.dto.account.HitbtcBalance;
import com.xeiam.xchange.hitbtc.dto.marketdata.*;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcIncrementalRefresh;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcOrderBook;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcSnapshotFullRefresh;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcStreamingOrder;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcStreamingTrade;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcSymbol;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcSymbols;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcTime;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcTicker;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcTrade;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcTrades;
import com.xeiam.xchange.hitbtc.dto.meta.HitbtcMetaData;
import com.xeiam.xchange.hitbtc.dto.trade.HitbtcOrder;
import com.xeiam.xchange.hitbtc.dto.trade.HitbtcOwnTrade;
Expand Down Expand Up @@ -186,25 +199,32 @@ public static OrderBook adaptSnapshotFullRefresh(HitbtcSnapshotFullRefresh hitbt
}

public static List<OrderBookUpdate> adaptIncrementalRefreshOrders(HitbtcIncrementalRefresh hitbtcIncrementalRefresh) {
return adaptIncrementalRefreshOrders(hitbtcIncrementalRefresh, null, null);
}

public static List<OrderBookUpdate> adaptIncrementalRefreshOrders(HitbtcIncrementalRefresh hitbtcIncrementalRefresh, BigDecimal volume, Date timestamp) {

CurrencyPair currencyPair = adaptSymbol(hitbtcIncrementalRefresh.getSymbol());
List<HitbtcStreamingOrder> asks = hitbtcIncrementalRefresh.getAsk();
List<HitbtcStreamingOrder> bids = hitbtcIncrementalRefresh.getBid();

List<OrderBookUpdate> updates = new ArrayList<OrderBookUpdate>(asks.size() + bids.size());

if (updates.size() != 1)
volume = null;

for (int i = 0; i < asks.size(); i++) {
HitbtcStreamingOrder order = asks.get(i);

OrderBookUpdate update = new OrderBookUpdate(OrderType.ASK, null, currencyPair, order.getPrice(), null, order.getSize());
OrderBookUpdate update = new OrderBookUpdate(OrderType.ASK, volume, currencyPair, order.getPrice(), timestamp, order.getSize());

updates.add(update);
}

for (int i = 0; i < bids.size(); i++) {
HitbtcStreamingOrder order = bids.get(i);

OrderBookUpdate update = new OrderBookUpdate(OrderType.BID, null, currencyPair, order.getPrice(), null, order.getSize());
OrderBookUpdate update = new OrderBookUpdate(OrderType.BID, volume, currencyPair, order.getPrice(), timestamp, order.getSize());

updates.add(update);
}
Expand Down Expand Up @@ -263,7 +283,7 @@ public static UserTrades adaptTradeHistory(HitbtcOwnTrade[] tradeHistoryRaw, Exc
trades.add(trade);
}

return new UserTrades(trades, TradeSortType.SortByTimestamp);
return new UserTrades(trades, Trades.TradeSortType.SortByTimestamp);
}

public static AccountInfo adaptAccountInfo(HitbtcBalance[] accountInfoRaw) {
Expand Down
Expand Up @@ -48,6 +48,7 @@ public ExchangeSpecification getDefaultExchangeSpecification() {
exchangeSpecification.setSslUri("https://api.hitbtc.com");
exchangeSpecification.setHost("hitbtc.com");
exchangeSpecification.setPort(80);
exchangeSpecification.setPlainTextUriStreaming("ws://api.hitbtc.com/");
exchangeSpecification.setExchangeName("Hitbtc");
exchangeSpecification.setExchangeDescription("Hitbtc is a Bitcoin exchange.");
exchangeSpecification.setExchangeSpecificParametersItem("demo-api", "http://demo-api.hitbtc.com");
Expand Down
@@ -0,0 +1,61 @@
package com.xeiam.xchange.hitbtc.service.streaming;

import com.xeiam.xchange.service.streaming.ExchangeStreamingConfiguration;

public class HitbtcStreamingMarketDataConfiguration implements ExchangeStreamingConfiguration {

private final int maxReconnectAttempts;
private final int reconnectWaitTimeInMs;
private final int timeoutInMs;

/**
* Constructor
*
* @param maxReconnectAttempts
* @param reconnectWaitTimeInMs
* @param timeoutInMs
*/
public HitbtcStreamingMarketDataConfiguration(int maxReconnectAttempts, int reconnectWaitTimeInMs, int timeoutInMs) {

this.maxReconnectAttempts = maxReconnectAttempts;
this.reconnectWaitTimeInMs = reconnectWaitTimeInMs;
this.timeoutInMs = timeoutInMs;
}

public HitbtcStreamingMarketDataConfiguration() {

maxReconnectAttempts = 30; // 67 min
reconnectWaitTimeInMs = 135000; // 2:15
timeoutInMs = 120000; // 2:00
}

@Override
public int getMaxReconnectAttempts() {

return maxReconnectAttempts;
}

@Override
public int getReconnectWaitTimeInMs() {

return reconnectWaitTimeInMs;
}

@Override
public int getTimeoutInMs() {

return timeoutInMs;
}

@Override
public boolean isEncryptedChannel() {

return false;
}

@Override
public boolean keepAlive() {
return true;
}
}

@@ -0,0 +1,86 @@
package com.xeiam.xchange.hitbtc.service.streaming;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xeiam.xchange.exceptions.ExchangeException;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcIncrementalRefresh;
import com.xeiam.xchange.hitbtc.dto.marketdata.HitbtcSnapshotFullRefresh;
import com.xeiam.xchange.service.streaming.DefaultExchangeEvent;
import com.xeiam.xchange.service.streaming.ExchangeEvent;
import com.xeiam.xchange.service.streaming.ExchangeEventListener;
import com.xeiam.xchange.service.streaming.ExchangeEventType;

public class HitbtcStreamingMarketDataRawEventListener extends ExchangeEventListener {

private final BlockingQueue<ExchangeEvent> consumerEventQueue;
private final ObjectMapper streamObjectMapper;

/**
* @param consumerEventQueue
*/
public HitbtcStreamingMarketDataRawEventListener(BlockingQueue<ExchangeEvent> consumerEventQueue) {

this.consumerEventQueue = consumerEventQueue;

this.streamObjectMapper = new ObjectMapper();
this.streamObjectMapper.configure(DeserializationFeature.UNWRAP_ROOT_VALUE, true);
}

@Override
public void handleEvent(ExchangeEvent event) throws ExchangeException {

if (event.getEventType() != ExchangeEventType.MESSAGE) {

forwardEvent(event);

} else {

String data = event.getData();

HitbtcIncrementalRefresh incrementalRefresh;
try {

incrementalRefresh = streamObjectMapper.readValue(data, HitbtcIncrementalRefresh.class);

} catch (JsonMappingException jme) {

HitbtcSnapshotFullRefresh snapshotFullRefresh;
try {

snapshotFullRefresh = streamObjectMapper.readValue(data, HitbtcSnapshotFullRefresh.class);

} catch (IOException e) {
throw new ExchangeException("JSON parse error", e);
}

handleDTO(data, snapshotFullRefresh);
return;

} catch (IOException e) {
throw new ExchangeException("JSON parse error", e);
}

handleDTO(data, incrementalRefresh);
}
}

private void forwardEvent(ExchangeEvent event) {

try {
consumerEventQueue.put(event);
} catch (InterruptedException e) {
throw new ExchangeException("InterruptedException!", e);
}
}

private void handleDTO(String data, Object dto) {

ExchangeEvent event = new DefaultExchangeEvent(ExchangeEventType.EVENT, data, dto);

forwardEvent(event);
}
}
@@ -0,0 +1,32 @@
package com.xeiam.xchange.hitbtc.service.streaming;

import java.net.URI;

import com.xeiam.xchange.Exchange;
import com.xeiam.xchange.service.streaming.BaseWebSocketExchangeService;

public class HitbtcStreamingMarketDataServiceRaw extends BaseWebSocketExchangeService {

private final HitbtcStreamingMarketDataRawEventListener exchangeEventListener;

/**
* Constructor
*
* @param exchange
*/
public HitbtcStreamingMarketDataServiceRaw(Exchange exchange, HitbtcStreamingMarketDataConfiguration configuration) {

super(exchange, configuration);

this.exchangeEventListener = new HitbtcStreamingMarketDataRawEventListener(consumerEventQueue);
}

@Override
public void connect() throws Exception {

URI uri = URI.create(exchange.getExchangeSpecification().getPlainTextUriStreaming());

// Use the default internal connect
internalConnect(uri, exchangeEventListener, null);
}
}
Expand Up @@ -7,8 +7,8 @@

import org.junit.Test;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

public class HitbtcIncrementalRefreshJsonTest {

Expand Down

0 comments on commit 00857bc

Please sign in to comment.