@@ -13,31 +13,27 @@
import com.linkedin.parseq.example.common.ExampleUtil;
import com.linkedin.parseq.example.common.MockService;


/**
* @author Jaroslaw Odzga (jodzga@linkedin.com)
*/
public class SyncCollectionFlatMapExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class SyncCollectionFlatMapExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new SyncCollectionFlatMapExample().runExample();
}

static final List<String> urls = Arrays.asList("http://www.linkedin.com", "http://www.google.com", "http://www.twitter.com");
static final List<String> urls =
Arrays.asList("http://www.linkedin.com", "http://www.google.com", "http://www.twitter.com");
static final List<String> paths = Arrays.asList("/p1", "/p2");

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<String> httpClient = getService();

Task<String> task = ParSeqCollection.fromValues(urls)
.flatMap(base -> ParSeqCollection.fromValues(paths)
.map(path -> base + path)
.mapTask(url -> fetchUrl(httpClient, url)))
.reduce((a, b) -> a + "\n" + b)
.recover(e -> "error: " + e.toString())
.andThen(System.out::println);
.flatMap(base -> ParSeqCollection.fromValues(paths).map(path -> base + path)
.mapTask(url -> fetchUrl(httpClient, url)))
.reduce((a, b) -> a + "\n" + b).recover(e -> "error: " + e.toString()).andThen(System.out::println);

engine.run(task);

@@ -8,28 +8,26 @@
import com.linkedin.parseq.Task;
import com.linkedin.parseq.example.common.AbstractExample;


/**
* @author Jaroslaw Odzga (jodzga@linkedin.com)
*/
public class SyncGroupByExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class SyncGroupByExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new SyncGroupByExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
List<Integer> ints = Arrays.asList(1, 2, 3, 4, 5, 6, 2, 3, 5, 3);

Task<String> task = null;
// Collections.fromIterable(ints)
// .groupBy(i -> i)
// .mapTask(group ->
// (Task<String>)group.count().map(count ->
// "group: " + group.getKey() + ", count: " + count))
// .reduce((a, b) -> a + "\n" + b );
// Collections.fromIterable(ints)
// .groupBy(i -> i)
// .mapTask(group ->
// (Task<String>)group.count().map(count ->
// "group: " + group.getKey() + ", count: " + count))
// .reduce((a, b) -> a + "\n" + b );

