Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new OTEL Metrics source that creates events #4183

Merged
merged 4 commits into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,54 @@
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import org.opensearch.dataprepper.exceptions.BufferWriteException;
import org.opensearch.dataprepper.exceptions.RequestCancelledException;
import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.metric.Metric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;

public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImplBase {
private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsGrpcService.class);

public static final String REQUESTS_RECEIVED = "requestsReceived";
public static final String SUCCESS_REQUESTS = "successRequests";
public static final String RECORDS_CREATED = "recordsCreated";
public static final String RECORDS_DROPPED = "recordsDropped";
public static final String PAYLOAD_SIZE = "payloadSize";
public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration";

private final int bufferWriteTimeoutInMillis;
private final Buffer<Record<ExportMetricsServiceRequest>> buffer;
private final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder;
private final Buffer<Record<? extends Metric>> buffer;

private final Counter requestsReceivedCounter;
private final Counter successRequestsCounter;
private final Counter recordsCreatedCounter;
private final Counter recordsDroppedCounter;
private final DistributionSummary payloadSizeSummary;
private final Timer requestProcessDuration;


public OTelMetricsGrpcService(int bufferWriteTimeoutInMillis,
Buffer<Record<ExportMetricsServiceRequest>> buffer,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please be sure to verify that the original source that uses ExportMetricsServiceRequest is not affected by this change

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original source is not there any more. We are replacing original source with this new behavior.

final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder,
Buffer<Record<? extends Metric>> buffer,
final PluginMetrics pluginMetrics) {
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;
this.buffer = buffer;

requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS);
recordsCreatedCounter = pluginMetrics.counter(RECORDS_CREATED);
recordsDroppedCounter = pluginMetrics.counter(RECORDS_DROPPED);
payloadSizeSummary = pluginMetrics.summary(PAYLOAD_SIZE);
requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION);
}

public void rawExport(final ExportMetricsServiceRequest request) {
try {
if (buffer.isByteBuffer()) {
buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis);
}
} catch (Exception e) {
}
this.oTelProtoDecoder = oTelProtoDecoder;
}

