From f722c2a8f80f70aa6acf8ac7ca021b83b952b592 Mon Sep 17 00:00:00 2001 From: Sergei Malafeev Date: Wed, 17 Apr 2019 13:03:03 +0800 Subject: [PATCH 1/2] #9 Add Elasticsearch 7 support Signed-off-by: Sergei Malafeev --- opentracing-elasticsearch7-client/pom.xml | 50 +++++ .../TracingPreBuiltTransportClient.java | 93 ++++++++ .../TracingResponseListener.java | 60 +++++ .../contrib/elasticsearch7/TracingTest.java | 210 ++++++++++++++++++ pom.xml | 6 +- 5 files changed, 418 insertions(+), 1 deletion(-) create mode 100644 opentracing-elasticsearch7-client/pom.xml create mode 100644 opentracing-elasticsearch7-client/src/main/java/io/opentracing/contrib/elasticsearch7/TracingPreBuiltTransportClient.java create mode 100644 opentracing-elasticsearch7-client/src/main/java/io/opentracing/contrib/elasticsearch7/TracingResponseListener.java create mode 100644 opentracing-elasticsearch7-client/src/test/java/io/opentracing/contrib/elasticsearch7/TracingTest.java diff --git a/opentracing-elasticsearch7-client/pom.xml b/opentracing-elasticsearch7-client/pom.xml new file mode 100644 index 0000000..36f383e --- /dev/null +++ b/opentracing-elasticsearch7-client/pom.xml @@ -0,0 +1,50 @@ + + + + + opentracing-elasticsearch-client-parent + io.opentracing.contrib + 0.1.1-SNAPSHOT + + 4.0.0 + + opentracing-elasticsearch7-client + OpenTracing Instrumentation for Elasticsearch 7 Client + OpenTracing Instrumentation for Elasticsearch 7 Client + + + + io.opentracing.contrib + opentracing-elasticsearch-client-common + 0.1.1-SNAPSHOT + + + org.elasticsearch.client + transport + + + + + org.elasticsearch.client + transport + ${elasticsearch7.version} + + + + \ No newline at end of file diff --git a/opentracing-elasticsearch7-client/src/main/java/io/opentracing/contrib/elasticsearch7/TracingPreBuiltTransportClient.java b/opentracing-elasticsearch7-client/src/main/java/io/opentracing/contrib/elasticsearch7/TracingPreBuiltTransportClient.java new file mode 100644 index 0000000..8d170e2 --- /dev/null +++ b/opentracing-elasticsearch7-client/src/main/java/io/opentracing/contrib/elasticsearch7/TracingPreBuiltTransportClient.java @@ -0,0 +1,93 @@ +/* + * Copyright 2017-2019 The OpenTracing Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.opentracing.contrib.elasticsearch7; + +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.contrib.elasticsearch.common.SpanDecorator; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; +import java.util.Collection; +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.client.PreBuiltTransportClient; + + +public class TracingPreBuiltTransportClient extends PreBuiltTransportClient { + + private final Tracer tracer; + + @SafeVarargs + public TracingPreBuiltTransportClient(Tracer tracer, Settings settings, + Class... plugins) { + super(settings, plugins); + this.tracer = tracer; + } + + /** + * GlobalTracer is used to get tracer + */ + @SafeVarargs + public TracingPreBuiltTransportClient(Settings settings, + Class... plugins) { + this(GlobalTracer.get(), settings, plugins); + } + + public TracingPreBuiltTransportClient(Tracer tracer, Settings settings, + Collection> plugins) { + super(settings, plugins); + this.tracer = tracer; + } + + /** + * GlobalTracer is used to get tracer + */ + public TracingPreBuiltTransportClient(Settings settings, + Collection> plugins) { + this(GlobalTracer.get(), settings, plugins); + } + + public TracingPreBuiltTransportClient(Tracer tracer, Settings settings, + Collection> plugins, + HostFailureListener hostFailureListener) { + super(settings, plugins, hostFailureListener); + this.tracer = tracer; + } + + /** + * GlobalTracer is used to get tracer + */ + public TracingPreBuiltTransportClient(Settings settings, + Collection> plugins, + HostFailureListener hostFailureListener) { + this(GlobalTracer.get(), settings, plugins, hostFailureListener); + } + + @Override + protected + void doExecute(Action action, Request request, ActionListener listener) { + Tracer.SpanBuilder spanBuilder = tracer.buildSpan(request.getClass().getSimpleName()) + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT); + + Span span = spanBuilder.start(); + SpanDecorator.onRequest(span); + + ActionListener actionFuture = new TracingResponseListener<>(listener, span); + super.doExecute(action, request, actionFuture); + } +} diff --git a/opentracing-elasticsearch7-client/src/main/java/io/opentracing/contrib/elasticsearch7/TracingResponseListener.java b/opentracing-elasticsearch7-client/src/main/java/io/opentracing/contrib/elasticsearch7/TracingResponseListener.java new file mode 100644 index 0000000..8ca359c --- /dev/null +++ b/opentracing-elasticsearch7-client/src/main/java/io/opentracing/contrib/elasticsearch7/TracingResponseListener.java @@ -0,0 +1,60 @@ +/* + * Copyright 2017-2019 The OpenTracing Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.opentracing.contrib.elasticsearch7; + +import io.opentracing.Span; +import io.opentracing.contrib.elasticsearch.common.SpanDecorator; +import io.opentracing.tag.Tags; +import java.net.InetSocketAddress; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; + +public class TracingResponseListener implements ActionListener { + + private final ActionListener listener; + private final Span span; + + public TracingResponseListener(ActionListener listener, Span span) { + this.listener = listener; + this.span = span; + } + + @Override + public void onResponse(T t) { + if (t.remoteAddress() != null) { + InetSocketAddress address = t.remoteAddress().address(); + if (address != null) { + Tags.PEER_HOSTNAME.set(span, address.getHostName()); + Tags.PEER_PORT.set(span, address.getPort()); + } + } + + try { + listener.onResponse(t); + } finally { + span.finish(); + } + } + + @Override + public void onFailure(Exception e) { + SpanDecorator.onError(e, span); + + try { + listener.onFailure(e); + } finally { + span.finish(); + } + } +} diff --git a/opentracing-elasticsearch7-client/src/test/java/io/opentracing/contrib/elasticsearch7/TracingTest.java b/opentracing-elasticsearch7-client/src/test/java/io/opentracing/contrib/elasticsearch7/TracingTest.java new file mode 100644 index 0000000..65099b3 --- /dev/null +++ b/opentracing-elasticsearch7-client/src/test/java/io/opentracing/contrib/elasticsearch7/TracingTest.java @@ -0,0 +1,210 @@ +/* + * Copyright 2017-2019 The OpenTracing Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.opentracing.contrib.elasticsearch7; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import io.opentracing.contrib.elasticsearch.common.SpanDecorator; +import io.opentracing.contrib.elasticsearch.common.TracingHttpClientConfigCallback; +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.tag.Tags; +import io.opentracing.util.ThreadLocalScopeManager; +import java.net.InetAddress; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.entity.ContentType; +import org.apache.http.nio.entity.NStringEntity; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseListener; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.node.InternalSettingsPreparer; +import org.elasticsearch.node.Node; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.Netty4Plugin; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + + +public class TracingTest { + + private static final int HTTP_PORT = 9205; + private static final String HTTP_TRANSPORT_PORT = "9305"; + private static final String ES_WORKING_DIR = "target/es"; + private static String clusterName = "cluster-name"; + private static Node node; + private final MockTracer mockTracer = new MockTracer(new ThreadLocalScopeManager(), + MockTracer.Propagator.TEXT_MAP); + + @BeforeClass + public static void startElasticsearch() throws Exception { + Settings settings = Settings.builder() + .put("path.home", ES_WORKING_DIR) + .put("path.data", ES_WORKING_DIR + "/data") + .put("path.logs", ES_WORKING_DIR + "/logs") + .put("transport.type", "netty4") + .put("http.type", "netty4") + .put("cluster.name", clusterName) + .put("http.port", HTTP_PORT) + .put("transport.tcp.port", HTTP_TRANSPORT_PORT) + .put("network.host", "127.0.0.1") + .build(); + Collection plugins = Collections.singletonList(Netty4Plugin.class); + node = new PluginConfigurableNode(settings, plugins); + node.start(); + } + + @AfterClass + public static void stopElasticsearch() throws Exception { + node.close(); + } + + @Before + public void before() { + mockTracer.reset(); + } + + @Test + public void restClient() throws Exception { + RestClient restClient = RestClient.builder( + new HttpHost("localhost", HTTP_PORT, "http")) + .setHttpClientConfigCallback(new TracingHttpClientConfigCallback(mockTracer)) + .build(); + + HttpEntity entity = new NStringEntity( + "{\n" + + " \"user\" : \"kimchy\",\n" + + " \"post_date\" : \"2009-11-15T14:12:12\",\n" + + " \"message\" : \"trying out Elasticsearch\"\n" + + "}", ContentType.APPLICATION_JSON); + + Request request = new Request("PUT", "/twitter/tweet/1"); + request.setEntity(entity); + + Response indexResponse = restClient.performRequest(request); + + assertNotNull(indexResponse); + + Request request2 = new Request("PUT", "/twitter/tweet/2"); + request2.setEntity(entity); + + final CountDownLatch latch = new CountDownLatch(1); + restClient + .performRequestAsync(request2, new ResponseListener() { + @Override + public void onSuccess(Response response) { + latch.countDown(); + } + + @Override + public void onFailure(Exception exception) { + latch.countDown(); + } + }); + + latch.await(30, TimeUnit.SECONDS); + restClient.close(); + + List finishedSpans = mockTracer.finishedSpans(); + assertEquals(2, finishedSpans.size()); + checkSpans(finishedSpans, "PUT"); + assertNull(mockTracer.activeSpan()); + } + + @Test + public void transportClient() throws Exception { + + Settings settings = Settings.builder() + .put("cluster.name", clusterName).build(); + + TransportClient client = new TracingPreBuiltTransportClient(mockTracer, settings) + .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), + Integer.parseInt(HTTP_TRANSPORT_PORT))); + + IndexRequest indexRequest = new IndexRequest("twitter").type("tweet").id("1"). + source(jsonBuilder() + .startObject() + .field("user", "kimchy") + .field("postDate", new Date()) + .field("message", "trying out Elasticsearch") + .endObject() + ); + + IndexResponse indexResponse = client.index(indexRequest).actionGet(); + assertNotNull(indexResponse); + + final CountDownLatch latch = new CountDownLatch(1); + client.index(indexRequest, new ActionListener() { + @Override + public void onResponse(IndexResponse indexResponse) { + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + latch.countDown(); + } + }); + + latch.await(30, TimeUnit.SECONDS); + client.close(); + + List finishedSpans = mockTracer.finishedSpans(); + assertEquals(2, finishedSpans.size()); + checkSpans(finishedSpans, "IndexRequest"); + assertNull(mockTracer.activeSpan()); + } + + private void checkSpans(List mockSpans, String expectedOperationName) { + for (MockSpan mockSpan : mockSpans) { + assertEquals(Tags.SPAN_KIND_CLIENT, mockSpan.tags().get(Tags.SPAN_KIND.getKey())); + assertEquals(SpanDecorator.COMPONENT_NAME, mockSpan.tags().get(Tags.COMPONENT.getKey())); + assertEquals(0, mockSpan.generatedErrors().size()); + assertEquals(0, mockSpan.parentId()); + String operationName = mockSpan.operationName(); + assertEquals(operationName, expectedOperationName); + } + } + + private static class PluginConfigurableNode extends Node { + + public PluginConfigurableNode(Settings settings, + Collection> classpathPlugins) { + super(InternalSettingsPreparer + .prepareEnvironment(settings, new HashMap<>(), null, () -> "local"), + classpathPlugins, false); + } + + + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index ae55dff..1994c3e 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,9 @@ the License. --> - + 4.0.0 io.opentracing.contrib opentracing-elasticsearch-client-parent @@ -67,12 +69,14 @@ 0.8.2 5.6.16 6.6.2 + 7.0.0 opentracing-elasticsearch5-client opentracing-elasticsearch6-client opentracing-elasticsearch-client-common + opentracing-elasticsearch7-client From 9aaf114d0b2d5cdb3d57bef69d11c02da0a30041 Mon Sep 17 00:00:00 2001 From: Sergei Malafeev Date: Wed, 17 Apr 2019 13:08:35 +0800 Subject: [PATCH 2/2] update README Signed-off-by: Sergei Malafeev --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index 3c21599..e5744b0 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,16 @@ pom.xml ``` +#### Elasticsearch 7 + +```xml + + io.opentracing.contrib + opentracing-elasticsearch7-client + VERSION + +``` + ## Usage ```java