Skip to content

Commit

Permalink
Merge 5263e39 into 37634e0
Browse files Browse the repository at this point in the history
  • Loading branch information
maxim86 committed Feb 11, 2021
2 parents 37634e0 + 5263e39 commit 38652b6
Show file tree
Hide file tree
Showing 16 changed files with 873 additions and 148 deletions.
2 changes: 1 addition & 1 deletion .mvn/wrapper/maven-wrapper.properties
@@ -1 +1 @@
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.3/apache-maven-3.5.3-bin.zip
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.6.3/apache-maven-3.6.3-bin.zip
8 changes: 8 additions & 0 deletions pom.xml
Expand Up @@ -71,6 +71,7 @@

<neo4j-java-driver.version>4.2.0</neo4j-java-driver.version>
<testcontainers.version>1.15.1</testcontainers.version>
<reactor.version>3.4.2</reactor.version>
<awaitility.version>4.0.0</awaitility.version>
<junit.version>4.13.1</junit.version>
<slf4j.version>1.7.30</slf4j.version>
Expand Down Expand Up @@ -139,6 +140,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>${reactor.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
80 changes: 43 additions & 37 deletions src/main/java/io/opentracing/contrib/neo4j/TracingAsyncSession.java
Expand Up @@ -13,25 +13,20 @@
*/
package io.opentracing.contrib.neo4j;

import static io.opentracing.contrib.neo4j.TracingHelper.decorate;
import static io.opentracing.contrib.neo4j.TracingHelper.mapToString;
import static io.opentracing.contrib.neo4j.TracingHelper.onError;

import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.*;
import org.neo4j.driver.async.AsyncSession;
import org.neo4j.driver.async.AsyncTransaction;
import org.neo4j.driver.async.AsyncTransactionWork;
import org.neo4j.driver.async.ResultCursor;

import java.util.Map;
import java.util.concurrent.CompletionStage;

import static io.opentracing.contrib.neo4j.TracingHelper.*;

public class TracingAsyncSession implements AsyncSession {

private final AsyncSession session;
Expand All @@ -44,19 +39,20 @@ public TracingAsyncSession(AsyncSession session, Tracer tracer) {

@Override
public CompletionStage<AsyncTransaction> beginTransactionAsync() {
// TODO
return session.beginTransactionAsync();
Span span = TracingHelper.build("transactionAsync", tracer);
CompletionStage<AsyncTransaction> transactionAsync = session.beginTransactionAsync();
return transactionAsync.thenApply(tr -> new TracingAsyncTransaction(tr, span, tracer));
}

@Override
public CompletionStage<AsyncTransaction> beginTransactionAsync(TransactionConfig config) {
// TODO
return session.beginTransactionAsync(config);
Span span = TracingHelper.build("transactionAsync", tracer);
CompletionStage<AsyncTransaction> transactionAsync = session.beginTransactionAsync(config);
return transactionAsync.thenApply(tr -> new TracingAsyncTransaction(tr, span, tracer));
}

@Override
public <T> CompletionStage<T> readTransactionAsync(
AsyncTransactionWork<CompletionStage<T>> work) {
public <T> CompletionStage<T> readTransactionAsync(AsyncTransactionWork<CompletionStage<T>> work) {
Span span = TracingHelper.build("readTransactionAsync", tracer);
try {
return session.readTransactionAsync(new TracingAsyncTransactionWork<>(work, span, tracer))
Expand All @@ -74,8 +70,7 @@ public <T> CompletionStage<T> readTransactionAsync(
}

@Override
public <T> CompletionStage<T> readTransactionAsync(
AsyncTransactionWork<CompletionStage<T>> work, TransactionConfig config) {
public <T> CompletionStage<T> readTransactionAsync(AsyncTransactionWork<CompletionStage<T>> work, TransactionConfig config) {
Span span = TracingHelper.build("readTransactionAsync", tracer);
span.setTag("config", config.toString());
try {
Expand Down Expand Up @@ -114,8 +109,7 @@ public <T> CompletionStage<T> writeTransactionAsync(
}

@Override
public <T> CompletionStage<T> writeTransactionAsync(
AsyncTransactionWork<CompletionStage<T>> work, TransactionConfig config) {
public <T> CompletionStage<T> writeTransactionAsync(AsyncTransactionWork<CompletionStage<T>> work, TransactionConfig config) {
Span span = TracingHelper.build("writeTransactionAsync", tracer);
span.setTag("config", config.toString());
try {
Expand All @@ -135,28 +129,32 @@ public <T> CompletionStage<T> writeTransactionAsync(
}

@Override
public CompletionStage<ResultCursor> runAsync(
String query, TransactionConfig config) {
public CompletionStage<ResultCursor> runAsync(String query, TransactionConfig config) {
Span span = TracingHelper.build("runAsync", tracer);
span.setTag(Tags.DB_STATEMENT.getKey(), query);
span.setTag("config", config.toString());
return decorate(session.runAsync(query, config), span);
}

@Override
public CompletionStage<ResultCursor> runAsync(
String query, Map<String, Object> parameters, TransactionConfig config) {
public CompletionStage<ResultCursor> runAsync(String query, Map<String, Object> parameters, TransactionConfig config) {
Span span = TracingHelper.build("runAsync", tracer);
span.setTag(Tags.DB_STATEMENT.getKey(), query);
span.setTag("parameters", mapToString(parameters));
if (isNotEmpty(parameters)) {
span.setTag("parameters", mapToString(parameters));
}
span.setTag("config", config.toString());
return decorate(session.runAsync(query, parameters, config), span);
}

@Override
public CompletionStage<ResultCursor> runAsync(Query query, TransactionConfig config) {
Span span = TracingHelper.build("runAsync", tracer);
span.setTag(Tags.DB_STATEMENT.getKey(), query.toString());
span.setTag(Tags.DB_STATEMENT.getKey(), query.text());
Map<String, Object> parameters = query.parameters().asMap();
if (isNotEmpty(parameters)) {
span.setTag("parameters", mapToString(parameters));
}
span.setTag("config", config.toString());
return decorate(session.runAsync(query, config), span);
}
Expand All @@ -172,32 +170,36 @@ public CompletionStage<Void> closeAsync() {
}

@Override
public CompletionStage<ResultCursor> runAsync(String query, Value parameters) {
public CompletionStage<ResultCursor> runAsync(String query, Value parametersValue) {
Span span = TracingHelper.build("run", tracer);
span.setTag(Tags.DB_STATEMENT.getKey(), query);
if (parameters != null) {
span.setTag("parameters", parameters.toString());
Map<String, Object> parameters = parametersValue.asMap();
if (isNotEmpty(parameters)) {
span.setTag("parameters", mapToString(parameters));
}
return decorate(session.runAsync(query, parameters), span);
return decorate(session.runAsync(query, parametersValue), span);
}

@Override
public CompletionStage<ResultCursor> runAsync(
String query, Map<String, Object> parameters) {
Span span = TracingHelper.build("run", tracer);
span.setTag(Tags.DB_STATEMENT.getKey(), query);
span.setTag("parameters", mapToString(parameters));
if (isNotEmpty(parameters)) {
span.setTag("parameters", mapToString(parameters));
}
return decorate(session.runAsync(query, parameters), span);
}

@Override
public CompletionStage<ResultCursor> runAsync(String query, Record parameters) {
public CompletionStage<ResultCursor> runAsync(String query, Record parametersRecord) {
Span span = TracingHelper.build("runAsync", tracer);
span.setTag(Tags.DB_STATEMENT.getKey(), query);
if (parameters != null) {
span.setTag("parameters", mapToString(parameters.asMap()));
Map<String, Object> parameters = parametersRecord.asMap();
if (isNotEmpty(parameters)) {
span.setTag("parameters", mapToString(parameters));
}
return decorate(session.runAsync(query, parameters), span);
return decorate(session.runAsync(query, parametersRecord), span);
}

@Override
Expand All @@ -210,7 +212,11 @@ public CompletionStage<ResultCursor> runAsync(String query) {
@Override
public CompletionStage<ResultCursor> runAsync(Query query) {
Span span = TracingHelper.build("run", tracer);
span.setTag(Tags.DB_STATEMENT.getKey(), query.toString());
span.setTag(Tags.DB_STATEMENT.getKey(), query.text());
Map<String, Object> parameters = query.parameters().asMap();
if (isNotEmpty(parameters)) {
span.setTag("parameters", mapToString(parameters));
}
return decorate(session.runAsync(query), span);
}
}
Expand Up @@ -13,10 +13,6 @@
*/
package io.opentracing.contrib.neo4j;

import static io.opentracing.contrib.neo4j.TracingHelper.decorate;
import static io.opentracing.contrib.neo4j.TracingHelper.mapToString;
import static io.opentracing.contrib.neo4j.TracingHelper.onError;

import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
Expand All @@ -28,26 +24,18 @@
import org.neo4j.driver.async.AsyncTransaction;
import org.neo4j.driver.async.ResultCursor;

import static io.opentracing.contrib.neo4j.TracingHelper.*;

public class TracingAsyncTransaction implements AsyncTransaction {

private final AsyncTransaction transaction;
private final Span parent;
private final Tracer tracer;
private final boolean finishSpan;

public TracingAsyncTransaction(
AsyncTransaction transaction, Span parent,
Tracer tracer) {
this(transaction, parent, tracer, false);
}

public TracingAsyncTransaction(
AsyncTransaction transaction, Span parent, Tracer tracer,
boolean finishSpan) {
public TracingAsyncTransaction(AsyncTransaction transaction, Span parent, Tracer tracer) {
this.transaction = transaction;
this.tracer = tracer;
this.parent = parent;
this.finishSpan = finishSpan;
}

@Override
Expand All @@ -71,33 +59,35 @@ public CompletionStage<Void> rollbackAsync() {
}

@Override
public CompletionStage<ResultCursor> runAsync(String query, Value parameters) {
public CompletionStage<ResultCursor> runAsync(String query, Value parametersValue) {
Span span = TracingHelper.build("runAsync", parent, tracer);
span.setTag(Tags.DB_STATEMENT.getKey(), query);
span.setTag("parameters", parameters.toString());
return decorate(transaction.runAsync(query, parameters), span);
Map<String, Object> parameters = parametersValue.asMap();
if (isNotEmpty(parameters)) {
span.setTag("parameters", mapToString(parameters));
}
return decorate(transaction.runAsync(query, parametersValue), span);
}

@Override
public CompletionStage<ResultCursor> runAsync(
String query, Map<String, Object> parameters) {
public CompletionStage<ResultCursor> runAsync(String query, Map<String, Object> parameters) {
Span span = TracingHelper.build("runAsync", parent, tracer);
span.setTag(Tags.DB_STATEMENT.getKey(), query);
if (parameters != null) {
if (isNotEmpty(parameters)) {
span.setTag("parameters", mapToString(parameters));
}
return decorate(transaction.runAsync(query, parameters), span);
}

@Override
public CompletionStage<ResultCursor> runAsync(String query, Record parameters) {
public CompletionStage<ResultCursor> runAsync(String query, Record parametersRecord) {
Span span = TracingHelper.build("runAsync", parent, tracer);
span.setTag(Tags.DB_STATEMENT.getKey(), query);
if (parameters != null) {
span.setTag("parameters", TracingHelper.mapToString(parameters.asMap()));
Map<String, Object> parameters = parametersRecord.asMap();
if (isNotEmpty(parameters)) {
span.setTag("parameters", mapToString(parameters));
}

return decorate(transaction.runAsync(query, parameters), span);
return decorate(transaction.runAsync(query, parametersRecord), span);
}

@Override
Expand All @@ -110,7 +100,11 @@ public CompletionStage<ResultCursor> runAsync(String query) {
@Override
public CompletionStage<ResultCursor> runAsync(Query query) {
Span span = TracingHelper.build("runAsync", parent, tracer);
span.setTag(Tags.DB_STATEMENT.getKey(), query.toString());
span.setTag(Tags.DB_STATEMENT.getKey(), query.text());
Map<String, Object> parameters = query.parameters().asMap();
if (isNotEmpty(parameters)) {
span.setTag("parameters", mapToString(parameters));
}
return decorate(transaction.runAsync(query), span);
}
}
9 changes: 4 additions & 5 deletions src/main/java/io/opentracing/contrib/neo4j/TracingDriver.java
Expand Up @@ -14,7 +14,6 @@
package io.opentracing.contrib.neo4j;

import io.opentracing.Tracer;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Metrics;
import org.neo4j.driver.Session;
Expand All @@ -23,6 +22,8 @@
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.types.TypeSystem;

import java.util.concurrent.CompletionStage;

public class TracingDriver implements Driver {

private final Driver driver;
Expand Down Expand Up @@ -50,14 +51,12 @@ public Session session(SessionConfig config) {

@Override
public RxSession rxSession() {
// TODO Missing Tracing Implementation
return driver.rxSession();
return new TracingRxSession(driver.rxSession(), tracer);
}

@Override
public RxSession rxSession(SessionConfig sessionConfig) {
// TODO Missing Tracing Implementation
return driver.rxSession(sessionConfig);
return new TracingRxSession(driver.rxSession(sessionConfig), tracer);
}

@Override
Expand Down
24 changes: 14 additions & 10 deletions src/main/java/io/opentracing/contrib/neo4j/TracingHelper.java
Expand Up @@ -18,12 +18,14 @@
import io.opentracing.Tracer;
import io.opentracing.Tracer.SpanBuilder;
import io.opentracing.tag.Tags;
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.reactive.RxResult;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.neo4j.driver.async.ResultCursor;

class TracingHelper {

Expand All @@ -36,9 +38,9 @@ static Span build(String operationName, Tracer tracer) {

static Span build(String operationName, Span parent, Tracer tracer) {
SpanBuilder builder = tracer.buildSpan(operationName)
.withTag(Tags.COMPONENT.getKey(), COMPONENT_NAME)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.withTag(Tags.DB_TYPE.getKey(), DB_TYPE);
.withTag(Tags.COMPONENT.getKey(), COMPONENT_NAME)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.withTag(Tags.DB_TYPE.getKey(), DB_TYPE);
if (parent != null) {
builder.asChildOf(parent);
}
Expand All @@ -56,19 +58,21 @@ static void onError(Throwable throwable, Span span) {
}
}

static boolean isNotEmpty(Map<?, ?> map) {
return map != null && !map.isEmpty();
}

static String mapToString(Map<String, Object> map) {
if (map == null) {
return "";
}
return map.entrySet()
.stream()
.map(entry -> entry.getKey() + " -> " + entry.getValue())
.collect(Collectors.joining(", "));
.stream()
.map(entry -> entry.getKey() + " -> " + entry.getValue())
.collect(Collectors.joining(", "));
}

static CompletionStage<ResultCursor> decorate(
CompletionStage<ResultCursor> stage,
Span span) {
static CompletionStage<ResultCursor> decorate(CompletionStage<ResultCursor> stage, Span span) {
return stage.whenComplete((statementResultCursor, throwable) -> {
if (throwable != null) {
onError(throwable, span);
Expand Down

0 comments on commit 38652b6

Please sign in to comment.