Skip to content
This repository has been archived by the owner on May 23, 2023. It is now read-only.

Commit

Permalink
Add Examples for async use cases (#197)
Browse files Browse the repository at this point in the history
* Add examples for async as test cases

This includes execution flow similar to:
* Actor ask/tell
* Promises with callbacks
* Work interleaved on a thread using suspend/resume.

The implementations of these execution models are obviously very simplistic, but intended to emphasize the tracing aspect.
  • Loading branch information
tylerbenson authored and carlosalberto committed Nov 9, 2017
1 parent cdc9601 commit 818ae53
Show file tree
Hide file tree
Showing 9 changed files with 627 additions and 8 deletions.
4 changes: 3 additions & 1 deletion opentracing-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ List of patterns:
- **activate_deactivate** - actions are executed by scheduler.
It shows continuation as a solution to finish span when last action is completed.
- **active_span_replacement** - start an isolated task and query for its result in another task/thread
- **actor_propagation** - tracing for blocking and non-blocking actor based tracing
- **client_server** - typical client-server example
- **common_request_handler** - one request handler for all requests
- **late_span_finish** - late parent span finish
- **listener_per_request** - one listener per request
- **multiple_callbacks** - many callbacks spawned at the same time
- **nested_callbacks** - one callback at the time, defined in a pipeline fashion

- **promise_propagation** - tracing patterns for promises with callbacks
- **suspend_resume_propagation** - tracing pattern for interleaving of spans
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.opentracing.mock.MockTracer;
import io.opentracing.tag.AbstractTag;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
Expand All @@ -37,20 +38,29 @@ public Integer call() throws Exception {
};
}

