Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrumentation for Elasticsearch 8+ #8799

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("co.elastic.clients")
module.set("elasticsearch-java")
versions.set("[7.16,)")
AlexanderWert marked this conversation as resolved.
Show resolved Hide resolved
assertInverse.set(true)
}
}

dependencies {
library("co.elastic.clients:elasticsearch-java:8.0.0")

implementation(project(":instrumentation:elasticsearch:elasticsearch-rest-common:javaagent"))

testInstrumentation(project(":instrumentation:elasticsearch:elasticsearch-rest-7.0:javaagent"))
testInstrumentation(project(":instrumentation:apache-httpclient:apache-httpclient-4.0:javaagent"))
testInstrumentation(project(":instrumentation:apache-httpasyncclient-4.1:javaagent"))

testImplementation("org.apache.logging.log4j:log4j-core:2.11.0")
laurit marked this conversation as resolved.
Show resolved Hide resolved
testImplementation("org.apache.logging.log4j:log4j-api:2.11.0")
testImplementation("com.fasterxml.jackson.core:jackson-databind:2.14.2")

testImplementation("org.testcontainers:elasticsearch")
}

tasks {
test {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.elasticsearch.apiclient.v8_0;

import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import co.elastic.clients.transport.Endpoint;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.ElasticsearchEndpointDefinition;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.elasticsearch.client.Request;

public class ApiClientInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("co.elastic.clients.transport.rest_client.RestClientTransport");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
.and(named("prepareLowLevelRequest"))
.and(takesArgument(1, named("co.elastic.clients.transport.Endpoint")))
.and(returns(named("org.elasticsearch.client.Request"))),
this.getClass().getName() + "$RestClientTransportAdvice");
}

@SuppressWarnings("unused")
public static class RestClientTransportAdvice {

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onPrepareLowLevelRequest(
@Advice.Argument(1) Endpoint<?, ?, ?> endpoint, @Advice.Return Request request) {
VirtualField<Request, ElasticsearchEndpointDefinition> virtualField =
VirtualField.find(Request.class, ElasticsearchEndpointDefinition.class);
String endpointId = endpoint.id();
if (endpointId.startsWith("es/") && endpointId.length() > 3) {
endpointId = endpointId.substring(3);
}
virtualField.set(
request,
new ElasticsearchEndpointDefinition(
endpointId, ElasticsearchEndpointMap.get().get(endpointId)));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.elasticsearch.apiclient.v8_0;

import static java.util.Collections.singletonList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class ElasticsearchApiClientInstrumentationModule extends InstrumentationModule {
public ElasticsearchApiClientInstrumentationModule() {
super("elasticsearch-client", "elasticsearch-api-client", "elasticsearch");
AlexanderWert marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new ApiClientInstrumentation());
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;

import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.InfoResponse;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpHost;
import org.assertj.core.api.AbstractLongAssert;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

public class ElasticsearchClient8Test {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

static ElasticsearchContainer elasticsearch;

static HttpHost httpHost;

static ElasticsearchClient client;
static ElasticsearchAsyncClient asyncClient;

@BeforeAll
static void setUp() {
elasticsearch =
new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.17.2");
// limit memory usage
elasticsearch.withEnv("ES_JAVA_OPTS", "-Xmx256m -Xms256m");
elasticsearch.start();

httpHost = HttpHost.create(elasticsearch.getHttpHostAddress());

RestClient restClient =
RestClient.builder(httpHost)
.setRequestConfigCallback(
builder ->
builder
.setConnectTimeout(Integer.MAX_VALUE)
.setSocketTimeout(Integer.MAX_VALUE))
.build();

ElasticsearchTransport transport =
new RestClientTransport(restClient, new JacksonJsonpMapper());
client = new ElasticsearchClient(transport);
asyncClient = new ElasticsearchAsyncClient(transport);
}

@AfterAll
static void cleanUp() {
elasticsearch.stop();
}

private static String userAgent() {
return "elastic-java/"
+ RestClientBuilder.VERSION
+ " (Java/"
+ System.getProperty("java.version")
+ ")";
}

@Test
// ignore deprecation interface
public void elasticsearchStatus() throws IOException {
InfoResponse response = client.info();
Assertions.assertEquals(response.version().number(), "7.17.2");

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("info")
.hasKind(SpanKind.CLIENT)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "elasticsearch"),
equalTo(SemanticAttributes.DB_OPERATION, "info"),
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(SemanticAttributes.HTTP_URL, httpHost.toURI() + "/")),
span ->
span.hasName("GET")
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_PEER_NAME, httpHost.getHostName()),
equalTo(SemanticAttributes.NET_PEER_PORT, httpHost.getPort()),
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(AttributeKey.stringKey("net.protocol.name"), "http"),
equalTo(AttributeKey.stringKey("net.protocol.version"), "1.1"),
equalTo(SemanticAttributes.HTTP_URL, httpHost.toURI() + "/"),
equalTo(SemanticAttributes.HTTP_STATUS_CODE, 200L),
equalTo(SemanticAttributes.USER_AGENT_ORIGINAL, userAgent()),
satisfies(
SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH,
AbstractLongAssert::isPositive))));
}

