Skip to content

Commit

Permalink
Added example demonstrating ObservableCollapser
Browse files Browse the repository at this point in the history
This was based on the discussion in Netflix#351. I've modified the original example to be compatible with JDK 6, removed the dependency to ICU and refactored the test class to be part of the command class to match the pattern used for other examples.
  • Loading branch information
restfulhead committed Sep 22, 2015
1 parent 881a4c9 commit 329c2ec
Show file tree
Hide file tree
Showing 2 changed files with 343 additions and 0 deletions.
@@ -0,0 +1,262 @@
package com.netflix.hystrix.examples.basic;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import rx.Observable;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
import com.netflix.hystrix.HystrixObservableCollapser;
import com.netflix.hystrix.HystrixObservableCommand;
import com.netflix.hystrix.HystrixRequestLog;
import com.netflix.hystrix.examples.basic.ObservableCommandNumbersToWords.NumberWord;
import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

/**
* Example that uses {@link HystrixObservableCollapser} to batch multiple {@link ObservableCommandNumbersToWords} requests.
*
* @author Patrick Ruhkopf
*/
public class ObservableCollapserGetWordForNumber extends HystrixObservableCollapser<Integer, NumberWord, String, Integer>
{
private final Integer number;

private final static AtomicInteger counter = new AtomicInteger();

public static void resetCmdCounter()
{
counter.set(0);
}

public static int getCmdCount()
{
return counter.get();
}

public ObservableCollapserGetWordForNumber(final Integer number)
{
this.number = number;
}

@Override
public Integer getRequestArgument()
{
return number;
}

@SuppressWarnings("boxing")
@Override
protected HystrixObservableCommand<NumberWord> createCommand(final Collection<CollapsedRequest<String, Integer>> requests)
{
final int count = counter.incrementAndGet();
System.out.println("Creating batch for " + requests.size() + " requests. Total invocations so far: " + count);

final List<Integer> numbers = new ArrayList<Integer>();
for (final CollapsedRequest<String, Integer> request : requests)
{
numbers.add(request.getArgument());
}

return new ObservableCommandNumbersToWords(numbers);
}

@Override
protected Func1<NumberWord, Integer> getBatchReturnTypeKeySelector()
{
return new Func1<NumberWord, Integer>()
{
@Override
public Integer call(final NumberWord nw)
{
return nw.getNumber();
}
};
}

@Override
protected Func1<Integer, Integer> getRequestArgumentKeySelector()
{
return new Func1<Integer, Integer>()
{
@Override
public Integer call(final Integer no)
{
return no;
}

};
}

@Override
protected Func1<NumberWord, String> getBatchReturnTypeToResponseTypeMapper()
{
return new Func1<NumberWord, String>()
{
@Override
public String call(final NumberWord nw)
{
return nw.getWord();
}
};
}

@Override
protected void onMissingResponse(final CollapsedRequest<String, Integer> request)
{
request.setException(new Exception("No word"));
}

public static class ObservableCollapserGetWordForNumberTest
{
private HystrixRequestContext ctx;

@Before
public void before()
{
ctx = HystrixRequestContext.initializeContext();
ObservableCollapserGetWordForNumber.resetCmdCounter();
}

@After
public void after()
{
System.out.println(HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
ctx.shutdown();
}

/**
* Example where we subscribe without using a specific scheduler. That means we run the actions on the same thread.
*/
@Test
public void shouldCollapseRequestsSync()
{
final int noOfRequests = 10;
final Map<Integer, TestSubscriber<String>> subscribersByNumber = new HashMap<Integer, TestSubscriber<String>>(
noOfRequests);

TestSubscriber<String> subscriber;
for (int number = 0; number < noOfRequests; number++)
{
subscriber = new TestSubscriber<String>();
new ObservableCollapserGetWordForNumber(number).toObservable().subscribe(subscriber);
subscribersByNumber.put(number, subscriber);

// wait a little bit after running half of the requests so that we don't collapse all of them into one batch
// TODO this can probably be improved by using a test scheduler
if (number == noOfRequests / 2)
sleep(1000);

}

assertThat(subscribersByNumber.size(), is(noOfRequests));
for (final Entry<Integer, TestSubscriber<String>> subscriberByNumber : subscribersByNumber.entrySet())
{
subscriber = subscriberByNumber.getValue();
subscriber.awaitTerminalEvent(10, TimeUnit.SECONDS);

assertThat(subscriber.getOnErrorEvents().toString(), subscriber.getOnErrorEvents().size(), is(0));
assertThat(subscriber.getOnNextEvents().size(), is(1));

final String word = subscriber.getOnNextEvents().get(0);
System.out.println("Translated " + subscriberByNumber.getKey() + " to " + word);
assertThat(word, equalTo(numberToWord(subscriberByNumber.getKey())));
}

assertTrue(ObservableCollapserGetWordForNumber.getCmdCount() > 1);
assertTrue(ObservableCollapserGetWordForNumber.getCmdCount() < noOfRequests);
}

/**
* Example where we subscribe on the computation scheduler. For this we need the {@link HystrixContextScheduler}, that
* passes the {@link HystrixRequestContext} to the thread that runs the action.
*/
@Test
public void shouldCollapseRequestsAsync()
{
final HystrixContextScheduler contextAwareScheduler = new HystrixContextScheduler(Schedulers.computation());

final int noOfRequests = 10;
final Map<Integer, TestSubscriber<String>> subscribersByNumber = new HashMap<Integer, TestSubscriber<String>>(
noOfRequests);

TestSubscriber<String> subscriber;
for (int number = 0; number < noOfRequests; number++)
{
subscriber = new TestSubscriber<String>();
final int finalNumber = number;

// defer and subscribe on specific scheduler
Observable.defer(new Func0<Observable<String>>()
{
@Override
public Observable<String> call()
{
return new ObservableCollapserGetWordForNumber(finalNumber).toObservable();
}
}).subscribeOn(contextAwareScheduler).subscribe(subscriber);

subscribersByNumber.put(number, subscriber);

// wait a little bit after running half of the requests so that we don't collapse all of them into one batch
// TODO this can probably be improved by using a test scheduler
if (number == noOfRequests / 2)
sleep(1000);
}

assertThat(subscribersByNumber.size(), is(noOfRequests));
for (final Entry<Integer, TestSubscriber<String>> subscriberByNumber : subscribersByNumber.entrySet())
{
subscriber = subscriberByNumber.getValue();
subscriber.awaitTerminalEvent(10, TimeUnit.SECONDS);

assertThat(subscriber.getOnErrorEvents().toString(), subscriber.getOnErrorEvents().size(), is(0));
assertThat(subscriber.getOnNextEvents().size(), is(1));

final String word = subscriber.getOnNextEvents().get(0);
System.out.println("Translated " + subscriberByNumber.getKey() + " to " + word);
assertThat(word, equalTo(numberToWord(subscriberByNumber.getKey())));
}

assertTrue(ObservableCollapserGetWordForNumber.getCmdCount() > 1);
assertTrue(ObservableCollapserGetWordForNumber.getCmdCount() < noOfRequests);
}

private String numberToWord(final int number)
{
return ObservableCommandNumbersToWords.dict.get(number);
}

private void sleep(final long ms)
{
try
{
Thread.sleep(1000);
}
catch (final InterruptedException e)
{
throw new IllegalStateException(e);
}
}

}
}
@@ -0,0 +1,81 @@
package com.netflix.hystrix.examples.basic;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import rx.Observable;
import rx.functions.Func1;

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import com.netflix.hystrix.examples.basic.ObservableCommandNumbersToWords.NumberWord;

/**
* A simple Hystrix Observable command that translates a number (<code>Integer</code>) into an English text.
*/
class ObservableCommandNumbersToWords extends HystrixObservableCommand<NumberWord>
{
private final List<Integer> numbers;

// in the real world you'd probably want to replace this very simple code by using ICU or similar
static Map<Integer, String> dict = new HashMap<Integer, String>(11);
static
{
dict.put(0, "zero");
dict.put(1, "one");
dict.put(2, "two");
dict.put(3, "three");
dict.put(4, "four");
dict.put(5, "five");
dict.put(6, "six");
dict.put(7, "seven");
dict.put(8, "eight");
dict.put(9, "nine");
dict.put(10, "ten");
}

public ObservableCommandNumbersToWords(final List<Integer> numbers)
{
super(HystrixCommandGroupKey.Factory.asKey(ObservableCommandNumbersToWords.class.getName()));
this.numbers = numbers;
}

@Override
protected Observable<NumberWord> construct()
{
return Observable.from(numbers).map(new Func1<Integer, NumberWord>()
{
@Override
public NumberWord call(final Integer number)
{
return new NumberWord(number, dict.get(number));
}

});
}

static class NumberWord
{
private final Integer number;
private final String word;

public NumberWord(final Integer number, final String word)
{
super();
this.number = number;
this.word = word;
}

public Integer getNumber()
{
return number;
}

public String getWord()
{
return word;
}
}

}

0 comments on commit 329c2ec

Please sign in to comment.