System.out.println(task);
}
@@ -22,28 +22,21 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public abstract class AbstractExample
{
public abstract class AbstractExample {
private volatile ScheduledExecutorService _serviceScheduler;

public void runExample() throws Exception
{
public void runExample() throws Exception {
_serviceScheduler = Executors.newScheduledThreadPool(2);
final int numCores = Runtime.getRuntime().availableProcessors();
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(numCores + 1);
final Engine engine = new EngineBuilder()
.setTaskExecutor(scheduler)
.setTimerScheduler(scheduler)
.build();
try
{
final Engine engine = new EngineBuilder().setTaskExecutor(scheduler).setTimerScheduler(scheduler).build();
try {
doRunExample(engine);
}
finally
{
} finally {
engine.shutdown();
scheduler.shutdownNow();
_serviceScheduler.shutdown();
@@ -53,8 +46,7 @@ public void runExample() throws Exception

protected abstract void doRunExample(Engine engine) throws Exception;

protected <T> MockService<T> getService()
{
protected <T> MockService<T> getService() {
return new MockService<T>(_serviceScheduler);
}
}
@@ -19,25 +19,21 @@
/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class ErrorMockRequest<T> implements MockRequest<T>
{
public class ErrorMockRequest<T> implements MockRequest<T> {
private final long _latency;
private final Exception _error;

public ErrorMockRequest(final long latency, final Exception error)
{
public ErrorMockRequest(final long latency, final Exception error) {
_latency = latency;
_error = error;
}

public long getLatency()
{
public long getLatency() {
return _latency;
}

@Override
public T getResult() throws Exception
{
public T getResult() throws Exception {
throw _error;
}

@@ -27,26 +27,24 @@
import com.linkedin.parseq.trace.Trace;
import com.linkedin.parseq.trace.codec.json.JsonTraceCodec;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class ExampleUtil
{
public class ExampleUtil {
private static final Random RANDOM = new Random();
private static final int DEFAULT_LATENCY_MEAN = 100;
private static final int DEFAULT_LATENCY_STDDEV = 50;
private static final int LATENCY_MIN = 10;

private ExampleUtil() {}
private ExampleUtil() {
}

public static <RES> Task<RES> callService(final String name,
final MockService<RES> service,
final MockRequest<RES> request)
{
public static <RES> Task<RES> callService(final String name, final MockService<RES> service,
final MockRequest<RES> request) {
return new BaseTask<RES>(name) {
@Override
protected Promise<RES> run(final Context context) throws Exception
{
protected Promise<RES> run(final Context context) throws Exception {
return service.call(request);
}
};
@@ -55,50 +53,43 @@ protected Promise<RES> run(final Context context) throws Exception
public static <T> Task<T> fetch(String name, final MockService<T> service, final int id, final Map<Integer, T> map) {
final long mean = DEFAULT_LATENCY_MEAN;
final long stddev = DEFAULT_LATENCY_STDDEV;
final long latency = Math.max(LATENCY_MIN, (int)(RANDOM.nextGaussian() * stddev + mean));
final MockRequest<T> request = (map.containsKey(id)) ?
new SimpleMockRequest<T>(latency, map.get(id)) :
new ErrorMockRequest<T>(latency, new Exception("404"));
final long latency = Math.max(LATENCY_MIN, (int) (RANDOM.nextGaussian() * stddev + mean));
final MockRequest<T> request = (map.containsKey(id)) ? new SimpleMockRequest<T>(latency, map.get(id))
: new ErrorMockRequest<T>(latency, new Exception("404"));
return callService("fetch" + name + "[id=" + id + "]", service, request);
}

public static Task<String> fetchUrl(final MockService<String> httpClient,
final String url)
{
public static Task<String> fetchUrl(final MockService<String> httpClient, final String url) {
final long mean = DEFAULT_LATENCY_MEAN;
final long stddev = DEFAULT_LATENCY_STDDEV;
final long latency = Math.max(LATENCY_MIN, (int)(RANDOM.nextGaussian() * stddev + mean));
return callService("fetch[url=" + url + "]", httpClient, new SimpleMockRequest<String>(latency, "HTTP response for " + url));
final long latency = Math.max(LATENCY_MIN, (int) (RANDOM.nextGaussian() * stddev + mean));
return callService("fetch[url=" + url + "]", httpClient,
new SimpleMockRequest<String>(latency, "HTTP response for " + url));
}

public static Task<String> fetchUrl(final MockService<String> httpClient, final String url, final long latency) {
return callService("fetch[url=" + url + "]", httpClient, new SimpleMockRequest<String>(latency,
"HTTP response for " + url));
return callService("fetch[url=" + url + "]", httpClient,
new SimpleMockRequest<String>(latency, "HTTP response for " + url));
}

public static Task<String> fetch404Url(final MockService<String> httpClient,
final String url)
{
public static Task<String> fetch404Url(final MockService<String> httpClient, final String url) {
final long mean = DEFAULT_LATENCY_MEAN;
final long stddev = DEFAULT_LATENCY_STDDEV;
final long latency = Math.max(LATENCY_MIN, (int)(RANDOM.nextGaussian() * stddev + mean));
return callService("fetch[url=" + url + "]", httpClient, new ErrorMockRequest<String>(latency, new Exception(url + ": 404")));
final long latency = Math.max(LATENCY_MIN, (int) (RANDOM.nextGaussian() * stddev + mean));
return callService("fetch[url=" + url + "]", httpClient,
new ErrorMockRequest<String>(latency, new Exception(url + ": 404")));
}

public static void printTracingResults(final Task<?> task)
{
public static void printTracingResults(final Task<?> task) {
final Trace trace = task.getTrace();

System.out.println();
System.out.println();
System.out.println("JSON Trace:");

try
{
try {
System.out.println(new JsonTraceCodec().encode(trace));
}
catch (IOException e)
{
} catch (IOException e) {
System.err.println("Failed to encode JSON");
}
System.out.println();
@@ -19,8 +19,7 @@
/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public interface MockRequest<RES>
{
public interface MockRequest<RES> {
long getLatency();

RES getResult() throws Exception;
@@ -23,32 +23,25 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class MockService<RES>
{
public class MockService<RES> {
private final ScheduledExecutorService _scheduler;

public MockService(ScheduledExecutorService scheduler)
{
public MockService(ScheduledExecutorService scheduler) {
_scheduler = scheduler;
}

public Promise<RES> call(final MockRequest<RES> request)
{
public Promise<RES> call(final MockRequest<RES> request) {
final SettablePromise<RES> promise = Promises.settable();
_scheduler.schedule(new Runnable()
{
_scheduler.schedule(new Runnable() {
@Override
public void run()
{
try
{
public void run() {
try {
promise.done(request.getResult());
}
catch (Exception e)
{
} catch (Exception e) {
promise.fail(e);
}
}
@@ -19,24 +19,20 @@
/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class SimpleMockRequest<RES> implements MockRequest<RES>
{
public class SimpleMockRequest<RES> implements MockRequest<RES> {
private final long _latency;
private final RES _result;

public SimpleMockRequest(final long latency, final RES result)
{
public SimpleMockRequest(final long latency, final RES result) {
_latency = latency;
_result = result;
}

public long getLatency()
{
public long getLatency() {
return _latency;
}

public RES getResult()
{
public RES getResult() {
return _result;
}
}
@@ -31,37 +31,31 @@
import com.linkedin.parseq.example.common.MockService;
import com.linkedin.parseq.example.common.SimpleMockRequest;


/**
* @author Jaroslaw Odzga (jodzga@linkedin.com)
*/
public class FixedTimeBoundSearchExample extends AbstractExample
{
public class FixedTimeBoundSearchExample extends AbstractExample {
// How long it takes to get a response for each request
private static final List<Long> REQUEST_LATENCIES = Arrays.asList(175L, 67L, 30L, 20L, 177L, 350L);

public static void main(String[] args) throws Exception
{
public static void main(String[] args) throws Exception {
new FixedTimeBoundSearchExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<Integer> service = getService();

AtomicInteger idx = new AtomicInteger();
Task<List<Integer>> example =
ParSeqCollection.fromValues(REQUEST_LATENCIES)
.mapTask(requestLatency -> callService("subSearch[" + idx.get() + "]",
service,
new SimpleMockRequest<Integer>(requestLatency, idx.getAndIncrement())))
.within(200, TimeUnit.MILLISECONDS)
.toList();
Task<List<Integer>> example = ParSeqCollection.fromValues(REQUEST_LATENCIES)
.mapTask(requestLatency -> callService("subSearch[" + idx.get() + "]", service,
new SimpleMockRequest<Integer>(requestLatency, idx.getAndIncrement())))
.within(200, TimeUnit.MILLISECONDS).toList();

System.out.printf("This com.linkedin.asm.example will issue %d parallel requests\n", REQUEST_LATENCIES.size());
System.out.println();
for (int i = 0; i < REQUEST_LATENCIES.size(); i++)
{
for (int i = 0; i < REQUEST_LATENCIES.size(); i++) {
System.out.printf("Request %d will take %3dms to complete\n", i, REQUEST_LATENCIES.get(i));
}

@@ -25,17 +25,16 @@
import com.linkedin.parseq.example.common.ExampleUtil;
import com.linkedin.parseq.function.Tuples;


/**
* The merge sort example demonstrates how branching and recursive plan
* execution work. It is not intended as a model for doing parallel
* computation!
*
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class MergeSortExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class MergeSortExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new MergeSortExample().runExample();
}

@@ -64,9 +63,8 @@ private Task<int[]> mergeSort(final int[] toSort, final Range range) {
// Neither base case applied, so recursively split this problem into
// smaller problems and then merge the results.
return Task.callable("split", () -> Tuples.tuple(range.firstHalf(), range.secondHalf()))
.flatMap(ranges ->
Task.par(mergeSort(toSort, ranges._1()), mergeSort(toSort, ranges._2()))
.map("merge", parts -> merge(ranges._1(), parts._1(), ranges._2(), parts._2())));
.flatMap(ranges -> Task.par(mergeSort(toSort, ranges._1()), mergeSort(toSort, ranges._2())).map("merge",
parts -> merge(ranges._1(), parts._1(), ranges._2(), parts._2())));
}
}

@@ -35,42 +35,38 @@
import com.linkedin.parseq.promise.Promises;
import com.linkedin.parseq.promise.SettablePromise;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
* @author Jaroslaw Odzga (jodzga@linkedin.com)
*/
public class TimeBoundSearchExample extends AbstractExample
{
public class TimeBoundSearchExample extends AbstractExample {
// How long it takes to get a response for each request
private static final long[] REQUEST_LATENCIES = new long[] {175, 67, 30, 20, 177, 350};
private static final long[] REQUEST_LATENCIES = new long[] { 175, 67, 30, 20, 177, 350 };

// How long the engine will wait for index number of responses
private static final long[] WAIT_TIMES = new long[] {400, 300, 200, 100, 0};
private static final long[] WAIT_TIMES = new long[] { 400, 300, 200, 100, 0 };

public static void main(String[] args) throws Exception
{
public static void main(String[] args) throws Exception {
new TimeBoundSearchExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<Integer> service = getService();

final SearchTask example = new SearchTask(service);

System.out.printf("This com.linkedin.asm.example will issue %d parallel requests\n", REQUEST_LATENCIES.length);
System.out.println();
for (int i = 0; i < REQUEST_LATENCIES.length; i++)
{
for (int i = 0; i < REQUEST_LATENCIES.length; i++) {
System.out.printf("Request %d will take %3dms to complete\n", i, REQUEST_LATENCIES[i]);
}

System.out.println();
System.out.println("Latency rules:");
System.out.println("--------------");
for (int i = 0; i < WAIT_TIMES.length; i++)
{
for (int i = 0; i < WAIT_TIMES.length; i++) {
System.out.printf("Finish if received %d responses after %3dms\n", i, WAIT_TIMES[i]);
}

@@ -88,75 +84,60 @@ protected void doRunExample(final Engine engine) throws Exception
printTracingResults(example);
}

private static class SearchTask extends BaseTask<List<Integer>>
{
private static class SearchTask extends BaseTask<List<Integer>> {
private final MockService<Integer> _service;
private final List<Integer> _responses = new ArrayList<Integer>();
private final SettablePromise<List<Integer>> _result = Promises.settable();

private long _startMillis;

public SearchTask(final MockService<Integer> service)
{
public SearchTask(final MockService<Integer> service) {
super("search");
_service = service;
}

@Override
public Promise<List<Integer>> run(final Context ctx)
{
public Promise<List<Integer>> run(final Context ctx) {
// Save the start time so we can determine when to finish
_startMillis = System.currentTimeMillis();

// Set up timeouts for responses
long lastWaitTime = Integer.MAX_VALUE;
for (final long waitTime : WAIT_TIMES)
{
if (waitTime < lastWaitTime && waitTime > 0)
{
for (final long waitTime : WAIT_TIMES) {
if (waitTime < lastWaitTime && waitTime > 0) {
ctx.createTimer(waitTime, TimeUnit.MILLISECONDS, checkDone());
lastWaitTime = waitTime;
}
}

// Issue requests
for (int i = 0; i < REQUEST_LATENCIES.length; i++)
{
for (int i = 0; i < REQUEST_LATENCIES.length; i++) {
final long requestLatency = REQUEST_LATENCIES[i];
final Task<Integer> callSvc =
callService("subSearch[" + i + "]",
_service,
new SimpleMockRequest<Integer>(requestLatency, i));
callService("subSearch[" + i + "]", _service, new SimpleMockRequest<Integer>(requestLatency, i));

ctx.run(callSvc.andThen(addResponse(callSvc)).andThen(checkDone()));
}

return _result;
}

private Task<?> checkDone()
{
return Task.action("checkDone", new Action()
{
private Task<?> checkDone() {
return Task.action("checkDone", new Action() {
@Override
public void run()
{
public void run() {
final int index = Math.min(WAIT_TIMES.length - 1, _responses.size());
if (WAIT_TIMES[index] + _startMillis <= System.currentTimeMillis())
{
if (WAIT_TIMES[index] + _startMillis <= System.currentTimeMillis()) {
_result.done(_responses);
}
}
});
}

private Task<?> addResponse(final Promise<Integer> response)
{
return Task.action("addResponse", new Action()
{
private Task<?> addResponse(final Promise<Integer> response) {
return Task.action("addResponse", new Action() {
@Override
public void run()
{
public void run() {
_responses.add(response.get());
}
});
@@ -29,29 +29,26 @@
import com.linkedin.parseq.example.common.AbstractExample;
import com.linkedin.parseq.function.Tuple2;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
* @author Jaroslaw Odzga (jodzga@linkedin.com)
*/
public class TwoStageFanoutExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class TwoStageFanoutExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new TwoStageFanoutExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {

//TODO early finish???

List<String> first = Arrays.asList("http://www.bing.com", "http://www.yahoo.com");
List<String> second = Arrays.asList("http://www.google.com", "https://duckduckgo.com/");

Task<String> fanout = stage(first, new StringBuilder())
.flatMap(resultBuilder -> stage(second, resultBuilder))
.map(builder -> builder.toString());
Task<String> fanout = stage(first, new StringBuilder()).flatMap(resultBuilder -> stage(second, resultBuilder))
.map(builder -> builder.toString());

final Task<?> plan = fanout.andThen(System.out::println);

@@ -63,9 +60,7 @@ protected void doRunExample(final Engine engine) throws Exception

private Task<StringBuilder> stage(final List<String> input, final StringBuilder resultBuilder) {
return ParSeqCollection.fromValues(input)
.mapTask(url -> (Task<Tuple2<String, String>>)fetchUrl(getService(), url)
.map(s -> tuple(url, s)))
.fold(resultBuilder, (z, r) ->
z.append(String.format("%10s => %s\n", r._1(), r._2())));
.mapTask(url -> (Task<Tuple2<String, String>>) fetchUrl(getService(), url).map(s -> tuple(url, s)))
.fold(resultBuilder, (z, r) -> z.append(String.format("%10s => %s\n", r._1(), r._2())));
}
}
@@ -19,8 +19,7 @@
/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public enum Classification
{
public enum Classification {
FULL_VISIBILITY,
PARTIAL_VISIBILITY,
NO_VISIBILITY
@@ -20,7 +20,6 @@
* @author Chris Pettitt
* @version $Revision$
*/
public interface Classifier
{
public interface Classifier {
Classification classify(long vieweeId);
}
@@ -29,13 +29,12 @@
import com.linkedin.parseq.example.composite.classifier.client.Client;
import com.linkedin.parseq.example.composite.classifier.client.impl.ClientImpl;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class ClassifierDriver
{
public static void main(String[] args) throws InterruptedException
{
public class ClassifierDriver {
public static void main(String[] args) throws InterruptedException {
final long viewerId = 0;

final Set<Long> unclassified = new HashSet<Long>();
@@ -48,23 +47,17 @@ public static void main(String[] args) throws InterruptedException

final int numCores = Runtime.getRuntime().availableProcessors();
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(numCores + 1);
final Engine engine = new EngineBuilder()
.setTaskExecutor(scheduler)
.setTimerScheduler(scheduler)
.build();
final Engine engine = new EngineBuilder().setTaskExecutor(scheduler).setTimerScheduler(scheduler).build();

final ClassifierPlanFactory classifier = new ClassifierPlanFactory(restLiClient);
try
{
try {
final Task<Map<Long, Classification>> classifications = classifier.classify(viewerId, unclassified);
engine.run(classifications);
classifications.await();
System.out.println(classifications.get());

ExampleUtil.printTracingResults(classifications);
}
finally
{
} finally {
serviceScheduler.shutdownNow();
engine.shutdown();
scheduler.shutdownNow();
@@ -35,42 +35,35 @@
import com.linkedin.parseq.promise.Promises;
import com.linkedin.parseq.promise.SettablePromise;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class ClassifierPlanFactory
{
public class ClassifierPlanFactory {
private final Client _client;

public ClassifierPlanFactory(final Client client)
{
public ClassifierPlanFactory(final Client client) {
_client = client;
}

public Task<Map<Long, Classification>> classify(final long viewerId,
final Set<Long> vieweeIds)
{
public Task<Map<Long, Classification>> classify(final long viewerId, final Set<Long> vieweeIds) {
return new ClassifierPlan(viewerId, vieweeIds);
}

private class ClassifierPlan extends BaseTask<Map<Long, Classification>>
{
private class ClassifierPlan extends BaseTask<Map<Long, Classification>> {
private final long _viewerId;
private final Set<Long> _unclassified;
private final Map<Long, Classification> _classified = new HashMap<Long, Classification>();
private final SettablePromise<Map<Long, Classification>> _result = Promises.settable();

private ClassifierPlan(final long viewerId,
final Set<Long> unclassified)
{
private ClassifierPlan(final long viewerId, final Set<Long> unclassified) {
super("ClassifierPlan[viewerId=" + viewerId + "]");
_viewerId = viewerId;
_unclassified = new HashSet<Long>(unclassified);
}

@Override
public Promise<Map<Long, Classification>> run(final Context ctx)
{
public Promise<Map<Long, Classification>> run(final Context ctx) {
// Network data is shared across classifiers, so we create it here
final Task<Network> network = clientRequestTask(new GetNetworkRequest(_viewerId));

@@ -83,12 +76,14 @@ public Promise<Map<Long, Classification>> run(final Context ctx)

// Full visibility classification
final Task<?> directlyConnectedClassifier = connectedClassifyTask(network);
final Task<?> invitedToGroupClassifier = truthMapQueryClassifyTask("GroupInvited", 1, Classification.FULL_VISIBILITY);
final Task<?> invitedToGroupClassifier =
truthMapQueryClassifyTask("GroupInvited", 1, Classification.FULL_VISIBILITY);
final Task<?> messagedClassifier = truthMapQueryClassifyTask("Messaged", 2, Classification.FULL_VISIBILITY);

// Partial visibility classification
final Task<?> inNetworkClassifier = networkClassifyTask(network);
final Task<?> sharesGroupClassifier = truthMapQueryClassifyTask("CommonGroups", 4, Classification.PARTIAL_VISIBILITY);
final Task<?> sharesGroupClassifier =
truthMapQueryClassifyTask("CommonGroups", 4, Classification.PARTIAL_VISIBILITY);

// Default visibility (i.e. no visibility)
final Task<?> defaultClassifier = classifyTask(DefaultClassifier.instance());
@@ -97,35 +92,28 @@ public Promise<Map<Long, Classification>> run(final Context ctx)
ctx.createTimer(1, TimeUnit.SECONDS, defaultClassifier);

// ORDERING
final Task<?> ordering = selfClassifier
.andThen(Task.par(network.andThen(directlyConnectedClassifier),
invitedToGroupClassifier,
messagedClassifier))
.andThen(Task.par(inNetworkClassifier,
sharesGroupClassifier))
.andThen(defaultClassifier);
final Task<?> ordering =
selfClassifier
.andThen(
Task.par(network.andThen(directlyConnectedClassifier), invitedToGroupClassifier, messagedClassifier))
.andThen(Task.par(inNetworkClassifier, sharesGroupClassifier)).andThen(defaultClassifier);

ctx.run(ordering);

return _result;
}

private Task<?> classifyTask(final Classifier classifier)
{
return Task.action(classifier.getClass().getSimpleName(), new Action()
{
private Task<?> classifyTask(final Classifier classifier) {
return Task.action(classifier.getClass().getSimpleName(), new Action() {
@Override
public void run()
{
public void run() {
doClassify(classifier);
}
});
}

private Task<?> truthMapQueryClassifyTask(final String name,
final int remainder,
final Classification classification)
{
private Task<?> truthMapQueryClassifyTask(final String name, final int remainder,
final Classification classification) {
final Task<Map<Long, Boolean>> svcCall =
clientRequestTask(new TruthMapRequest("get" + name, remainder, _unclassified));

@@ -134,71 +122,54 @@ private Task<?> truthMapQueryClassifyTask(final String name,
return svcCall.andThen(classifyResult);
}

private Task<?> truthMapClassifyTask(final String name,
final Classification classification,
final Promise<Map<Long, Boolean>> result)
{
return Task.action(name + "Classifier", new Action()
{
private Task<?> truthMapClassifyTask(final String name, final Classification classification,
final Promise<Map<Long, Boolean>> result) {
return Task.action(name + "Classifier", new Action() {
@Override
public void run()
{
public void run() {
doClassify(new TruthMapClassifier(classification, result.get()));
}
});
}

private <T> Task<T> clientRequestTask(final Request<T> request)
{
return new BaseTask<T>(request.getName())
{
private <T> Task<T> clientRequestTask(final Request<T> request) {
return new BaseTask<T>(request.getName()) {
@Override
protected Promise<? extends T> run(final Context context) throws Exception
{
protected Promise<? extends T> run(final Context context) throws Exception {
return _client.sendRequest(request);
}
};
}

private Task<?> connectedClassifyTask(final Task<Network> network)
{
return Task.action("ConnectedClassifier", new Action()
{
private Task<?> connectedClassifyTask(final Task<Network> network) {
return Task.action("ConnectedClassifier", new Action() {
@Override
public void run()
{
public void run() {
doClassify(new ConnectedClassifier(network.get()));
}
});
}

private Task<?> networkClassifyTask(final Task<Network> network)
{
return Task.action("NetworkClassifier", new Action()
{
private Task<?> networkClassifyTask(final Task<Network> network) {
return Task.action("NetworkClassifier", new Action() {
@Override
public void run()
{
public void run() {
doClassify(new NetworkClassifier(network.get()));
}
});
}

private void doClassify(final Classifier classifier)
{
for (Iterator<Long> it = _unclassified.iterator(); it.hasNext(); )
{
private void doClassify(final Classifier classifier) {
for (Iterator<Long> it = _unclassified.iterator(); it.hasNext();) {
final long vieweeId = it.next();
final Classification classification = classifier.classify(vieweeId);
if (classification != null)
{
if (classification != null) {
it.remove();
_classified.put(vieweeId, classification);
}
}

if (_unclassified.isEmpty())
{
if (_unclassified.isEmpty()) {
_result.done(_classified);
}
}
@@ -19,20 +19,16 @@
/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class ConnectedClassifier implements Classifier
{
public class ConnectedClassifier implements Classifier {
private final Network _subnet;

public ConnectedClassifier(final Network subnet)
{
public ConnectedClassifier(final Network subnet) {
_subnet = subnet;
}

@Override
public Classification classify(final long vieweeId)
{
return _subnet != null && Network.Distance.D1.equals(_subnet.getDistance(vieweeId))
? Classification.FULL_VISIBILITY
public Classification classify(final long vieweeId) {
return _subnet != null && Network.Distance.D1.equals(_subnet.getDistance(vieweeId)) ? Classification.FULL_VISIBILITY
: null;
}
}
@@ -19,18 +19,15 @@
/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class DefaultClassifier implements Classifier
{
public class DefaultClassifier implements Classifier {
private static DefaultClassifier INSTANCE = new DefaultClassifier();

public static DefaultClassifier instance()
{
public static DefaultClassifier instance() {
return INSTANCE;
}

@Override
public Classification classify(final long vieweeId)
{
public Classification classify(final long vieweeId) {
return Classification.NO_VISIBILITY;
}
}
@@ -19,10 +19,8 @@
/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class Network
{
public static enum Distance
{
public class Network {
public static enum Distance {
NOT_IN_NETWORK,
SELF,
D1,
@@ -32,19 +30,16 @@

private final long memberId;

public Network(long memberId)
{
public Network(long memberId) {
this.memberId = memberId;
}

public Distance getDistance(final long memberId)
{
public Distance getDistance(final long memberId) {
if (memberId == this.memberId)
return Distance.SELF;

final long group = memberId % 10;
switch ((int) group)
{
switch ((int) group) {
case 10:
return Distance.D3;
case 9:
@@ -19,22 +19,18 @@
/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class NetworkClassifier implements Classifier
{
public class NetworkClassifier implements Classifier {
private final Network _subnet;

public NetworkClassifier(final Network subnet)
{
public NetworkClassifier(final Network subnet) {
_subnet = subnet;
}

@Override
public Classification classify(final long vieweeId)
{
public Classification classify(final long vieweeId) {
final Network.Distance distance = _subnet.getDistance(vieweeId);

return Network.Distance.D2.equals(distance) || Network.Distance.D3.equals(distance)
? Classification.PARTIAL_VISIBILITY
: null;
? Classification.PARTIAL_VISIBILITY : null;
}
}
@@ -19,20 +19,15 @@
/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class SelfClassifier implements Classifier
{
public class SelfClassifier implements Classifier {
private final long viewerId;

public SelfClassifier(final long viewerId)
{
public SelfClassifier(final long viewerId) {
this.viewerId = viewerId;
}

@Override
public Classification classify(final long vieweeId)
{
return vieweeId == viewerId
? Classification.FULL_VISIBILITY
: null;
public Classification classify(final long vieweeId) {
return vieweeId == viewerId ? Classification.FULL_VISIBILITY : null;
}
}
@@ -18,26 +18,22 @@

import java.util.Map;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class TruthMapClassifier implements Classifier
{
public class TruthMapClassifier implements Classifier {
private final Classification _classification;
private final Map<Long, Boolean> _truthMap;

public TruthMapClassifier(Classification classification, Map<Long, Boolean> truthMap)
{
public TruthMapClassifier(Classification classification, Map<Long, Boolean> truthMap) {
_classification = classification;
_truthMap = truthMap;
}

@Override
public Classification classify(final long vieweeId)
{
public Classification classify(final long vieweeId) {
final Boolean b = _truthMap.get(vieweeId);
return b != null && b
? _classification
: null;
return b != null && b ? _classification : null;
}
}
@@ -18,10 +18,10 @@

import com.linkedin.parseq.promise.Promise;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public interface Client
{
public interface Client {
<T> Promise<T> sendRequest(final Request<T> request);
}
@@ -19,8 +19,7 @@
/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public interface Request<T>
{
public interface Request<T> {
T getResponse();

int getLatencyMean();
@@ -18,23 +18,21 @@

import com.linkedin.parseq.example.composite.classifier.client.Request;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public abstract class AbstractRequest<T> implements Request<T>
{
public abstract class AbstractRequest<T> implements Request<T> {
private static final int DEFAULT_LATENCY_MEAN = 100;
private static final int DEFAULT_LATENCY_STDDEV = 50;

@Override
public int getLatencyMean()
{
public int getLatencyMean() {
return DEFAULT_LATENCY_MEAN;
}

@Override
public int getLatencyStdDev()
{
public int getLatencyStdDev() {
return DEFAULT_LATENCY_STDDEV;
}
}
@@ -26,39 +26,32 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class ClientImpl implements Client
{
public class ClientImpl implements Client {
private static final int LATENCY_MIN = 10;

private final ScheduledExecutorService scheduler;
private final Random random = new Random();

public ClientImpl(final ScheduledExecutorService scheduler)
{
public ClientImpl(final ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
}

@Override
public <T> Promise<T> sendRequest(final Request<T> request)
{
public <T> Promise<T> sendRequest(final Request<T> request) {
final SettablePromise<T> promise = Promises.settable();
final int mean = request.getLatencyMean();
final int stddev = request.getLatencyStdDev();
final int latency = Math.max(LATENCY_MIN, (int)(random.nextGaussian() * stddev + mean));
scheduler.schedule(new Runnable()
{
final int latency = Math.max(LATENCY_MIN, (int) (random.nextGaussian() * stddev + mean));
scheduler.schedule(new Runnable() {
@Override
public void run()
{
try
{
public void run() {
try {
promise.done(request.getResponse());
}
catch (Exception e)
{
} catch (Exception e) {
promise.fail(e);
}
}
@@ -18,27 +18,24 @@

import com.linkedin.parseq.example.composite.classifier.Network;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class GetNetworkRequest extends AbstractRequest<Network>
{
public class GetNetworkRequest extends AbstractRequest<Network> {
private final long _memberId;

public GetNetworkRequest(final long memberId)
{
public GetNetworkRequest(final long memberId) {
_memberId = memberId;
}

@Override
public Network getResponse()
{
public Network getResponse() {
return new Network(_memberId);
}

@Override
public String getName()
{
public String getName() {
return "getNetwork";
}
}
@@ -20,38 +20,32 @@
import java.util.Map;
import java.util.Set;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class TruthMapRequest extends AbstractRequest<Map<Long, Boolean>>
{
public class TruthMapRequest extends AbstractRequest<Map<Long, Boolean>> {
private final String _name;
private final int _remainder;
private final Set<Long> _memberIds;

public TruthMapRequest(final String name,
final int remainder,
final Set<Long> memberIds)
{
public TruthMapRequest(final String name, final int remainder, final Set<Long> memberIds) {
_name = name;
_remainder = remainder;
_memberIds = memberIds;
}

@Override
public Map<Long, Boolean> getResponse()
{
public Map<Long, Boolean> getResponse() {
final Map<Long, Boolean> result = new HashMap<Long, Boolean>();
for (Long memberId : _memberIds)
{
for (Long memberId : _memberIds) {
result.put(memberId, memberId % 10 == _remainder);
}
return result;
}

@Override
public String getName()
{
public String getName() {
return _name;
}
}
@@ -6,10 +6,12 @@
import java.util.List;
import java.util.Map;


public class DB {

public static final Map<Integer, Person> personDB = new HashMap<Integer, Person>() {
private static final long serialVersionUID = 1L;

{
put(1, new Person(1, "Bob", "Shmidt", 1, Arrays.asList(2, 3)));
put(2, new Person(2, "Garry", "Smith", 1, Arrays.asList(1)));
@@ -22,6 +24,7 @@ public class DB {

public static final Map<Integer, Company> companyDB = new HashMap<Integer, Company>() {
private static final long serialVersionUID = 1L;

{
put(1, new Company("LinkedIn"));
put(2, new Company("Twitter"));
@@ -31,6 +34,7 @@ public class DB {

public static final Map<Integer, Message> messageDB = new HashMap<Integer, Message>() {
private static final long serialVersionUID = 1L;

{
put(1, new Message(1, 2, "Hi", "Hi, how are you?"));
put(2, new Message(2, 1, "Re: Hi", "Hi, I'm great!"));
@@ -41,6 +45,7 @@ public class DB {

public static final Map<Integer, List<Integer>> mailboxDB = new HashMap<Integer, List<Integer>>() {
private static final long serialVersionUID = 1L;

{
put(1, Arrays.asList(1, 2, 3));
put(2, Arrays.asList(2));
@@ -11,22 +11,20 @@
import com.linkedin.parseq.example.common.ExampleUtil;
import com.linkedin.parseq.function.Tuples;


/**
* @author Jaroslaw Odzga (jodzga@linkedin.com)
*/
public class Examples extends AbstractDomainExample
{
public static void main(String[] args) throws Exception
{
public class Examples extends AbstractDomainExample {
public static void main(String[] args) throws Exception {
new Examples().runExample();
}

//---------------------------------------------------------------

//create summary for a person: "<first name> <last name>"
Task<String> createSummary(int id) {
return fetchPerson(id)
.map(this::shortSummary);
return fetchPerson(id).map(this::shortSummary);
}

String shortSummary(Person person) {
@@ -37,19 +35,14 @@ String shortSummary(Person person) {

//handles failures delivering degraded experience
Task<String> createResilientSummary(int id) {
return fetchPerson(id)
.map(this::shortSummary)
.recover(e -> "Member " + id);
return fetchPerson(id).map(this::shortSummary).recover(e -> "Member " + id);
}

//---------------------------------------------------------------

//handles failures delivering degraded experience in timely fashion
Task<String> createResponsiveSummary(int id) {
return fetchPerson(id)
.withTimeout(100, TimeUnit.MILLISECONDS)
.map(this::shortSummary)
.recover(e -> "Member " + id);
return fetchPerson(id).withTimeout(100, TimeUnit.MILLISECONDS).map(this::shortSummary).recover(e -> "Member " + id);
}

//---------------------------------------------------------------
@@ -62,16 +55,15 @@ Task<String> createExtendedSummary(int id) {
}

Task<String> createExtendedSummary(final Person p) {
return fetchCompany(p.getCompanyId())
.map(company -> shortSummary(p) + " working at " + company.getName());
return fetchCompany(p.getCompanyId()).map(company -> shortSummary(p) + " working at " + company.getName());
}

//---------------------------------------------------------------

//create mailbox summary for a person: "<first name> <last name> has <X> messages"
Task<String> createMailboxSummary(int id) {
return Task.par(createSummary(id), fetchMailbox(id))
.map((summary, mailbox) -> summary + " has " + mailbox.size() + " messages");
.map((summary, mailbox) -> summary + " has " + mailbox.size() + " messages");

}

@@ -86,44 +78,33 @@ Task<String> createMailboxSummary(int id) {
//<first name> <last name> working at <company name>
//(...)
Task<String> createSummariesOfConnections(Integer id) {
return fetchPerson(id)
.flatMap(person -> createSummaries(person.getConnections()));
return fetchPerson(id).flatMap(person -> createSummaries(person.getConnections()));
}

Task<String> createSummaries(List<Integer> ids) {
return ParSeqCollection.fromValues(ids)
.mapTask(id -> createExtendedSummary(id))
.within(200, TimeUnit.MILLISECONDS)
.reduce((a, b) -> a + "\n" + b);
return ParSeqCollection.fromValues(ids).mapTask(id -> createExtendedSummary(id)).within(200, TimeUnit.MILLISECONDS)
.reduce((a, b) -> a + "\n" + b);
}

//---------------------------------------------------------------

//Find a message which contains given word
Task<String> findMessageWithWord(String word) {
return ParSeqCollection.fromValues(DB.personIds)
.mapTask(id -> fetchMailbox(id))
.flatMap(list -> ParSeqCollection.fromValues(list))
.mapTask(msgId -> fetchMessage(msgId))
.map(msg -> msg.getContents())
.find(s -> s.contains(word));
return ParSeqCollection.fromValues(DB.personIds).mapTask(id -> fetchMailbox(id))
.flatMap(list -> ParSeqCollection.fromValues(list)).mapTask(msgId -> fetchMessage(msgId))
.map(msg -> msg.getContents()).find(s -> s.contains(word));
}

//---------------------------------------------------------------

//given list of their ids, get list of N People working at LinkedIn who have at least 2 connections and 1 message
Task<List<Person>> getNPeopleWorkingAtLinkedIn(List<Integer> ids, int N) {
return ParSeqCollection.fromValues(ids)
.mapTask(id -> fetchPerson(id))
.filter(person -> person.getConnections().size() > 2)
.mapTask(person ->
Task.par(fetchMailbox(person.getId()),fetchCompany(person.getCompanyId()))
.map(tuple -> Tuples.tuple(person, tuple)))
.filter(tuple -> tuple._2()._1().size() >= 1 &&
tuple._2()._2().getName().equals("LinkedIn"))
.map(tuple -> tuple._1())
.take(N)
.toList();
return ParSeqCollection.fromValues(ids).mapTask(id -> fetchPerson(id))
.filter(person -> person.getConnections().size() > 2)
.mapTask(person -> Task.par(fetchMailbox(person.getId()), fetchCompany(person.getCompanyId()))
.map(tuple -> Tuples.tuple(person, tuple)))
.filter(tuple -> tuple._2()._1().size() >= 1 && tuple._2()._2().getName().equals("LinkedIn"))
.map(tuple -> tuple._1()).take(N).toList();
}

@Override
@@ -2,6 +2,7 @@

import java.util.List;


public class Person {

final int _id;
@@ -44,5 +45,4 @@ public String toString() {
+ _companyId + ", _connections=" + _connections + "]";
}


}
@@ -31,23 +31,21 @@

import static com.linkedin.parseq.Tasks.callable;


/**
* The merge sort example demonstrates how branching and recursive plan
* execution work. It is not intended as a model for doing parallel
* computation!
*
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class MergeSortExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class MergeSortExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new MergeSortExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final int[] toSort = createRandomArray(10, new Random());

final Task<int[]> mergeSort = new MergeSortPlan(toSort);
@@ -60,46 +58,35 @@ protected void doRunExample(final Engine engine) throws Exception
ExampleUtil.printTracingResults(mergeSort);
}

private static int[] createRandomArray(final int arraySize, final Random random)
{
private static int[] createRandomArray(final int arraySize, final Random random) {
final int[] nums = new int[arraySize];
for (int i = 0; i < arraySize; i++)
{
for (int i = 0; i < arraySize; i++) {
nums[i] = random.nextInt();
}
return nums;
}

private static class MergeSortPlan extends BaseTask<int[]>
{
private static class MergeSortPlan extends BaseTask<int[]> {
private final int[] _toSort;
private final Range _range;

public MergeSortPlan(final int[] toSort)
{
public MergeSortPlan(final int[] toSort) {
this(toSort, new Range(0, toSort.length));
}

private MergeSortPlan(final int[] toSort, final Range range)
{
private MergeSortPlan(final int[] toSort, final Range range) {
super("MergeSort " + range);
_toSort = toSort;
_range = range;
}

@Override
public Promise<int[]> run(final Context ctx)
{
if (_range.size() == 0)
{
public Promise<int[]> run(final Context ctx) {
if (_range.size() == 0) {
return Promises.value(new int[0]);
}
else if (_range.size() == 1)
{
return Promises.value(new int[] {_toSort[_range.start()]});
}
else
{
} else if (_range.size() == 1) {
return Promises.value(new int[] { _toSort[_range.start()] });
} else {
// Neither base case applied, so recursively split this problem into
// smaller problems and then merge the results.
final Range fstRange = _range.firstHalf();
@@ -113,21 +100,15 @@ else if (_range.size() == 1)
}
}

private Task<int[]> mergePlan(final Range fstRange,
final Promise<int[]> fstPromise,
final Range sndRange,
final Promise<int[]> sndPromise)
{
return callable("Merge " + fstRange + " " + sndRange, new Callable<int[]>()
{
private Task<int[]> mergePlan(final Range fstRange, final Promise<int[]> fstPromise, final Range sndRange,
final Promise<int[]> sndPromise) {
return callable("Merge " + fstRange + " " + sndRange, new Callable<int[]>() {
@Override
public int[] call() throws Exception
{
public int[] call() throws Exception {
final int[] fst = fstPromise.get();
final int[] snd = sndPromise.get();
final int[] results = new int[fst.length + snd.length];
for (int i = 0, l = 0, r = 0; i < results.length; i++)
{
for (int i = 0, l = 0, r = 0; i < results.length; i++) {
if (l == fst.length)
results[i] = snd[r++];
else if (r == snd.length)
@@ -141,44 +122,36 @@ else if (r == snd.length)
}
}

private static class Range
{
private static class Range {
private final int _start;
private final int _end;

public Range(int start, int end)
{
public Range(int start, int end) {
_start = start;
_end = end;
}

public int start()
{
public int start() {
return _start;
}

public Range firstHalf()
{
public Range firstHalf() {
return new Range(_start, midpoint());
}

public Range secondHalf()
{
public Range secondHalf() {
return new Range(midpoint(), _end);
}

public int size()
{
public int size() {
return _end - _start;
}

public String toString()
{
return "[" + _start + "," + _end + ")";
public String toString() {
return "[" + _start + "," + _end + ")";
}

private int midpoint()
{
private int midpoint() {
return (_end - _start) / 2 + _start;
}
}
@@ -36,41 +36,37 @@
import static com.linkedin.parseq.example.common.ExampleUtil.callService;
import static com.linkedin.parseq.example.common.ExampleUtil.printTracingResults;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class TimeBoundSearchExample extends AbstractExample
{
public class TimeBoundSearchExample extends AbstractExample {
// How long it takes to get a response for each request
private static final long[] REQUEST_LATENCIES = new long[] {175, 67, 30, 20, 177, 350};
private static final long[] REQUEST_LATENCIES = new long[] { 175, 67, 30, 20, 177, 350 };

// How long the engine will wait for index number of responses
private static final long[] WAIT_TIMES = new long[] {400, 300, 200, 100, 0};
private static final long[] WAIT_TIMES = new long[] { 400, 300, 200, 100, 0 };

public static void main(String[] args) throws Exception
{
public static void main(String[] args) throws Exception {
new TimeBoundSearchExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<Integer> service = getService();

final SearchTask example = new SearchTask(service);

System.out.printf("This com.linkedin.asm.example will issue %d parallel requests\n", REQUEST_LATENCIES.length);
System.out.println();
for (int i = 0; i < REQUEST_LATENCIES.length; i++)
{
for (int i = 0; i < REQUEST_LATENCIES.length; i++) {
System.out.printf("Request %d will take %3dms to complete\n", i, REQUEST_LATENCIES[i]);
}

System.out.println();
System.out.println("Latency rules:");
System.out.println("--------------");
for (int i = 0; i < WAIT_TIMES.length; i++)
{
for (int i = 0; i < WAIT_TIMES.length; i++) {
System.out.printf("Finish if received %d responses after %3dms\n", i, WAIT_TIMES[i]);
}

@@ -88,75 +84,60 @@ protected void doRunExample(final Engine engine) throws Exception
printTracingResults(example);
}

private static class SearchTask extends BaseTask<List<Integer>>
{
private static class SearchTask extends BaseTask<List<Integer>> {
private final MockService<Integer> _service;
private final List<Integer> _responses = new ArrayList<Integer>();
private final SettablePromise<List<Integer>> _result = Promises.settable();

private long _startMillis;

public SearchTask(final MockService<Integer> service)
{
public SearchTask(final MockService<Integer> service) {
super("search");
_service = service;
}

@Override
public Promise<List<Integer>> run(final Context ctx)
{
public Promise<List<Integer>> run(final Context ctx) {
// Save the start time so we can determine when to finish
_startMillis = System.currentTimeMillis();

// Set up timeouts for responses
long lastWaitTime = Integer.MAX_VALUE;
for (final long waitTime : WAIT_TIMES)
{
if (waitTime < lastWaitTime && waitTime > 0)
{
for (final long waitTime : WAIT_TIMES) {
if (waitTime < lastWaitTime && waitTime > 0) {
ctx.createTimer(waitTime, TimeUnit.MILLISECONDS, checkDone());
lastWaitTime = waitTime;
}
}

// Issue requests
for (int i = 0; i < REQUEST_LATENCIES.length; i++)
{
for (int i = 0; i < REQUEST_LATENCIES.length; i++) {
final long requestLatency = REQUEST_LATENCIES[i];
final Task<Integer> callSvc =
callService("subSearch[" + i + "]",
_service,
new SimpleMockRequest<Integer>(requestLatency, i));
callService("subSearch[" + i + "]", _service, new SimpleMockRequest<Integer>(requestLatency, i));

ctx.run(seq(callSvc, addResponse(callSvc), checkDone()));
}

return _result;
}

private Task<?> checkDone()
{
return action("checkDone", new Runnable()
{
private Task<?> checkDone() {
return action("checkDone", new Runnable() {
@Override
public void run()
{
public void run() {
final int index = Math.min(WAIT_TIMES.length - 1, _responses.size());
if (WAIT_TIMES[index] + _startMillis <= System.currentTimeMillis())
{
if (WAIT_TIMES[index] + _startMillis <= System.currentTimeMillis()) {
_result.done(_responses);
}
}
});
}

private Task<?> addResponse(final Promise<Integer> response)
{
return action("addResponse", new Runnable()
{
private Task<?> addResponse(final Promise<Integer> response) {
return action("addResponse", new Runnable() {
@Override
public void run()
{
public void run() {
_responses.add(response.get());
}
});
@@ -33,27 +33,23 @@
import static com.linkedin.parseq.example.common.ExampleUtil.fetchUrl;
import static com.linkedin.parseq.example.common.ExampleUtil.printTracingResults;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class TwoStageFanoutExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class TwoStageFanoutExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new TwoStageFanoutExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<String> httpClient = getService();

final FanoutTask fanout = new FanoutTask(httpClient);
final Task<?> printResults = action("printResults", new Runnable()
{
final Task<?> printResults = action("printResults", new Runnable() {
@Override
public void run()
{
public void run() {
System.out.println(fanout.get());
}
});
@@ -65,56 +61,42 @@ public void run()
printTracingResults(plan);
}

private static class FanoutTask extends BaseTask<String>
{
private static class FanoutTask extends BaseTask<String> {
private final MockService<String> _httpClient;
private final StringBuilder _result = new StringBuilder();

private FanoutTask(final MockService<String> httpClient)
{
private FanoutTask(final MockService<String> httpClient) {
super("TwoStageFanout");
_httpClient = httpClient;
}

@Override
public Promise<String> run(final Context ctx)
{
final Task<String> twoStage =
seq(par(fetchAndLog("http://www.bing.com"),
fetchAndLog("http://www.yahoo.com")),
par(fetchAndLog("http://www.google.com"),
fetchAndLog("https://duckduckgo.com/")),
buildResult());
public Promise<String> run(final Context ctx) {
final Task<String> twoStage = seq(par(fetchAndLog("http://www.bing.com"), fetchAndLog("http://www.yahoo.com")),
par(fetchAndLog("http://www.google.com"), fetchAndLog("https://duckduckgo.com/")), buildResult());
ctx.run(twoStage);
return twoStage;
}

private Task<String> buildResult()
{
return callable("buildResult", new Callable<String>()
{
private Task<String> buildResult() {
return callable("buildResult", new Callable<String>() {
@Override
public String call()
{
public String call() {
return _result.toString();
}
});
}

private Task<?> fetchAndLog(final String url)
{
private Task<?> fetchAndLog(final String url) {
final Task<String> fetch = fetchUrl(_httpClient, url);
final Task<?> logResult = logResult(url, fetch);
return seq(fetch, logResult);
}

private Task<?> logResult(final String url, final Promise<String> promise)
{
return action("logResult[" + url + "]", new Runnable()
{
private Task<?> logResult(final String url, final Promise<String> promise) {
return action("logResult[" + url + "]", new Runnable() {
@Override
public void run()
{
public void run() {
_result.append(String.format("%10s => %s\n", url, promise.get()));
}
});
@@ -19,8 +19,7 @@
/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public enum Classification
{
public enum Classification {
FULL_VISIBILITY,
PARTIAL_VISIBILITY,
NO_VISIBILITY
@@ -20,7 +20,6 @@
* @author Chris Pettitt
* @version $Revision$
*/
public interface Classifier
{
public interface Classifier {
Classification classify(long vieweeId);
}
@@ -29,13 +29,12 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class ClassifierDriver
{
public static void main(String[] args) throws InterruptedException
{
public class ClassifierDriver {
public static void main(String[] args) throws InterruptedException {
final long viewerId = 0;

final Set<Long> unclassified = new HashSet<Long>();
@@ -48,23 +47,17 @@ public static void main(String[] args) throws InterruptedException

final int numCores = Runtime.getRuntime().availableProcessors();
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(numCores + 1);
final Engine engine = new EngineBuilder()
.setTaskExecutor(scheduler)
.setTimerScheduler(scheduler)
.build();
final Engine engine = new EngineBuilder().setTaskExecutor(scheduler).setTimerScheduler(scheduler).build();

final ClassifierPlanFactory classifier = new ClassifierPlanFactory(restLiClient);
try
{
try {
final Task<Map<Long, Classification>> classifications = classifier.classify(viewerId, unclassified);
engine.run(classifications);
classifications.await();
System.out.println(classifications.get());

ExampleUtil.printTracingResults(classifications);
}
finally
{
} finally {
serviceScheduler.shutdownNow();
engine.shutdown();
scheduler.shutdownNow();
@@ -38,42 +38,35 @@
import static com.linkedin.parseq.Tasks.par;
import static com.linkedin.parseq.Tasks.seq;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class ClassifierPlanFactory
{
public class ClassifierPlanFactory {
private final Client _client;

public ClassifierPlanFactory(final Client client)
{
public ClassifierPlanFactory(final Client client) {
_client = client;
}

public Task<Map<Long, Classification>> classify(final long viewerId,
final Set<Long> vieweeIds)
{
public Task<Map<Long, Classification>> classify(final long viewerId, final Set<Long> vieweeIds) {
return new ClassifierPlan(viewerId, vieweeIds);
}

private class ClassifierPlan extends BaseTask<Map<Long, Classification>>
{
private class ClassifierPlan extends BaseTask<Map<Long, Classification>> {
private final long _viewerId;
private final Set<Long> _unclassified;
private final Map<Long, Classification> _classified = new HashMap<Long, Classification>();
private final SettablePromise<Map<Long, Classification>> _result = Promises.settable();

private ClassifierPlan(final long viewerId,
final Set<Long> unclassified)
{
private ClassifierPlan(final long viewerId, final Set<Long> unclassified) {
super("ClassifierPlan[viewerId=" + viewerId + "]");
_viewerId = viewerId;
_unclassified = new HashSet<Long>(unclassified);
}

@Override
public Promise<Map<Long, Classification>> run(final Context ctx)
{
public Promise<Map<Long, Classification>> run(final Context ctx) {
// Network data is shared across classifiers, so we create it here
final Task<Network> network = clientRequestTask(new GetNetworkRequest(_viewerId));

@@ -86,12 +79,14 @@ public Promise<Map<Long, Classification>> run(final Context ctx)

// Full visibility classification
final Task<?> directlyConnectedClassifier = connectedClassifyTask(network);
final Task<?> invitedToGroupClassifier = truthMapQueryClassifyTask("GroupInvited", 1, Classification.FULL_VISIBILITY);
final Task<?> invitedToGroupClassifier =
truthMapQueryClassifyTask("GroupInvited", 1, Classification.FULL_VISIBILITY);
final Task<?> messagedClassifier = truthMapQueryClassifyTask("Messaged", 2, Classification.FULL_VISIBILITY);

// Partial visibility classification
final Task<?> inNetworkClassifier = networkClassifyTask(network);
final Task<?> sharesGroupClassifier = truthMapQueryClassifyTask("CommonGroups", 4, Classification.PARTIAL_VISIBILITY);
final Task<?> sharesGroupClassifier =
truthMapQueryClassifyTask("CommonGroups", 4, Classification.PARTIAL_VISIBILITY);

// Default visibility (i.e. no visibility)
final Task<?> defaultClassifier = classifyTask(DefaultClassifier.instance());
@@ -100,35 +95,25 @@ public Promise<Map<Long, Classification>> run(final Context ctx)
ctx.createTimer(1, TimeUnit.SECONDS, defaultClassifier);

// ORDERING
final Task<?> ordering =
seq(selfClassifier,
par(seq(network, directlyConnectedClassifier),
invitedToGroupClassifier,
messagedClassifier),
par(inNetworkClassifier,
sharesGroupClassifier),
defaultClassifier);
final Task<?> ordering = seq(selfClassifier,
par(seq(network, directlyConnectedClassifier), invitedToGroupClassifier, messagedClassifier),
par(inNetworkClassifier, sharesGroupClassifier), defaultClassifier);
ctx.run(ordering);

return _result;
}

private Task<?> classifyTask(final Classifier classifier)
{
return action(classifier.getClass().getSimpleName(), new Runnable()
{
private Task<?> classifyTask(final Classifier classifier) {
return action(classifier.getClass().getSimpleName(), new Runnable() {
@Override
public void run()
{
public void run() {
doClassify(classifier);
}
});
}

private Task<?> truthMapQueryClassifyTask(final String name,
final int remainder,
final Classification classification)
{
private Task<?> truthMapQueryClassifyTask(final String name, final int remainder,
final Classification classification) {
final Task<Map<Long, Boolean>> svcCall =
clientRequestTask(new TruthMapRequest("get" + name, remainder, _unclassified));

@@ -137,71 +122,54 @@ private Task<?> truthMapQueryClassifyTask(final String name,
return seq(svcCall, classifyResult);
}

private Task<?> truthMapClassifyTask(final String name,
final Classification classification,
final Promise<Map<Long, Boolean>> result)
{
return action(name + "Classifier", new Runnable()
{
private Task<?> truthMapClassifyTask(final String name, final Classification classification,
final Promise<Map<Long, Boolean>> result) {
return action(name + "Classifier", new Runnable() {
@Override
public void run()
{
public void run() {
doClassify(new TruthMapClassifier(classification, result.get()));
}
});
}

private <T> Task<T> clientRequestTask(final Request<T> request)
{
return new BaseTask<T>(request.getName())
{
private <T> Task<T> clientRequestTask(final Request<T> request) {
return new BaseTask<T>(request.getName()) {
@Override
protected Promise<? extends T> run(final Context context) throws Exception
{
protected Promise<? extends T> run(final Context context) throws Exception {
return _client.sendRequest(request);
}
};
}

private Task<?> connectedClassifyTask(final Task<Network> network)
{
return action("ConnectedClassifier", new Runnable()
{
private Task<?> connectedClassifyTask(final Task<Network> network) {
return action("ConnectedClassifier", new Runnable() {
@Override
public void run()
{
public void run() {
doClassify(new ConnectedClassifier(network.get()));
}
});
}

private Task<?> networkClassifyTask(final Task<Network> network)
{
return action("NetworkClassifier", new Runnable()
{
private Task<?> networkClassifyTask(final Task<Network> network) {
return action("NetworkClassifier", new Runnable() {
@Override
public void run()
{
public void run() {
doClassify(new NetworkClassifier(network.get()));
}
});
}

private void doClassify(final Classifier classifier)
{
for (Iterator<Long> it = _unclassified.iterator(); it.hasNext(); )
{
private void doClassify(final Classifier classifier) {
for (Iterator<Long> it = _unclassified.iterator(); it.hasNext();) {
final long vieweeId = it.next();
final Classification classification = classifier.classify(vieweeId);
if (classification != null)
{
if (classification != null) {
it.remove();
_classified.put(vieweeId, classification);
}
}

if (_unclassified.isEmpty())
{
if (_unclassified.isEmpty()) {
_result.done(_classified);
}
}
@@ -19,20 +19,16 @@
/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class ConnectedClassifier implements Classifier
{
public class ConnectedClassifier implements Classifier {
private final Network _subnet;

public ConnectedClassifier(final Network subnet)
{
public ConnectedClassifier(final Network subnet) {
_subnet = subnet;
}

@Override
public Classification classify(final long vieweeId)
{
return _subnet != null && Network.Distance.D1.equals(_subnet.getDistance(vieweeId))
? Classification.FULL_VISIBILITY
public Classification classify(final long vieweeId) {
return _subnet != null && Network.Distance.D1.equals(_subnet.getDistance(vieweeId)) ? Classification.FULL_VISIBILITY
: null;
}
}
@@ -19,18 +19,15 @@
/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class DefaultClassifier implements Classifier
{
public class DefaultClassifier implements Classifier {
private static DefaultClassifier INSTANCE = new DefaultClassifier();

public static DefaultClassifier instance()
{
public static DefaultClassifier instance() {
return INSTANCE;
}

@Override
public Classification classify(final long vieweeId)
{
public Classification classify(final long vieweeId) {
return Classification.NO_VISIBILITY;
}
}
@@ -19,10 +19,8 @@
/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class Network
{
public static enum Distance
{
public class Network {
public static enum Distance {
NOT_IN_NETWORK,
SELF,
D1,
@@ -32,19 +30,16 @@

private final long memberId;

public Network(long memberId)
{
public Network(long memberId) {
this.memberId = memberId;
}

public Distance getDistance(final long memberId)
{
public Distance getDistance(final long memberId) {
if (memberId == this.memberId)
return Distance.SELF;

final long group = memberId % 10;
switch ((int) group)
{
switch ((int) group) {
case 10:
return Distance.D3;
case 9:
@@ -19,22 +19,18 @@
/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class NetworkClassifier implements Classifier
{
public class NetworkClassifier implements Classifier {
private final Network _subnet;

public NetworkClassifier(final Network subnet)
{
public NetworkClassifier(final Network subnet) {
_subnet = subnet;
}

@Override
public Classification classify(final long vieweeId)
{
public Classification classify(final long vieweeId) {
final Network.Distance distance = _subnet.getDistance(vieweeId);

return Network.Distance.D2.equals(distance) || Network.Distance.D3.equals(distance)
? Classification.PARTIAL_VISIBILITY
: null;
? Classification.PARTIAL_VISIBILITY : null;
}
}
@@ -19,20 +19,15 @@
/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class SelfClassifier implements Classifier
{
public class SelfClassifier implements Classifier {
private final long viewerId;

public SelfClassifier(final long viewerId)
{
public SelfClassifier(final long viewerId) {
this.viewerId = viewerId;
}

@Override
public Classification classify(final long vieweeId)
{
return vieweeId == viewerId
? Classification.FULL_VISIBILITY
: null;
public Classification classify(final long vieweeId) {
return vieweeId == viewerId ? Classification.FULL_VISIBILITY : null;
}
}
@@ -18,26 +18,22 @@

import java.util.Map;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class TruthMapClassifier implements Classifier
{
public class TruthMapClassifier implements Classifier {
private final Classification _classification;
private final Map<Long, Boolean> _truthMap;

public TruthMapClassifier(Classification classification, Map<Long, Boolean> truthMap)
{
public TruthMapClassifier(Classification classification, Map<Long, Boolean> truthMap) {
_classification = classification;
_truthMap = truthMap;
}

@Override
public Classification classify(final long vieweeId)
{
public Classification classify(final long vieweeId) {
final Boolean b = _truthMap.get(vieweeId);
return b != null && b
? _classification
: null;
return b != null && b ? _classification : null;
}
}
@@ -18,10 +18,10 @@

import com.linkedin.parseq.promise.Promise;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public interface Client
{
public interface Client {
<T> Promise<T> sendRequest(final Request<T> request);
}
@@ -19,8 +19,7 @@
/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public interface Request<T>
{
public interface Request<T> {
T getResponse();

int getLatencyMean();
@@ -18,23 +18,21 @@

import com.linkedin.parseq.example.legacy.composite.classifier.client.Request;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public abstract class AbstractRequest<T> implements Request<T>
{
public abstract class AbstractRequest<T> implements Request<T> {
private static final int DEFAULT_LATENCY_MEAN = 100;
private static final int DEFAULT_LATENCY_STDDEV = 50;

@Override
public int getLatencyMean()
{
public int getLatencyMean() {
return DEFAULT_LATENCY_MEAN;
}

@Override
public int getLatencyStdDev()
{
public int getLatencyStdDev() {
return DEFAULT_LATENCY_STDDEV;
}
}
@@ -26,39 +26,32 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class ClientImpl implements Client
{
public class ClientImpl implements Client {
private static final int LATENCY_MIN = 10;

private final ScheduledExecutorService scheduler;
private final Random random = new Random();

public ClientImpl(final ScheduledExecutorService scheduler)
{
public ClientImpl(final ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
}

@Override
public <T> Promise<T> sendRequest(final Request<T> request)
{
public <T> Promise<T> sendRequest(final Request<T> request) {
final SettablePromise<T> promise = Promises.settable();
final int mean = request.getLatencyMean();
final int stddev = request.getLatencyStdDev();
final int latency = Math.max(LATENCY_MIN, (int)(random.nextGaussian() * stddev + mean));
scheduler.schedule(new Runnable()
{
final int latency = Math.max(LATENCY_MIN, (int) (random.nextGaussian() * stddev + mean));
scheduler.schedule(new Runnable() {
@Override
public void run()
{
try
{
public void run() {
try {
promise.done(request.getResponse());
}
catch (Exception e)
{
} catch (Exception e) {
promise.fail(e);
}
}
@@ -18,27 +18,24 @@

import com.linkedin.parseq.example.legacy.composite.classifier.Network;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class GetNetworkRequest extends AbstractRequest<Network>
{
public class GetNetworkRequest extends AbstractRequest<Network> {
private final long _memberId;

public GetNetworkRequest(final long memberId)
{
public GetNetworkRequest(final long memberId) {
_memberId = memberId;
}

@Override
public Network getResponse()
{
public Network getResponse() {
return new Network(_memberId);
}

@Override
public String getName()
{
public String getName() {
return "getNetwork";
}
}
@@ -20,38 +20,32 @@
import java.util.Map;
import java.util.Set;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class TruthMapRequest extends AbstractRequest<Map<Long, Boolean>>
{
public class TruthMapRequest extends AbstractRequest<Map<Long, Boolean>> {
private final String _name;
private final int _remainder;
private final Set<Long> _memberIds;

public TruthMapRequest(final String name,
final int remainder,
final Set<Long> memberIds)
{
public TruthMapRequest(final String name, final int remainder, final Set<Long> memberIds) {
_name = name;
_remainder = remainder;
_memberIds = memberIds;
}

@Override
public Map<Long, Boolean> getResponse()
{
public Map<Long, Boolean> getResponse() {
final Map<Long, Boolean> result = new HashMap<Long, Boolean>();
for (Long memberId : _memberIds)
{
for (Long memberId : _memberIds) {
result.put(memberId, memberId % 10 == _remainder);
}
return result;
}

@Override
public String getName()
{
public String getName() {
return _name;
}
}
@@ -18,30 +18,25 @@
import static com.linkedin.parseq.example.common.ExampleUtil.callService;
import static com.linkedin.parseq.example.common.ExampleUtil.printTracingResults;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class BranchExecutedExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class BranchExecutedExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new BranchExecutedExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<Integer> serviceX = getService();

final Task<Integer> fetchX = fetchX(serviceX, 24);
final Task<Integer> enlargeX = new BaseTask<Integer>("make x >= 42")
{
final Task<Integer> enlargeX = new BaseTask<Integer>("make x >= 42") {
@Override
protected Promise<? extends Integer> run(final Context context) throws Exception
{
protected Promise<? extends Integer> run(final Context context) throws Exception {
final int x = fetchX.get();
if (x < 42)
{
if (x < 42) {
final int toAdd = 42 - x;
final Task<Integer> addTo42 = add(x, toAdd);
context.run(addTo42);
@@ -61,21 +56,16 @@ protected Promise<? extends Integer> run(final Context context) throws Exception
printTracingResults(bigX);
}

private static Task<Integer> add(final int x, final int toAdd)
{
return callable("add " + toAdd, new Callable<Integer>()
{
private static Task<Integer> add(final int x, final int toAdd) {
return callable("add " + toAdd, new Callable<Integer>() {
@Override
public Integer call()
{
public Integer call() {
return x + toAdd;
}
});
}

private Task<Integer> fetchX(final MockService<Integer> serviceX,
final int x)
{
private Task<Integer> fetchX(final MockService<Integer> serviceX, final int x) {
return callService("fetch x (x := " + x + ")", serviceX, new SimpleMockRequest<Integer>(10, x));
}
}
@@ -18,29 +18,25 @@
import static com.linkedin.parseq.example.common.ExampleUtil.callService;
import static com.linkedin.parseq.example.common.ExampleUtil.printTracingResults;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class BranchSkippedExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class BranchSkippedExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new BranchSkippedExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<Integer> serviceX = getService();

final Task<Integer> fetchX = fetchX(serviceX, 420);
final Task<Integer> enlargeX = new BaseTask<Integer>("make x >= 42") {
@Override
protected Promise<Integer> run(final Context context) throws Exception
{
protected Promise<Integer> run(final Context context) throws Exception {
final int x = fetchX.get();
if (x < 42)
{
if (x < 42) {
final int toAdd = 42 - x;
final Task<Integer> addTo42 = add(x, toAdd);
context.run(addTo42);
@@ -60,21 +56,16 @@ protected Promise<Integer> run(final Context context) throws Exception
printTracingResults(bigX);
}

private static Task<Integer> add(final int x, final int toAdd)
{
return callable("add " + toAdd, new Callable<Integer>()
{
private static Task<Integer> add(final int x, final int toAdd) {
return callable("add " + toAdd, new Callable<Integer>() {
@Override
public Integer call() throws Exception
{
public Integer call() throws Exception {
return x + toAdd;
}
});
}

private Task<Integer> fetchX(final MockService<Integer> serviceX,
final int x)
{
private Task<Integer> fetchX(final MockService<Integer> serviceX, final int x) {
return callService("fetch x (x := " + x + ")", serviceX, new SimpleMockRequest<Integer>(10, x));
}
}
@@ -13,27 +13,23 @@
import static com.linkedin.parseq.Tasks.callable;
import static com.linkedin.parseq.example.common.ExampleUtil.fetch404Url;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class ErrorPropagationExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class ErrorPropagationExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new ErrorPropagationExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<String> httpClient = getService();

final Task<String> fetch = fetch404Url(httpClient, "http://www.google.com");
final Task<Integer> length = callable("length", new Callable<Integer>()
{
final Task<Integer> length = callable("length", new Callable<Integer>() {
@Override
public Integer call()
{
public Integer call() {
return fetch.get().length();
}
});
@@ -13,27 +13,23 @@
import static com.linkedin.parseq.Tasks.callable;
import static com.linkedin.parseq.example.common.ExampleUtil.fetch404Url;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class ErrorRecoveryExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class ErrorRecoveryExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new ErrorRecoveryExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<String> httpClient = getService();

final Task<String> fetch = fetch404Url(httpClient, "http://www.google.com");
final Task<Integer> length = callable("length", new Callable<Integer>()
{
final Task<Integer> length = callable("length", new Callable<Integer>() {
@Override
public Integer call()
{
public Integer call() {
return fetch.getOrDefault("").length();
}
});
@@ -10,30 +10,26 @@
import static com.linkedin.parseq.example.common.ExampleUtil.fetchUrl;
import static com.linkedin.parseq.example.common.ExampleUtil.printTracingResults;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class FanInExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class FanInExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new FanInExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<String> httpClient = getService();

final Task<String> fetchBing = fetchUrl(httpClient, "http://www.bing.com");
final Task<String> fetchYahoo = fetchUrl(httpClient, "http://www.yahoo.com");
final Task<String> fetchGoogle = fetchUrl(httpClient, "http://www.google.com");

final Task<?> printResults = action("printResults", new Runnable()
{
final Task<?> printResults = action("printResults", new Runnable() {
@Override
public void run()
{
public void run() {
System.out.println("Bing => " + fetchBing.get());
System.out.println("Yahoo => " + fetchYahoo.get());
System.out.println("Google => " + fetchGoogle.get());
@@ -10,19 +10,17 @@
import static com.linkedin.parseq.Tasks.par;
import static com.linkedin.parseq.example.common.ExampleUtil.fetchUrl;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class FanOutExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class FanOutExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new FanOutExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<String> httpClient = getService();
final Task<String> fetchBing = fetchUrl(httpClient, "http://www.bing.com");
final Task<String> fetchYahoo = fetchUrl(httpClient, "http://www.yahoo.com");
@@ -12,32 +12,28 @@
import static com.linkedin.parseq.Tasks.timeoutWithError;
import static com.linkedin.parseq.example.common.ExampleUtil.fetchUrl;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
*/
public class TimeoutWithErrorExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class TimeoutWithErrorExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new TimeoutWithErrorExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<String> httpClient = getService();

final Task<String> fetch = fetchUrl(httpClient, "http://www.google.com");
final Task<String> fetchWithTimeout =
timeoutWithError(50, TimeUnit.MILLISECONDS, fetch);
final Task<String> fetchWithTimeout = timeoutWithError(50, TimeUnit.MILLISECONDS, fetch);

engine.run(fetchWithTimeout);

fetchWithTimeout.await();

System.out.println(!fetchWithTimeout.isFailed()
? "Received result: " + fetchWithTimeout.get()
: "Error: " + fetchWithTimeout.getError());
System.out.println(!fetchWithTimeout.isFailed() ? "Received result: " + fetchWithTimeout.get()
: "Error: " + fetchWithTimeout.getError());

ExampleUtil.printTracingResults(fetchWithTimeout);
}
@@ -10,20 +10,18 @@
import com.linkedin.parseq.example.common.MockService;
import com.linkedin.parseq.example.common.SimpleMockRequest;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
* @author Jaroslaw Odzga (jodzga@linkedin.com)
*/
public class BranchExecutedExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class BranchExecutedExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new BranchExecutedExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<Integer> serviceX = getService();
final Task<Integer> fetchX = fetchX(serviceX, 24);

@@ -34,7 +32,7 @@ protected void doRunExample(final Engine engine) throws Exception
} else {
return fetchX;
}
});
} );

engine.run(bigX);

@@ -45,14 +43,11 @@ protected void doRunExample(final Engine engine) throws Exception
printTracingResults(bigX);
}

private static Task<Integer> add(final int x, final int toAdd)
{
private static Task<Integer> add(final int x, final int toAdd) {
return Task.callable("add " + toAdd, () -> x + toAdd);
}

private Task<Integer> fetchX(final MockService<Integer> serviceX,
final int x)
{
private Task<Integer> fetchX(final MockService<Integer> serviceX, final int x) {
return callService("fetch x (x := " + x + ")", serviceX, new SimpleMockRequest<Integer>(10, x));
}
}
@@ -10,20 +10,18 @@
import com.linkedin.parseq.example.common.MockService;
import com.linkedin.parseq.example.common.SimpleMockRequest;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
* @author Jaroslaw Odzga (jodzga@linkedin.com)
*/
public class BranchSkippedExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class BranchSkippedExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new BranchSkippedExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<Integer> serviceX = getService();
final Task<Integer> fetchX = fetchX(serviceX, 420);

@@ -34,7 +32,7 @@ protected void doRunExample(final Engine engine) throws Exception
} else {
return fetchX;
}
});
} );

engine.run(bigX);

@@ -45,14 +43,11 @@ protected void doRunExample(final Engine engine) throws Exception
printTracingResults(bigX);
}

private static Task<Integer> add(final int x, final int toAdd)
{
private static Task<Integer> add(final int x, final int toAdd) {
return Task.callable("add " + toAdd, () -> x + toAdd);
}

private Task<Integer> fetchX(final MockService<Integer> serviceX,
final int x)
{
private Task<Integer> fetchX(final MockService<Integer> serviceX, final int x) {
return callService("fetch x (x := " + x + ")", serviceX, new SimpleMockRequest<Integer>(10, x));
}
}
@@ -11,37 +11,30 @@
import com.linkedin.parseq.example.common.ExampleUtil;
import com.linkedin.parseq.example.common.MockService;


/**
* @author Jaroslaw Odzga (jodzga@linkedin.com)
*/
public class CalcellationExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class CalcellationExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new CalcellationExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<String> httpClient = getService();


final Task<Integer> fetchAndLength =
fetchUrl(httpClient, "http://www.google.com", 10000)
.withTimeout(5000, TimeUnit.MILLISECONDS)
.recover("default", t -> "")
.map("length", s -> s.length())
.andThen("big bang", x -> System.exit(1));
fetchUrl(httpClient, "http://www.google.com", 10000).withTimeout(5000, TimeUnit.MILLISECONDS)
.recover("default", t -> "").map("length", s -> s.length()).andThen("big bang", x -> System.exit(1));

engine.run(fetchAndLength);
Thread.sleep(20);
fetchAndLength.cancel(new Exception("because I said so"));

fetchAndLength.await();

System.out.println(!fetchAndLength.isFailed()
? "Received result: " + fetchAndLength.get()
System.out.println(!fetchAndLength.isFailed() ? "Received result: " + fetchAndLength.get()
: "Error: " + fetchAndLength.getError());

ExampleUtil.printTracingResults(fetchAndLength);
@@ -11,27 +11,21 @@
import com.linkedin.parseq.example.common.ExampleUtil;
import com.linkedin.parseq.example.common.MockService;


/**
* @author Jaroslaw Odzga (jodzga@linkedin.com)
*/
public class DegradedExperienceExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class DegradedExperienceExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new DegradedExperienceExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<String> httpClient = getService();


final Task<Integer> fetchAndLength =
fetchUrl(httpClient, "http://www.google.com", 100)
.withTimeout(50, TimeUnit.MILLISECONDS)
.recover("default", t -> "")
.map("length", s -> s.length());
final Task<Integer> fetchAndLength = fetchUrl(httpClient, "http://www.google.com", 100)
.withTimeout(50, TimeUnit.MILLISECONDS).recover("default", t -> "").map("length", s -> s.length());

engine.run(fetchAndLength);

@@ -9,25 +9,22 @@
import com.linkedin.parseq.example.common.ExampleUtil;
import com.linkedin.parseq.example.common.MockService;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
* @author Jaroslaw Odzga (jodzga@linkedin.com)
*/
public class ErrorPropagationExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class ErrorPropagationExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new ErrorPropagationExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<String> httpClient = getService();

final Task<Integer> fetchAndLength =
fetch404Url(httpClient, "http://www.google.com")
.map("length", x -> x.length());
fetch404Url(httpClient, "http://www.google.com").map("length", x -> x.length());

engine.run(fetchAndLength);

@@ -9,26 +9,22 @@
import com.linkedin.parseq.example.common.ExampleUtil;
import com.linkedin.parseq.example.common.MockService;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
* @author Jaroslaw Odzga (jodzga@linkedin.com)
*/
public class ErrorRecoveryExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class ErrorRecoveryExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new ErrorRecoveryExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<String> httpClient = getService();

final Task<Integer> fetchAndLength =
fetch404Url(httpClient, "http://www.google.com")
.recover("default", t -> "")
.map("length", s -> s.length());
fetch404Url(httpClient, "http://www.google.com").recover("default", t -> "").map("length", s -> s.length());

engine.run(fetchAndLength);

@@ -9,32 +9,29 @@
import com.linkedin.parseq.example.common.AbstractExample;
import com.linkedin.parseq.example.common.MockService;


/**
* @author Chris Pettitt (cpettitt@linkedin.com)
* @author Jaroslaw Odzga (jodzga@linkedin.com)
*/
public class FanInExample extends AbstractExample
{
public static void main(String[] args) throws Exception
{
public class FanInExample extends AbstractExample {
public static void main(String[] args) throws Exception {
new FanInExample().runExample();
}

@Override
protected void doRunExample(final Engine engine) throws Exception
{
protected void doRunExample(final Engine engine) throws Exception {
final MockService<String> httpClient = getService();

final Task<String> fetchBing = fetchUrl(httpClient, "http://www.bing.com");
final Task<String> fetchYahoo = fetchUrl(httpClient, "http://www.yahoo.com");
final Task<String> fetchGoogle = fetchUrl(httpClient, "http://www.google.com");

final Task<?> fanIn = Task.par(fetchBing, fetchGoogle, fetchYahoo)
.andThen((bing, google, yahoo) -> {
System.out.println("Bing => " + bing);
System.out.println("Yahoo => " + yahoo);
System.out.println("Google => " + google);
});
final Task<?> fanIn = Task.par(fetchBing, fetchGoogle, fetchYahoo).andThen((bing, google, yahoo) -> {
System.out.println("Bing => " + bing);
System.out.println("Yahoo => " + yahoo);
System.out.println("Google => " + google);
} );
engine.run(fanIn);

fanIn.await();