public static MockSpan getOneByTag(List<MockSpan> spans, AbstractTag key, Object value) {
MockSpan found = null;
public static List<MockSpan> getByTag(List<MockSpan> spans, AbstractTag key, Object value) {
List<MockSpan> found = new ArrayList<>(spans.size());
for (MockSpan span : spans) {
if (span.tags().get(key.getKey()).equals(value)) {
if (found != null) {
throw new IllegalArgumentException("there is more than one span with tag '"
+ key.getKey() + "' and value '" + value + "'");
}
found = span;
found.add(span);
}
}
return found;
}

public static MockSpan getOneByTag(List<MockSpan> spans, AbstractTag key, Object value) {
List<MockSpan> found = getByTag(spans, key, value);
if (found.size() > 1) {
throw new IllegalArgumentException("there is more than one span with tag '"
+ key.getKey() + "' and value '" + value + "'");
}
if (found.isEmpty()) {
return null;
} else {
return found.get(0);
}
}

public static void sleep() {
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(2000));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2016-2017 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.opentracing.examples.actor_propagation;

import io.opentracing.References;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.mock.MockTracer;
import io.opentracing.tag.Tags;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;

/** @author tylerbenson */
public class Actor implements AutoCloseable {
private final ExecutorService executor;
private final MockTracer tracer;
private final Phaser phaser;

public Actor(MockTracer tracer, Phaser phaser) {
// Passed along here for testing. Normally should be referenced via GlobalTracer.get().
this.tracer = tracer;

this.phaser = phaser;
executor = Executors.newFixedThreadPool(2);
}

@Override
public void close() {
executor.shutdown();
}

public void tell(final String message) {
final Span parent = tracer.scopeManager().active().span();
phaser.register();
executor.submit(
new Runnable() {
@Override
public void run() {
try (Scope child =
tracer
.buildSpan("received")
.addReference(References.FOLLOWS_FROM, parent.context())
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
.startActive()) {
phaser.arriveAndAwaitAdvance(); // child tracer started
child.span().log("received " + message);
phaser.arriveAndAwaitAdvance(); // assert size
}
phaser.arriveAndAwaitAdvance(); // child tracer finished
phaser.arriveAndAwaitAdvance(); // assert size
}
});
}

public Future<String> ask(final String message) {
final Span parent = tracer.scopeManager().active().span();
phaser.register();
Future<String> future =
executor.submit(
new Callable<String>() {
@Override
public String call() throws Exception {
try (Scope child =
tracer
.buildSpan("received")
.addReference(References.FOLLOWS_FROM, parent.context())
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
.startActive()) {
phaser.arriveAndAwaitAdvance(); // child tracer started
phaser.arriveAndAwaitAdvance(); // assert size
return "received " + message;
} finally {
phaser.arriveAndAwaitAdvance(); // child tracer finished
phaser.arriveAndAwaitAdvance(); // assert size
}
}
});
return future;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2016-2017 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.opentracing.examples.actor_propagation;

import io.opentracing.Scope;
import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;
import io.opentracing.mock.MockTracer.Propagator;
import io.opentracing.tag.Tags;
import io.opentracing.util.ThreadLocalScopeManager;
import org.junit.Before;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;

import static io.opentracing.examples.TestUtils.getByTag;
import static io.opentracing.examples.TestUtils.getOneByTag;
import static org.assertj.core.api.Assertions.assertThat;

/**
* These tests are intended to simulate the kind of async models that are common in java async
* frameworks.
*
* For improved readability, ignore the phaser lines as those are there to ensure deterministic
* execution for the tests without sleeps.
*
* @author tylerbenson
*/
public class ActorPropagationTest {

private final MockTracer tracer =
new MockTracer(new ThreadLocalScopeManager(), Propagator.TEXT_MAP);
private Phaser phaser;

@Before
public void before() {
phaser = new Phaser();
}

@Test
public void testActorTell() {
try (Actor actor = new Actor(tracer, phaser)) {
phaser.register();
try (Scope parent =
tracer
.buildSpan("actorTell")
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_PRODUCER)
.withTag(Tags.COMPONENT.getKey(), "example-actor")
.startActive()) {
actor.tell("my message 1");
actor.tell("my message 2");
}
phaser.arriveAndAwaitAdvance(); // child tracer started
assertThat(tracer.finishedSpans().size()).isEqualTo(1); // Parent should be reported
phaser.arriveAndAwaitAdvance(); // continue...

phaser.arriveAndAwaitAdvance(); // child tracer finished
assertThat(tracer.finishedSpans().size()).isEqualTo(3);
assertThat(getByTag(tracer.finishedSpans(), Tags.SPAN_KIND, Tags.SPAN_KIND_CONSUMER))
.hasSize(2);
phaser.arriveAndDeregister(); // continue...

List<MockSpan> finished = tracer.finishedSpans();

assertThat(finished.size()).isEqualTo(3);
assertThat(finished.get(0).context().traceId())
.isEqualTo(finished.get(1).context().traceId());
assertThat(getByTag(finished, Tags.SPAN_KIND, Tags.SPAN_KIND_CONSUMER)).hasSize(2);
assertThat(getOneByTag(finished, Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER)).isNotNull();
assertThat(tracer.scopeManager().active()).isNull();
}
}

@Test
public void testActorAsk() throws ExecutionException, InterruptedException {
try (Actor actor = new Actor(tracer, phaser)) {
phaser.register();
Future<String> future1;
Future<String> future2;
try (Scope parent =
tracer
.buildSpan("actorAsk")
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_PRODUCER)
.withTag(Tags.COMPONENT.getKey(), "example-actor")
.startActive()) {
future1 = actor.ask("my message 1");
future2 = actor.ask("my message 2");
}
phaser.arriveAndAwaitAdvance(); // child tracer started
assertThat(tracer.finishedSpans().size()).isEqualTo(1);
phaser.arriveAndAwaitAdvance(); // continue...

phaser.arriveAndAwaitAdvance(); // child tracer finished
assertThat(tracer.finishedSpans().size()).isEqualTo(3);
assertThat(getByTag(tracer.finishedSpans(), Tags.SPAN_KIND, Tags.SPAN_KIND_CONSUMER))
.hasSize(2);
phaser.arriveAndDeregister(); // continue...

List<MockSpan> finished = tracer.finishedSpans();

String message1 = future1.get(); // This really should be a non-blocking callback...
String message2 = future2.get(); // This really should be a non-blocking callback...
assertThat(message1).isEqualTo("received my message 1");
assertThat(message2).isEqualTo("received my message 2");
assertThat(finished.size()).isEqualTo(3);
assertThat(finished.get(0).context().traceId())
.isEqualTo(finished.get(1).context().traceId());
assertThat(getByTag(finished, Tags.SPAN_KIND, Tags.SPAN_KIND_CONSUMER)).hasSize(2);
assertThat(getOneByTag(finished, Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER)).isNotNull();
assertThat(tracer.scopeManager().active()).isNull();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2016-2017 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.opentracing.examples.promise_propagation;

import io.opentracing.References;
import io.opentracing.Scope;
import io.opentracing.mock.MockTracer;
import io.opentracing.tag.Tags;
import java.util.Collection;
import java.util.LinkedList;

/** @author tylerbenson */
public class Promise<T> {
private final PromiseContext context;
private final MockTracer tracer;
private final Scope parentScope;

private final Collection<SuccessCallback<T>> successCallbacks = new LinkedList<>();
private final Collection<ErrorCallback> errorCallbacks = new LinkedList<>();

public Promise(PromiseContext context, MockTracer tracer) {
this.context = context;

// Passed along here for testing. Normally should be referenced via GlobalTracer.get().
this.tracer = tracer;
parentScope = tracer.scopeManager().active();
}

public void onSuccess(SuccessCallback<T> successCallback) {
successCallbacks.add(successCallback);
}

public void onError(ErrorCallback errorCallback) {
errorCallbacks.add(errorCallback);
}

public void success(final T result) {
for (final SuccessCallback<T> callback : successCallbacks) {
context.submit(
new Runnable() {
@Override
public void run() {
try (Scope child =
tracer
.buildSpan("success")
.addReference(References.FOLLOWS_FROM, parentScope.span().context())
.withTag(Tags.COMPONENT.getKey(), "success")
.startActive()) {
callback.accept(result);
}
context.getPhaser().arriveAndAwaitAdvance(); // trace reported
}
});
}
}

public void error(final Throwable error) {
for (final ErrorCallback callback : errorCallbacks) {
context.submit(
new Runnable() {
@Override
public void run() {
try (Scope child =
tracer
.buildSpan("error")
.addReference(References.FOLLOWS_FROM, parentScope.span().context())
.withTag(Tags.COMPONENT.getKey(), "error")
.startActive()) {
callback.accept(error);
}
context.getPhaser().arriveAndAwaitAdvance(); // trace reported
}
});
}
}

public interface SuccessCallback<T> {
/** @param t the result of the promise */
void accept(T t);
}

public interface ErrorCallback {
/** @param t the error result of the promise */
void accept(Throwable t);
}
}
Loading

0 comments on commit 818ae53

Please sign in to comment.