@Override
Expand All @@ -81,7 +87,13 @@ private void processRequest(final ExportMetricsServiceRequest request, final Str
if (buffer.isByteBuffer()) {
buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis);
} else {
buffer.write(new Record<>(request), bufferWriteTimeoutInMillis);
Collection<Record<? extends Metric>> metrics;

AtomicInteger droppedCounter = new AtomicInteger(0);
metrics = oTelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, true, true, false);
recordsDroppedCounter.increment(droppedCounter.get());
recordsCreatedCounter.increment(metrics.size());
buffer.writeAll(metrics, bufferWriteTimeoutInMillis);
}
} catch (Exception e) {
if (ServiceRequestContext.current().isTimedOut()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.metric.Metric;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
import org.opensearch.dataprepper.plugins.health.HealthGrpcService;
Expand All @@ -51,7 +53,7 @@
import java.util.function.Function;

@DataPrepperPlugin(name = "otel_metrics_source", pluginType = Source.class, pluginConfigurationType = OTelMetricsSourceConfig.class)
public class OTelMetricsSource implements Source<Record<ExportMetricsServiceRequest>> {
public class OTelMetricsSource implements Source<Record<? extends Metric>> {
private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsSource.class);
private static final String HTTP_HEALTH_CHECK_PATH = "/health";
private static final String REGEX_HEALTH = "regex:^/(?!health$).*$";
Expand Down Expand Up @@ -91,15 +93,15 @@ public ByteDecoder getDecoder() {
}

@Override
public void start(Buffer<Record<ExportMetricsServiceRequest>> buffer) {
public void start(Buffer<Record<? extends Metric>> buffer) {
if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
}

if (server == null) {

final OTelMetricsGrpcService oTelMetricsGrpcService = new OTelMetricsGrpcService(
(int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8),
new OTelProtoCodec.OTelProtoDecoder(),
buffer,
pluginMetrics
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -29,17 +30,20 @@
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.hamcrest.Matchers.hasEntry;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand All @@ -48,15 +52,20 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.Gauge;

@ExtendWith(MockitoExtension.class)
public class OTelMetricsGrpcServiceTest {
private static NumberDataPoint.Builder p1 = NumberDataPoint.newBuilder().setAsInt(4);
private static Gauge gauge = Gauge.newBuilder().addDataPoints(p1).build();
private static final ExportMetricsServiceRequest METRICS_REQUEST = ExportMetricsServiceRequest.newBuilder()
.addResourceMetrics(ResourceMetrics.newBuilder()
.addInstrumentationLibraryMetrics(InstrumentationLibraryMetrics.newBuilder()
.addMetrics(Metric.newBuilder().build())
.addMetrics(Metric.newBuilder().setGauge(gauge).setUnit("seconds").setName("name").build())
.build())).build();

private static Map<String, Object> expectedMetric = Map.of("unit", (Object)"seconds", "name", (Object)"name", "kind", (Object)"GAUGE");
private static PluginSetting pluginSetting;
private final int bufferWriteTimeoutInMillis = 100000;

Expand All @@ -65,6 +74,10 @@ public class OTelMetricsGrpcServiceTest {
@Mock
private Counter successRequestsCounter;
@Mock
private Counter droppedCounter;
@Mock
private Counter createdCounter;
@Mock
private DistributionSummary payloadSize;
@Mock
private Timer requestProcessDuration;
Expand All @@ -76,7 +89,7 @@ public class OTelMetricsGrpcServiceTest {
private ServiceRequestContext serviceRequestContext;

@Captor
private ArgumentCaptor<Record> recordCaptor;
private ArgumentCaptor<Collection<Record>> recordCaptor;

@Captor
ArgumentCaptor<byte[]> bytesCaptor;
Expand All @@ -92,6 +105,8 @@ public void setup() {

when(mockPluginMetrics.counter(OTelMetricsGrpcService.REQUESTS_RECEIVED)).thenReturn(requestsReceivedCounter);
when(mockPluginMetrics.counter(OTelMetricsGrpcService.SUCCESS_REQUESTS)).thenReturn(successRequestsCounter);
when(mockPluginMetrics.counter(OTelMetricsGrpcService.RECORDS_CREATED)).thenReturn(createdCounter);
when(mockPluginMetrics.counter(OTelMetricsGrpcService.RECORDS_DROPPED)).thenReturn(droppedCounter);
when(mockPluginMetrics.summary(OTelMetricsGrpcService.PAYLOAD_SIZE)).thenReturn(payloadSize);
when(mockPluginMetrics.timer(OTelMetricsGrpcService.REQUEST_PROCESS_DURATION)).thenReturn(requestProcessDuration);
doAnswer(invocation -> {
Expand All @@ -101,7 +116,7 @@ public void setup() {

when(serviceRequestContext.isTimedOut()).thenReturn(false);

sut = new OTelMetricsGrpcService(bufferWriteTimeoutInMillis, buffer, mockPluginMetrics);
sut = new OTelMetricsGrpcService(bufferWriteTimeoutInMillis, new OTelProtoCodec.OTelProtoDecoder(), buffer, mockPluginMetrics);
}

@Test
Expand All @@ -111,7 +126,7 @@ public void export_Success_responseObserverOnCompleted() throws Exception {
sut.export(METRICS_REQUEST, responseObserver);
}

verify(buffer, times(1)).write(recordCaptor.capture(), anyInt());
verify(buffer, times(1)).writeAll(recordCaptor.capture(), anyInt());
verify(responseObserver, times(1)).onNext(ExportMetricsServiceResponse.newBuilder().build());
verify(responseObserver, times(1)).onCompleted();
verify(requestsReceivedCounter, times(1)).increment();
Expand All @@ -122,8 +137,11 @@ public void export_Success_responseObserverOnCompleted() throws Exception {
assertThat(payloadLengthCaptor.getValue().intValue(), equalTo(METRICS_REQUEST.getSerializedSize()));
verify(requestProcessDuration, times(1)).record(ArgumentMatchers.<Runnable>any());

Record capturedRecord = recordCaptor.getValue();
assertEquals(METRICS_REQUEST, capturedRecord.getData());
Collection<Record> capturedRecords = recordCaptor.getValue();
Record capturedRecord = (Record)(capturedRecords.toArray()[0]);
Map<String, Object> map = ((Event)capturedRecord.getData()).toMap();

expectedMetric.forEach((k, v) -> assertThat(map, hasEntry((String)k, (Object)v)));
}

@Test
Expand Down Expand Up @@ -151,14 +169,14 @@ public void export_Success_with_ByteBuffer_responseObserverOnCompleted() throws

@Test
public void export_BufferTimeout_responseObserverOnError() throws Exception {
doThrow(new TimeoutException()).when(buffer).write(any(Record.class), anyInt());
doThrow(new TimeoutException()).when(buffer).writeAll(any(Collection.class), anyInt());

try (MockedStatic<ServiceRequestContext> mockedStatic = mockStatic(ServiceRequestContext.class)) {
mockedStatic.when(ServiceRequestContext::current).thenReturn(serviceRequestContext);
assertThrows(BufferWriteException.class, () -> sut.export(METRICS_REQUEST, responseObserver));
}

verify(buffer, times(1)).write(any(Record.class), anyInt());
verify(buffer, times(1)).writeAll(any(Collection.class), anyInt());
verifyNoInteractions(responseObserver);
verify(requestsReceivedCounter, times(1)).increment();
verifyNoInteractions(successRequestsCounter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;

import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
import io.opentelemetry.proto.metrics.v1.Gauge;
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
Expand Down Expand Up @@ -62,6 +67,7 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.metric.Metric;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.GrpcBasicAuthenticationProvider;
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
Expand All @@ -79,6 +85,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -163,7 +170,7 @@ class OTelMetricsSourceTest {
private OTelMetricsSourceConfig oTelMetricsSourceConfig;

@Mock
private BlockingBuffer<Record<ExportMetricsServiceRequest>> buffer;
private BlockingBuffer<Record<? extends Metric>> buffer;

@Mock
private HttpBasicAuthenticationConfig httpBasicAuthenticationConfig;
Expand Down Expand Up @@ -901,12 +908,12 @@ void gRPC_request_writes_to_buffer_with_successful_response() throws Exception {
final ExportMetricsServiceResponse exportResponse = client.export(createExportMetricsRequest());
assertThat(exportResponse, notNullValue());

final ArgumentCaptor<Record<ExportMetricsServiceRequest>> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Record.class);
verify(buffer).write(bufferWriteArgumentCaptor.capture(), anyInt());
final ArgumentCaptor<Collection<Record<? extends Metric>>> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Collection.class);
verify(buffer).writeAll(bufferWriteArgumentCaptor.capture(), anyInt());

final Record<ExportMetricsServiceRequest> actualBufferWrites = bufferWriteArgumentCaptor.getValue();
final Collection<Record<? extends Metric>> actualBufferWrites = bufferWriteArgumentCaptor.getValue();
assertThat(actualBufferWrites, notNullValue());
assertThat(actualBufferWrites.getData().getResourceMetricsCount(), equalTo(1));
assertThat(actualBufferWrites.size(), equalTo(1));
}

@Test
Expand Down Expand Up @@ -935,12 +942,12 @@ void gRPC_with_auth_request_writes_to_buffer_with_successful_response() throws E
final ExportMetricsServiceResponse exportResponse = client.export(createExportMetricsRequest());
assertThat(exportResponse, notNullValue());

final ArgumentCaptor<Record<ExportMetricsServiceRequest>> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Record.class);
verify(buffer).write(bufferWriteArgumentCaptor.capture(), anyInt());
final ArgumentCaptor<Collection<Record<? extends Metric>>> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Collection.class);
verify(buffer).writeAll(bufferWriteArgumentCaptor.capture(), anyInt());

final Record<ExportMetricsServiceRequest> actualBufferWrites = bufferWriteArgumentCaptor.getValue();
final Collection<Record<? extends Metric>> actualBufferWrites = bufferWriteArgumentCaptor.getValue();
assertThat(actualBufferWrites, notNullValue());
assertThat(actualBufferWrites.getData().getResourceMetricsCount(), equalTo(1));
assertThat(actualBufferWrites.size(), equalTo(1));
}

@Test
Expand Down Expand Up @@ -971,7 +978,7 @@ void gRPC_request_returns_expected_status_for_exceptions_from_buffer(

doThrow(bufferExceptionClass)
.when(buffer)
.write(any(Record.class), anyInt());
.writeAll(any(Collection.class), anyInt());
final ExportMetricsServiceRequest exportMetricsRequest = createExportMetricsRequest();
final StatusRuntimeException actualException = assertThrows(StatusRuntimeException.class, () -> client.export(exportMetricsRequest));

Expand Down Expand Up @@ -1014,9 +1021,26 @@ private ExportMetricsServiceRequest createExportMetricsRequest() {
.setKey("service.name")
.setValue(AnyValue.newBuilder().setStringValue("service").build())
).build();
NumberDataPoint.Builder p1 = NumberDataPoint.newBuilder().setAsInt(4);
Gauge gauge = Gauge.newBuilder().addDataPoints(p1).build();

io.opentelemetry.proto.metrics.v1.Metric.Builder metric = io.opentelemetry.proto.metrics.v1.Metric.newBuilder()
.setGauge(gauge)
.setUnit("seconds")
.setName("name")
.setDescription("description");
InstrumentationLibraryMetrics isntLib = InstrumentationLibraryMetrics.newBuilder()
.addMetrics(metric)
.setInstrumentationLibrary(InstrumentationLibrary.newBuilder()
.setName("ilname")
.setVersion("ilversion")
.build())
.build();


final ResourceMetrics resourceMetrics = ResourceMetrics.newBuilder()
.setResource(resource)
.addInstrumentationLibraryMetrics(isntLib)
.build();

return ExportMetricsServiceRequest.newBuilder()
Expand Down
Loading