Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Make use of onCompletion callback on DeferredResult

  • Loading branch information...
commit 65169c979c5fd24c173704f5f04f3b6739229b7b 1 parent 8f0a9fa
@rstoyanchev rstoyanchev authored
View
81 stocks/src/main/java/org/springframework/amqp/rabbit/stocks/web/QuoteController.java
@@ -55,10 +55,10 @@
private Queue<Quote> quotes = new PriorityBlockingQueue<Quote>(100, new QuoteComparator());
- private Map<String, DeferredResult<TradeResponse>> suspendedTradeRequests =
+ private Map<String, DeferredResult<TradeResponse>> tradeRequests =
new ConcurrentHashMap<String, DeferredResult<TradeResponse>>();
- private Map<DeferredResult<List<Quote>>, Long> suspendedQuoteRequests =
+ private Map<DeferredResult<List<Quote>>, Long> quoteRequests =
new ConcurrentHashMap<DeferredResult<List<Quote>>, Long>();
private long timeout = 30000; // 30 seconds of data
@@ -80,8 +80,8 @@ public void handleTrade(TradeResponse response) {
if (tradeResponse.getTimestamp() < timestamp) {
responses.remove(requestId);
}
- if (suspendedTradeRequests.containsKey(requestId)) {
- DeferredResult<TradeResponse> deferredResult = suspendedTradeRequests.remove(requestId);
+ if (tradeRequests.containsKey(requestId)) {
+ DeferredResult<TradeResponse> deferredResult = tradeRequests.remove(requestId);
deferredResult.setResult(tradeResponse);
}
}
@@ -89,18 +89,15 @@ public void handleTrade(TradeResponse response) {
public void handleQuote(Quote message) {
logger.info("Client received: " + message);
- quotes.add(message);
-
- for (Entry<DeferredResult<List<Quote>>, Long> entry : suspendedQuoteRequests.entrySet()) {
- List<Quote> list = getLatestQuotes(entry.getValue());
- if (!list.isEmpty()) {
- DeferredResult<List<Quote>> deferredResult = entry.getKey();
- deferredResult.setResult(list);
- suspendedQuoteRequests.remove(entry.getKey());
- }
+ this.quotes.add(message);
+
+ for (Entry<DeferredResult<List<Quote>>, Long> entry : this.quoteRequests.entrySet()) {
+ List<Quote> newQuotes = getLatestQuotes(entry.getValue());
+ entry.getKey().setResult(newQuotes);
+
}
- long timestamp = System.currentTimeMillis() - timeout;
+ long timestamp = System.currentTimeMillis() - this.timeout;
for (Iterator<Quote> iterator = quotes.iterator(); iterator.hasNext();) {
Quote quote = iterator.next();
if (quote.getTimestamp() < timestamp) {
@@ -111,16 +108,23 @@ public void handleQuote(Quote message) {
@RequestMapping("/quotes")
@ResponseBody
- public Object quotes(@RequestParam(required = false) Long timestamp) {
+ public DeferredResult<List<Quote>> quotes(@RequestParam(required = false) Long timestamp) {
+
+ final DeferredResult<List<Quote>> result = new DeferredResult<List<Quote>>(null, Collections.emptyList());
+ this.quoteRequests.put(result, timestamp);
+
+ result.onCompletion(new Runnable() {
+ public void run() {
+ quoteRequests.remove(result);
+ }
+ });
+
List<Quote> list = getLatestQuotes(timestamp);
- if (list.isEmpty()) {
- DeferredResult<List<Quote>> deferredResult = new DeferredResult<List<Quote>>(null, Collections.emptyList());
- suspendedQuoteRequests.put(deferredResult, timestamp);
- return deferredResult;
- }
- else {
- return list;
+ if (!list.isEmpty()) {
+ result.setResult(list);
}
+
+ return result;
}
private List<Quote> getLatestQuotes(Long timestamp) {
@@ -128,7 +132,7 @@ public Object quotes(@RequestParam(required = false) Long timestamp) {
timestamp = 0L;
}
ArrayList<Quote> list = new ArrayList<Quote>();
- for (Quote quote : quotes) {
+ for (Quote quote : this.quotes) {
if (quote.getTimestamp() > timestamp) {
list.add(quote);
}
@@ -139,27 +143,28 @@ public Object quotes(@RequestParam(required = false) Long timestamp) {
@RequestMapping(value = "/trade", method = RequestMethod.POST)
@ResponseBody
- public Object trade(@ModelAttribute TradeRequest tradeRequest) {
+ public DeferredResult<TradeResponse> trade(@ModelAttribute TradeRequest tradeRequest) {
+
String ticker = tradeRequest.getTicker();
Long quantity = tradeRequest.getQuantity();
if (quantity == null || quantity <= 0 || !StringUtils.hasText(ticker)) {
// error
return null;
- } else {
- DeferredResult<TradeResponse> deferredResult = new DeferredResult<TradeResponse>();
- suspendedTradeRequests.put(tradeRequest.getId(), deferredResult);
-
- // Fake rest of request while UI is basic
- tradeRequest.setAccountName("ACCT-123");
- tradeRequest.setBuyRequest(true);
- tradeRequest.setOrderType("MARKET");
- tradeRequest.setRequestId("REQ-1");
- tradeRequest.setUserName("Joe Trader");
- tradeRequest.setUserName("Joe");
- stockServiceGateway.send(tradeRequest);
-
- return deferredResult;
}
+
+ DeferredResult<TradeResponse> result = new DeferredResult<TradeResponse>();
+ this.tradeRequests.put(tradeRequest.getId(), result);
+
+ // Fake rest of request while UI is basic
+ tradeRequest.setAccountName("ACCT-123");
+ tradeRequest.setBuyRequest(true);
+ tradeRequest.setOrderType("MARKET");
+ tradeRequest.setRequestId("REQ-1");
+ tradeRequest.setUserName("Joe Trader");
+ tradeRequest.setUserName("Joe");
+ this.stockServiceGateway.send(tradeRequest);
+
+ return result;
}
private static class QuoteComparator implements Comparator<Quote> {
Please sign in to comment.
Something went wrong with that request. Please try again.