From 329c2ec7d282d23fdff80f69584bd8e75dba7ac1 Mon Sep 17 00:00:00 2001 From: Patrick Ruhkopf Date: Tue, 22 Sep 2015 11:44:15 -0400 Subject: [PATCH] Added example demonstrating ObservableCollapser This was based on the discussion in https://github.com/Netflix/Hystrix/issues/351. I've modified the original example to be compatible with JDK 6, removed the dependency to ICU and refactored the test class to be part of the command class to match the pattern used for other examples. --- .../ObservableCollapserGetWordForNumber.java | 262 ++++++++++++++++++ .../ObservableCommandNumbersToWords.java | 81 ++++++ 2 files changed, 343 insertions(+) create mode 100644 hystrix-examples/src/main/java/com/netflix/hystrix/examples/basic/ObservableCollapserGetWordForNumber.java create mode 100644 hystrix-examples/src/main/java/com/netflix/hystrix/examples/basic/ObservableCommandNumbersToWords.java diff --git a/hystrix-examples/src/main/java/com/netflix/hystrix/examples/basic/ObservableCollapserGetWordForNumber.java b/hystrix-examples/src/main/java/com/netflix/hystrix/examples/basic/ObservableCollapserGetWordForNumber.java new file mode 100644 index 000000000..2d05bb627 --- /dev/null +++ b/hystrix-examples/src/main/java/com/netflix/hystrix/examples/basic/ObservableCollapserGetWordForNumber.java @@ -0,0 +1,262 @@ +package com.netflix.hystrix.examples.basic; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import rx.Observable; +import rx.functions.Func0; +import rx.functions.Func1; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; + +import com.netflix.hystrix.HystrixCollapser.CollapsedRequest; +import com.netflix.hystrix.HystrixObservableCollapser; +import com.netflix.hystrix.HystrixObservableCommand; +import com.netflix.hystrix.HystrixRequestLog; +import com.netflix.hystrix.examples.basic.ObservableCommandNumbersToWords.NumberWord; +import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; + +/** + * Example that uses {@link HystrixObservableCollapser} to batch multiple {@link ObservableCommandNumbersToWords} requests. + * + * @author Patrick Ruhkopf + */ +public class ObservableCollapserGetWordForNumber extends HystrixObservableCollapser +{ + private final Integer number; + + private final static AtomicInteger counter = new AtomicInteger(); + + public static void resetCmdCounter() + { + counter.set(0); + } + + public static int getCmdCount() + { + return counter.get(); + } + + public ObservableCollapserGetWordForNumber(final Integer number) + { + this.number = number; + } + + @Override + public Integer getRequestArgument() + { + return number; + } + + @SuppressWarnings("boxing") + @Override + protected HystrixObservableCommand createCommand(final Collection> requests) + { + final int count = counter.incrementAndGet(); + System.out.println("Creating batch for " + requests.size() + " requests. Total invocations so far: " + count); + + final List numbers = new ArrayList(); + for (final CollapsedRequest request : requests) + { + numbers.add(request.getArgument()); + } + + return new ObservableCommandNumbersToWords(numbers); + } + + @Override + protected Func1 getBatchReturnTypeKeySelector() + { + return new Func1() + { + @Override + public Integer call(final NumberWord nw) + { + return nw.getNumber(); + } + }; + } + + @Override + protected Func1 getRequestArgumentKeySelector() + { + return new Func1() + { + @Override + public Integer call(final Integer no) + { + return no; + } + + }; + } + + @Override + protected Func1 getBatchReturnTypeToResponseTypeMapper() + { + return new Func1() + { + @Override + public String call(final NumberWord nw) + { + return nw.getWord(); + } + }; + } + + @Override + protected void onMissingResponse(final CollapsedRequest request) + { + request.setException(new Exception("No word")); + } + + public static class ObservableCollapserGetWordForNumberTest + { + private HystrixRequestContext ctx; + + @Before + public void before() + { + ctx = HystrixRequestContext.initializeContext(); + ObservableCollapserGetWordForNumber.resetCmdCounter(); + } + + @After + public void after() + { + System.out.println(HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString()); + ctx.shutdown(); + } + + /** + * Example where we subscribe without using a specific scheduler. That means we run the actions on the same thread. + */ + @Test + public void shouldCollapseRequestsSync() + { + final int noOfRequests = 10; + final Map> subscribersByNumber = new HashMap>( + noOfRequests); + + TestSubscriber subscriber; + for (int number = 0; number < noOfRequests; number++) + { + subscriber = new TestSubscriber(); + new ObservableCollapserGetWordForNumber(number).toObservable().subscribe(subscriber); + subscribersByNumber.put(number, subscriber); + + // wait a little bit after running half of the requests so that we don't collapse all of them into one batch + // TODO this can probably be improved by using a test scheduler + if (number == noOfRequests / 2) + sleep(1000); + + } + + assertThat(subscribersByNumber.size(), is(noOfRequests)); + for (final Entry> subscriberByNumber : subscribersByNumber.entrySet()) + { + subscriber = subscriberByNumber.getValue(); + subscriber.awaitTerminalEvent(10, TimeUnit.SECONDS); + + assertThat(subscriber.getOnErrorEvents().toString(), subscriber.getOnErrorEvents().size(), is(0)); + assertThat(subscriber.getOnNextEvents().size(), is(1)); + + final String word = subscriber.getOnNextEvents().get(0); + System.out.println("Translated " + subscriberByNumber.getKey() + " to " + word); + assertThat(word, equalTo(numberToWord(subscriberByNumber.getKey()))); + } + + assertTrue(ObservableCollapserGetWordForNumber.getCmdCount() > 1); + assertTrue(ObservableCollapserGetWordForNumber.getCmdCount() < noOfRequests); + } + + /** + * Example where we subscribe on the computation scheduler. For this we need the {@link HystrixContextScheduler}, that + * passes the {@link HystrixRequestContext} to the thread that runs the action. + */ + @Test + public void shouldCollapseRequestsAsync() + { + final HystrixContextScheduler contextAwareScheduler = new HystrixContextScheduler(Schedulers.computation()); + + final int noOfRequests = 10; + final Map> subscribersByNumber = new HashMap>( + noOfRequests); + + TestSubscriber subscriber; + for (int number = 0; number < noOfRequests; number++) + { + subscriber = new TestSubscriber(); + final int finalNumber = number; + + // defer and subscribe on specific scheduler + Observable.defer(new Func0>() + { + @Override + public Observable call() + { + return new ObservableCollapserGetWordForNumber(finalNumber).toObservable(); + } + }).subscribeOn(contextAwareScheduler).subscribe(subscriber); + + subscribersByNumber.put(number, subscriber); + + // wait a little bit after running half of the requests so that we don't collapse all of them into one batch + // TODO this can probably be improved by using a test scheduler + if (number == noOfRequests / 2) + sleep(1000); + } + + assertThat(subscribersByNumber.size(), is(noOfRequests)); + for (final Entry> subscriberByNumber : subscribersByNumber.entrySet()) + { + subscriber = subscriberByNumber.getValue(); + subscriber.awaitTerminalEvent(10, TimeUnit.SECONDS); + + assertThat(subscriber.getOnErrorEvents().toString(), subscriber.getOnErrorEvents().size(), is(0)); + assertThat(subscriber.getOnNextEvents().size(), is(1)); + + final String word = subscriber.getOnNextEvents().get(0); + System.out.println("Translated " + subscriberByNumber.getKey() + " to " + word); + assertThat(word, equalTo(numberToWord(subscriberByNumber.getKey()))); + } + + assertTrue(ObservableCollapserGetWordForNumber.getCmdCount() > 1); + assertTrue(ObservableCollapserGetWordForNumber.getCmdCount() < noOfRequests); + } + + private String numberToWord(final int number) + { + return ObservableCommandNumbersToWords.dict.get(number); + } + + private void sleep(final long ms) + { + try + { + Thread.sleep(1000); + } + catch (final InterruptedException e) + { + throw new IllegalStateException(e); + } + } + + } +} diff --git a/hystrix-examples/src/main/java/com/netflix/hystrix/examples/basic/ObservableCommandNumbersToWords.java b/hystrix-examples/src/main/java/com/netflix/hystrix/examples/basic/ObservableCommandNumbersToWords.java new file mode 100644 index 000000000..527476976 --- /dev/null +++ b/hystrix-examples/src/main/java/com/netflix/hystrix/examples/basic/ObservableCommandNumbersToWords.java @@ -0,0 +1,81 @@ +package com.netflix.hystrix.examples.basic; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import rx.Observable; +import rx.functions.Func1; + +import com.netflix.hystrix.HystrixCommandGroupKey; +import com.netflix.hystrix.HystrixObservableCommand; +import com.netflix.hystrix.examples.basic.ObservableCommandNumbersToWords.NumberWord; + +/** + * A simple Hystrix Observable command that translates a number (Integer) into an English text. + */ +class ObservableCommandNumbersToWords extends HystrixObservableCommand +{ + private final List numbers; + + // in the real world you'd probably want to replace this very simple code by using ICU or similar + static Map dict = new HashMap(11); + static + { + dict.put(0, "zero"); + dict.put(1, "one"); + dict.put(2, "two"); + dict.put(3, "three"); + dict.put(4, "four"); + dict.put(5, "five"); + dict.put(6, "six"); + dict.put(7, "seven"); + dict.put(8, "eight"); + dict.put(9, "nine"); + dict.put(10, "ten"); + } + + public ObservableCommandNumbersToWords(final List numbers) + { + super(HystrixCommandGroupKey.Factory.asKey(ObservableCommandNumbersToWords.class.getName())); + this.numbers = numbers; + } + + @Override + protected Observable construct() + { + return Observable.from(numbers).map(new Func1() + { + @Override + public NumberWord call(final Integer number) + { + return new NumberWord(number, dict.get(number)); + } + + }); + } + + static class NumberWord + { + private final Integer number; + private final String word; + + public NumberWord(final Integer number, final String word) + { + super(); + this.number = number; + this.word = word; + } + + public Integer getNumber() + { + return number; + } + + public String getWord() + { + return word; + } + } + +} \ No newline at end of file