Skip to content

Commit

Permalink
bug fix for : knowm/XChange#32
Browse files Browse the repository at this point in the history
  • Loading branch information
timmolter committed Jan 21, 2013
1 parent 4c842ce commit 7fff233
Show file tree
Hide file tree
Showing 13 changed files with 89 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,26 @@
* <p>
* Exchange event that provides convenience constructors for JSON wrapping
* </p>
*
* @since 1.3.0  
*/
public class DefaultExchangeEvent implements ExchangeEvent {

protected final byte[] internalRawData;
protected final ExchangeEventType exchangeEventType;
protected final String data;

/**
* @param exchangeEventType The exchange event type
* @param rawData The raw message content (original reference is kept)
* @param data The raw message content (original reference is kept)
*/
public DefaultExchangeEvent(ExchangeEventType exchangeEventType, byte[] rawData) {
public DefaultExchangeEvent(ExchangeEventType exchangeEventType, String data) {

this.exchangeEventType = exchangeEventType;
internalRawData = new byte[rawData.length];
System.arraycopy(rawData, 0, internalRawData, 0, rawData.length);
this.data = data;
}

@Override
public byte[] getRawData() {
public String getData() {

// Avoid exposing the internal state to consumers
byte[] rawDataClone = new byte[internalRawData.length];
System.arraycopy(internalRawData, 0, rawDataClone, 0, internalRawData.length);

return rawDataClone;
return data;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@
public interface ExchangeEvent {

/**
* @return The raw data provided by the upstream server
* @return The data provided by the upstream server
*/
byte[] getRawData();
String getData();

/**
* @return The ExchangeEventType
*/
ExchangeEventType getEventType();

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
* <ul>
* <li>Classification of event type to allow clients to take appropriate action</li>
* </ul>
*
* @since 1.3.0  
*/
public enum ExchangeEventType {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
* <p>
* Exchange event that provides convenience constructors for JSON wrapping
* </p>
*
* @since 1.3.0  
*/
public class JsonWrappedExchangeEvent extends DefaultExchangeEvent {

Expand All @@ -36,7 +34,7 @@ public class JsonWrappedExchangeEvent extends DefaultExchangeEvent {
*/
public JsonWrappedExchangeEvent(ExchangeEventType exchangeEventType, String message) {

super(exchangeEventType, ("{\"message\":\"" + message + "\"}").getBytes());
super(exchangeEventType, ("{\"message\":\"" + message + "\"}"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,15 @@ public void onConnect() {
@Override
public void onMessage(final String data, IOAcknowledge ack) {

ExchangeEvent exchangeEvent = new DefaultExchangeEvent(ExchangeEventType.MESSAGE, data.getBytes());
ExchangeEvent exchangeEvent = new DefaultExchangeEvent(ExchangeEventType.MESSAGE, data);

addToQueue(exchangeEvent);
}

@Override
public void onJSONMessage(final String jsonString, IOAcknowledge ack) {

ExchangeEvent exchangeEvent = new DefaultExchangeEvent(ExchangeEventType.JSON_MESSAGE, jsonString);

addToQueue(exchangeEvent);
}
Expand Down Expand Up @@ -120,4 +128,5 @@ private void addToQueue(ExchangeEvent exchangeEvent) {
log.warn("InterruptedException occurred while adding ExchangeEvent to Queue!", e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void run() {
log.debug("Received data '{}'", data);

// Create an event
ExchangeEvent marketDataEvent = new DefaultExchangeEvent(ExchangeEventType.MESSAGE, data.getBytes());
ExchangeEvent marketDataEvent = new DefaultExchangeEvent(ExchangeEventType.MESSAGE, data);

queue.put(marketDataEvent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ public interface IOCallback {
*/
void onMessage(String data, IOAcknowledge ack);

/**
* Called when the server sends JSON data.
*
* @param json JSON object sent by server
* @param ack an {@link IOAcknowledge} instance, may be <code>null</code>
*/
void onJSONMessage(String jsonString, IOAcknowledge ack);

/**
* Called when server emits an event
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,12 +707,12 @@ public void transportMessage(String text) {
break;
case IOMessage.TYPE_JSON_MESSAGE:

// TODO perhaps try to catch this exception later instead of parsing the json string twice, just trust that it's really JSON.
try {
// test if JSON is valid by catching a parse Exception
objectMapper.readValue(message.getData(), new TypeReference<Map<String, Object>>() {
});
// JSONUtils.getJsonGenericMap(message.getData(), objectMapper);
findCallback(message).onMessage(message.getData(), remoteAcknowledge(message));
findCallback(message).onJSONMessage(message.getData(), remoteAcknowledge(message));
} catch (JsonParseException e) {
log.warn("Malformated JSON received: " + message.getData());
} catch (Exception e) {
Expand Down Expand Up @@ -950,6 +950,14 @@ public void onMessage(String data, IOAcknowledge ack) {
}
}

@Override
public void onJSONMessage(String jsonString, IOAcknowledge ack) {

for (SocketIO socket : sockets.values()) {
socket.getCallback().onJSONMessage(jsonString, ack);
}
}

@Override
public void on(String event, IOAcknowledge ack, Object... args) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,19 @@ public void testOnMessage() throws Exception {

}

// @Test
// public void testOnJsonMessage() throws Exception {
//
// JSONObject jsonObject = new JSONObject();
//
// testObject.onMessage(jsonObject, null);
//
// ExchangeEvent event = queue.take();
//
// assertEquals(event.getEventType(), ExchangeEventType.JSON_MESSAGE);
//
// }

@Test
public void testOn() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,31 @@ public void onMessage(String data, IOAcknowledge ack) {
ta.setCaretPosition(ta.getDocument().getLength());
}

// @Override
// public void onJSONMessage(JSONObject json, IOAcknowledge ack) {
//
// try {
// JSONObject ticker = (JSONObject) json.get("ticker");
// if (ticker != null) {
// JSONObject last = (JSONObject) ticker.get("last");
// if (last != null) {
// String display = (String) last.get("display");
// ta.append(display.toString() + "\n");
// ta.setCaretPosition(ta.getDocument().getLength());
// }
// }
// } catch (JSONException e) {
// // Ignore (probably an "op")
// }
// }

@Override
public void onJSONMessage(String jsonString, IOAcknowledge ack) {

// TODO Auto-generated method stub

}

@Override
public void on(String event, IOAcknowledge ack, Object... args) {

Expand All @@ -212,4 +237,5 @@ public void onError(SocketIOException socketIOException) {

System.out.println("Error: " + socketIOException.getMessage());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@

import java.util.concurrent.BlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.xeiam.xchange.Currencies;
import com.xeiam.xchange.Exchange;
import com.xeiam.xchange.ExchangeFactory;
Expand All @@ -38,8 +35,6 @@
*/
public class StreamingTickerDemo {

private static final Logger log = LoggerFactory.getLogger(StreamingTickerDemo.class);

public static void main(String[] args) {

StreamingTickerDemo tickerDemo = new StreamingTickerDemo();
Expand All @@ -60,26 +55,22 @@ public void start() {
// Get blocking queue that receives exchange event data
BlockingQueue<ExchangeEvent> eventQueue = streamingMarketDataService.getEventQueue();

// Take streaming ticker data from the queue and do something with it for the first few ticks
int count = 0;
try {

while (true) {

// Exhaust exchange events first
while (!eventQueue.isEmpty()) {
ExchangeEvent exchangeEvent = eventQueue.take();
log.info("Exchange event: {} {}", exchangeEvent.getEventType().name(), new String(exchangeEvent.getRawData()));
System.out.println("Exchange event: " + exchangeEvent.getEventType().name() + ", " + exchangeEvent.getData());
}

// Check for Tickers
if (!tickerQueue.isEmpty()) {
doSomething(tickerQueue.take());
count++;
}
}

// log.info("Disconnecting (event queue threads will be suspended)...");
// streamingMarketDataService.disconnect();

} catch (InterruptedException e) {
e.printStackTrace();
}
Expand All @@ -92,7 +83,7 @@ public void start() {
*/
private void doSomething(Ticker ticker) {

log.info(ticker.toString());
System.out.println(ticker.toString());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ public void handleEvent(ExchangeEvent exchangeEvent) {
addToEventQueue(exchangeEvent);
break;
case MESSAGE:
log.debug("Generic message. Length=" + exchangeEvent.getRawData().length);
log.debug("Generic message. Length=" + exchangeEvent.getData().length());
addToEventQueue(exchangeEvent);
break;
case JSON_MESSAGE:
log.debug("JSON message. Length=" + exchangeEvent.getRawData().length);
log.debug("JSON message. Length=" + exchangeEvent.getData().length());

// Get raw JSON
Map<String, Object> rawJSON = JSONUtils.getJsonGenericMap(new String(exchangeEvent.getRawData()), tickerObjectMapper);
Map<String, Object> rawJSON = JSONUtils.getJsonGenericMap(new String(exchangeEvent.getData()), tickerObjectMapper);

// Determine what has been sent
if (rawJSON.containsKey("ticker")) {
Expand All @@ -104,7 +104,7 @@ public void handleEvent(ExchangeEvent exchangeEvent) {
}
break;
case ERROR:
log.error("Error message. Length=" + exchangeEvent.getRawData().length);
log.error("Error message. Length=" + exchangeEvent.getData().length());
addToEventQueue(exchangeEvent);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public BlockingQueue<Ticker> getTickerQueue(String tradableIdentifier, final Str
private void connectNow(String currency, RunnableExchangeEventListener listener) {

String url = apiBase + "?Channel=ticker&Currency=" + currency;
log.debug(url);
log.debug("streaming url= " + url);

connect(url, listener);
}
Expand Down

0 comments on commit 7fff233

Please sign in to comment.