Skip to content

Commit

Permalink
Add instrumentation in InboundHandler
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <gjjuneja@amazon.com>
  • Loading branch information
Gagan Juneja committed Sep 20, 2023
1 parent 2f7969a commit 777421c
Show file tree
Hide file tree
Showing 35 changed files with 328 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public Map<String, Supplier<Transport>> getTransports(
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService
NetworkService networkService,
Tracer tracer
) {
return Collections.singletonMap(
NETTY_TRANSPORT_NAME,
Expand All @@ -108,7 +109,8 @@ public Map<String, Supplier<Transport>> getTransports(
pageCacheRecycler,
namedWriteableRegistry,
circuitBreakerService,
getSharedGroupFactory(settings)
getSharedGroupFactory(settings),
tracer
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Netty4NioSocketChannel;
import org.opensearch.transport.NettyAllocator;
Expand Down Expand Up @@ -131,9 +132,10 @@ public Netty4Transport(
PageCacheRecycler pageCacheRecycler,
NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService,
SharedGroupFactory sharedGroupFactory
SharedGroupFactory sharedGroupFactory,
Tracer tracer
) {
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService, tracer);
Netty4Utils.setAvailableProcessors(OpenSearchExecutors.NODE_PROCESSORS_SETTING.get(settings));
NettyAllocator.logAllocatorDescriptionIfNeeded();
this.sharedGroupFactory = sharedGroupFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.SharedGroupFactory;
Expand Down Expand Up @@ -86,7 +87,8 @@ public void startThreadPool() {
recycler,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService(),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
);
nettyTransport.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -141,7 +142,8 @@ private TcpTransport startTransport(Settings settings, ThreadPool threadPool) {
recycler,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService(),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
);
transport.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.test.transport.StubbableTransport;
import org.opensearch.transport.AbstractSimpleTransportTestCase;
Expand Down Expand Up @@ -82,7 +83,8 @@ protected Transport build(Settings settings, final Version version, ClusterSetti
PageCacheRecycler.NON_RECYCLING_INSTANCE,
namedWriteableRegistry,
new NoneCircuitBreakerService(),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ protected MockTransportService createTransportService() {
new NetworkService(Collections.emptyList()),
PageCacheRecycler.NON_RECYCLING_INSTANCE,
writableRegistry(),
new NoneCircuitBreakerService()
new NoneCircuitBreakerService(),
NoopTracer.INSTANCE
) {
@Override
public TransportAddress[] addressesFromString(String address) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ protected MockTransportService createTransportService() {
networkService,
PageCacheRecycler.NON_RECYCLING_INSTANCE,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService()
new NoneCircuitBreakerService(),
NoopTracer.INSTANCE
),
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.nio.NioSelector;
import org.opensearch.nio.NioSocketChannel;
import org.opensearch.nio.ServerChannelContext;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TcpTransport;
import org.opensearch.transport.TransportSettings;
Expand Down Expand Up @@ -84,9 +85,10 @@ protected NioTransport(
PageCacheRecycler pageCacheRecycler,
NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService,
NioGroupFactory groupFactory
NioGroupFactory groupFactory,
Tracer tracer
) {
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService, tracer);
this.pageAllocator = new PageAllocator(pageCacheRecycler);
this.groupFactory = groupFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public Map<String, Supplier<Transport>> getTransports(
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService
NetworkService networkService,
Tracer tracer
) {
return Collections.singletonMap(
NIO_TRANSPORT_NAME,
Expand All @@ -103,7 +104,8 @@ public Map<String, Supplier<Transport>> getTransports(
pageCacheRecycler,
namedWriteableRegistry,
circuitBreakerService,
getNioGroupFactory(settings)
getNioGroupFactory(settings),
tracer
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.test.transport.StubbableTransport;
import org.opensearch.transport.AbstractSimpleTransportTestCase;
Expand Down Expand Up @@ -81,7 +82,8 @@ protected Transport build(Settings settings, final Version version, ClusterSetti
new MockPageCacheRecycler(settings),
namedWriteableRegistry,
new NoneCircuitBreakerService(),
new NioGroupFactory(settings, logger)
new NioGroupFactory(settings, logger),
NoopTracer.INSTANCE
) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ public NetworkModule(
pageCacheRecycler,
circuitBreakerService,
namedWriteableRegistry,
networkService
networkService,
tracer
);
for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
registerTransport(entry.getKey(), entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ default Map<String, Supplier<Transport>> getTransports(
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService
NetworkService networkService,
Tracer tracer
) {
return Collections.emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ private AttributeNames() {
*/
public static final String TRANSPORT_TARGET_HOST = "target_host";

/**
* Transport Service send request local host.
*/
public static final String TRANSPORT_HOST = "host";

/**
* Action Name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.http.HttpRequest;
import org.opensearch.rest.RestRequest;
import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.transport.TcpChannel;
import org.opensearch.transport.Transport;

import java.util.Arrays;
Expand Down Expand Up @@ -127,4 +128,24 @@ private static Attributes buildSpanAttributes(String action, Transport.Connectio
return attributes;
}

/**
* Creates {@link SpanCreationContext} from Inbound Handler.
* @param action action.
* @param tcpChannel tcp channel.
* @return context
*/
public static SpanCreationContext from(String action, TcpChannel tcpChannel) {
return new SpanCreationContext(createSpanName(action, tcpChannel), buildSpanAttributes(action, tcpChannel));
}

private static String createSpanName(String action, TcpChannel tcpChannel) {
return action + SEPARATOR + tcpChannel.getLocalAddress().getHostString();
}

private static Attributes buildSpanAttributes(String action, TcpChannel tcpChannel) {
Attributes attributes = Attributes.create().addAttribute(AttributeNames.TRANSPORT_ACTION, action);
attributes.addAttribute(AttributeNames.TRANSPORT_HOST, tcpChannel.getLocalAddress().getHostString());
return attributes;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.channels;

import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.listener.TraceableActionListener;
import org.opensearch.transport.TcpChannel;

import java.net.InetSocketAddress;

/**
* Tracer wrapped {@link TcpChannel}
*/
public class TraceableTcpChannel implements TcpChannel {

private final TcpChannel delegate;
private final Span span;
private final Tracer tracer;

private final static ActionListener<Void> DUMMY_ACTION_LISTENER = ActionListener.wrap(() -> {});

/**
* Constructor.
* @param delegate delegate
* @param span span
* @param tracer tracer
*/
public TraceableTcpChannel(TcpChannel delegate, Span span, Tracer tracer) {
this.delegate = delegate;
this.span = span;
this.tracer = tracer;
}

/**
* Factory method.
*
* @param delegate delegate
* @param span span
* @param tracer tracer
* @return tcp channel
*/
public static TcpChannel create(TcpChannel delegate, final Span span, final Tracer tracer) {
if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) {
TraceableTcpChannel traceableTcpChannel = new TraceableTcpChannel(delegate, span, tracer);
traceableTcpChannel.addCloseListener(TraceableActionListener.create(DUMMY_ACTION_LISTENER, span, tracer));
return traceableTcpChannel;
} else {
return delegate;
}
}

@Override
public void close() {
try (SpanScope scope = tracer.withSpanInScope(span)) {
delegate.close();
} finally {
span.endSpan();
}
}

@Override
public void addCloseListener(ActionListener<Void> listener) {
delegate.addCloseListener(listener);
}

@Override
public boolean isOpen() {
return delegate.isOpen();
}

@Override
public boolean isServerChannel() {
return delegate.isServerChannel();
}

@Override
public String getProfile() {
return delegate.getProfile();
}

@Override
public InetSocketAddress getLocalAddress() {
return delegate.getLocalAddress();
}

@Override
public InetSocketAddress getRemoteAddress() {
return delegate.getRemoteAddress();
}

@Override
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
try (SpanScope scope = tracer.withSpanInScope(span)) {
delegate.sendMessage(reference, listener);
} finally {
span.endSpan();
}
}

@Override
public void addConnectListener(ActionListener<Void> listener) {
delegate.addConnectListener(listener);
}

@Override
public ChannelStats getChannelStats() {
return delegate.getChannelStats();
}
}

0 comments on commit 777421c

Please sign in to comment.