@Test
// ignore deprecation interface
public void elasticsearchIndex() throws IOException {
client.index(
r ->
r.id("test-id")
.index("test-index")
.document(new Person("person-name"))
.timeout(t -> t.time("1s")));
AlexanderWert marked this conversation as resolved.
Show resolved Hide resolved

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("index")
.hasKind(SpanKind.CLIENT)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "elasticsearch"),
equalTo(SemanticAttributes.DB_OPERATION, "index"),
equalTo(SemanticAttributes.HTTP_METHOD, "PUT"),
equalTo(
SemanticAttributes.HTTP_URL,
httpHost.toURI() + "/test-index/_doc/test-id?timeout=1s"),
equalTo(
AttributeKey.stringKey("db.elasticsearch.path_parts.index"),
"test-index"),
equalTo(
AttributeKey.stringKey("db.elasticsearch.path_parts.id"),
"test-id")),
span ->
span.hasName("PUT")
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_PEER_NAME, httpHost.getHostName()),
equalTo(SemanticAttributes.NET_PEER_PORT, httpHost.getPort()),
equalTo(SemanticAttributes.HTTP_METHOD, "PUT"),
equalTo(AttributeKey.stringKey("net.protocol.name"), "http"),
equalTo(AttributeKey.stringKey("net.protocol.version"), "1.1"),
equalTo(
SemanticAttributes.HTTP_URL,
httpHost.toURI() + "/test-index/_doc/test-id?timeout=1s"),
equalTo(SemanticAttributes.HTTP_STATUS_CODE, 201L),
equalTo(SemanticAttributes.USER_AGENT_ORIGINAL, userAgent()),
satisfies(
SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH,
AbstractLongAssert::isPositive))));
}

@Test
// ignore deprecation interface
AlexanderWert marked this conversation as resolved.
Show resolved Hide resolved
public void elasticsearchStatusAsync() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
AsyncRequest request = new AsyncRequest();

runWithSpan(
"parent",
() ->
asyncClient
.info()
.thenAccept(
infoResponse ->
runWithSpan(
"callback",
() -> {
countDownLatch.countDown();
request.setResponse(infoResponse);
})));
//noinspection ResultOfMethodCallIgnored
countDownLatch.await(10, TimeUnit.SECONDS);

Assertions.assertEquals(request.getResponse().version().number(), "7.17.2");

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName("info")
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "elasticsearch"),
equalTo(SemanticAttributes.DB_OPERATION, "info"),
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(SemanticAttributes.HTTP_URL, httpHost.toURI() + "/")),
span ->
span.hasName("GET")
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_PEER_NAME, httpHost.getHostName()),
equalTo(SemanticAttributes.NET_PEER_PORT, httpHost.getPort()),
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(AttributeKey.stringKey("net.protocol.name"), "http"),
equalTo(AttributeKey.stringKey("net.protocol.version"), "1.1"),
equalTo(SemanticAttributes.HTTP_URL, httpHost.toURI() + "/"),
equalTo(SemanticAttributes.HTTP_STATUS_CODE, 200L),
equalTo(SemanticAttributes.USER_AGENT_ORIGINAL, userAgent()),
satisfies(
SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH,
AbstractLongAssert::isPositive)),
span ->
span.hasName("callback")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}

private static class AsyncRequest {
InfoResponse response = null;

public InfoResponse getResponse() {
return response;
}

public AsyncRequest setResponse(InfoResponse response) {
this.response = response;
return this;
}
}

private static class Person {
public final String name;

Person(String name) {
this.name = name;
}

@SuppressWarnings("unused")
public String getName() {
return name;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ void elasticsearchStatus() throws IOException {
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "elasticsearch"),
equalTo(SemanticAttributes.DB_OPERATION, "GET"),
equalTo(SemanticAttributes.DB_STATEMENT, "GET _cluster/health"));
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(
SemanticAttributes.HTTP_URL, httpHost.toURI() + "/_cluster/health"));
},
span -> {
span.hasName("GET")
Expand Down Expand Up @@ -170,8 +171,9 @@ public void onFailure(Exception e) {
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "elasticsearch"),
equalTo(SemanticAttributes.DB_OPERATION, "GET"),
equalTo(SemanticAttributes.DB_STATEMENT, "GET _cluster/health"));
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(
SemanticAttributes.HTTP_URL, httpHost.toURI() + "/_cluster/health"));
},
span -> {
span.hasName("GET")
Expand Down