Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
  • Loading branch information
Krishna Kondaka committed Feb 24, 2024
1 parent ca0cfe8 commit f00afb2
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,37 @@ public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImp

public static final String REQUESTS_RECEIVED = "requestsReceived";
public static final String SUCCESS_REQUESTS = "successRequests";
public static final String RECORDS_CREATED = "recordsCreated";
public static final String RECORDS_DROPPED = "recordsDropped";
public static final String PAYLOAD_SIZE = "payloadSize";
public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration";

private final int bufferWriteTimeoutInMillis;
private final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder;
private final Buffer<Record<? extends Metric>> buffer;

private final Counter requestsReceivedCounter;
private final Counter successRequestsCounter;
private final Counter recordsCreatedCounter;
private final Counter recordsDroppedCounter;
private final DistributionSummary payloadSizeSummary;
private final Timer requestProcessDuration;


public OTelMetricsGrpcService(int bufferWriteTimeoutInMillis,
final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder,
Buffer<Record<? extends Metric>> buffer,
final PluginMetrics pluginMetrics) {
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;
this.buffer = buffer;

requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS);
recordsCreatedCounter = pluginMetrics.counter(RECORDS_CREATED);
recordsDroppedCounter = pluginMetrics.counter(RECORDS_DROPPED);
payloadSizeSummary = pluginMetrics.summary(PAYLOAD_SIZE);
requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION);
this.oTelProtoDecoder = oTelProtoDecoder;
}

@Override
Expand All @@ -79,10 +88,11 @@ private void processRequest(final ExportMetricsServiceRequest request, final Str
buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis);
} else {
Collection<Record<? extends Metric>> metrics;
AtomicInteger droppedCounter = new AtomicInteger(0);

OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder();
AtomicInteger droppedCounter = new AtomicInteger(0);
metrics = oTelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, true, true, false);
recordsDroppedCounter.increment(droppedCounter.get());
recordsCreatedCounter.increment(metrics.size());
buffer.writeAll(metrics, bufferWriteTimeoutInMillis);
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
import org.opensearch.dataprepper.plugins.health.HealthGrpcService;
Expand Down Expand Up @@ -98,12 +99,9 @@ public void start(Buffer<Record<? extends Metric>> buffer) {
}

if (server == null) {

final int bufferWriteTimeoutInMillis =
(int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8);
final OTelMetricsGrpcService oTelMetricsGrpcService = new OTelMetricsGrpcService(
(int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8),

new OTelProtoCodec.OTelProtoDecoder(),
buffer,
pluginMetrics
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,6 @@ void gRPC_with_auth_request_writes_to_buffer_with_successful_response() throws E
final ExportMetricsServiceResponse exportResponse = client.export(createExportMetricsRequest());
assertThat(exportResponse, notNullValue());

//final ArgumentCaptor<Record<ExportMetricsServiceRequest>> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Record.class);
final ArgumentCaptor<Collection<Record<? extends Metric>>> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Collection.class);
verify(buffer).writeAll(bufferWriteArgumentCaptor.capture(), anyInt());

Expand Down

0 comments on commit f00afb2

Please sign in to comment.