Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate serialization IOException instead of rethrowing as runtime #6082

Merged
merged 7 commits into from
Jan 3, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -37,7 +35,6 @@ public final class HttpExporter<T extends Marshaler> {
private final String type;
private final HttpSender httpSender;
private final ExporterMetrics exporterMetrics;
private final boolean exportAsJson;

public HttpExporter(
String exporterName,
Expand All @@ -51,7 +48,6 @@ public HttpExporter(
exportAsJson
? ExporterMetrics.createHttpJson(exporterName, type, meterProviderSupplier)
: ExporterMetrics.createHttpProtobuf(exporterName, type, meterProviderSupplier);
this.exportAsJson = exportAsJson;
}

public CompletableResultCode export(T exportRequest, int numItems) {
Expand All @@ -63,21 +59,8 @@ public CompletableResultCode export(T exportRequest, int numItems) {

CompletableResultCode result = new CompletableResultCode();

Consumer<OutputStream> marshaler =
os -> {
try {
if (exportAsJson) {
exportRequest.writeJsonTo(os);
} else {
exportRequest.writeBinaryTo(os);
}
} catch (IOException e) {
throw new IllegalStateException(e);
}
};

httpSender.send(
marshaler,
exportRequest,
exportRequest.getBinarySerializedSize(),
httpResponse -> {
int statusCode = httpResponse.statusCode();
Expand All @@ -90,11 +73,11 @@ public CompletableResultCode export(T exportRequest, int numItems) {

exporterMetrics.addFailed(numItems);

byte[] body;
byte[] body = null;
try {
body = httpResponse.responseBody();
} catch (IOException ex) {
throw new IllegalStateException(ex);
logger.log(Level.FINE, "Unable to obtain response body", ex);
}

String status = extractErrorStatus(httpResponse.statusMessage(), body);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public HttpExporter<T> build() {
httpSenderProvider.createSender(
endpoint,
compressionEnabled,
exportAsJson,
exportAsJson ? "application/json" : "application/x-protobuf",
timeoutNanos,
connectTimeoutNanos,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

package io.opentelemetry.exporter.internal.http;

import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Consumer;

/**
Expand All @@ -33,7 +33,7 @@ public interface HttpSender {
* @param onError the callback to invoke when the HTTP request could not be executed
*/
void send(
Consumer<OutputStream> marshaler,
Marshaler marshaler,
int contentLength,
Consumer<Response> onResponse,
Consumer<Throwable> onError);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public interface HttpSenderProvider {
HttpSender createSender(
String endpoint,
boolean compressionEnabled,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectTimeout,
Expand Down
3 changes: 3 additions & 0 deletions exporters/sender/jdk/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ otelJava.moduleName.set("io.opentelemetry.exporter.sender.jdk.internal")
dependencies {
implementation(project(":exporters:common"))
implementation(project(":sdk:common"))

compileOnly("com.fasterxml.jackson.core:jackson-core")
testImplementation("com.linecorp.armeria:armeria-junit5")
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
package io.opentelemetry.exporter.sender.jdk.internal;

import io.opentelemetry.exporter.internal.http.HttpSender;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
Expand Down Expand Up @@ -53,6 +55,7 @@ public final class JdkHttpSender implements HttpSender {
private final HttpClient client;
private final URI uri;
private final boolean compressionEnabled;
private final boolean exportAsJson;
private final String contentType;
private final long timeoutNanos;
private final Supplier<Map<String, List<String>>> headerSupplier;
Expand All @@ -63,6 +66,7 @@ public final class JdkHttpSender implements HttpSender {
HttpClient client,
String endpoint,
boolean compressionEnabled,
boolean exportAsJson,
String contentType,
long timeoutNanos,
Supplier<Map<String, List<String>>> headerSupplier,
Expand All @@ -74,6 +78,7 @@ public final class JdkHttpSender implements HttpSender {
throw new IllegalArgumentException(e);
}
this.compressionEnabled = compressionEnabled;
this.exportAsJson = exportAsJson;
this.contentType = contentType;
this.timeoutNanos = timeoutNanos;
this.headerSupplier = headerSupplier;
Expand All @@ -83,6 +88,7 @@ public final class JdkHttpSender implements HttpSender {
JdkHttpSender(
String endpoint,
boolean compressionEnabled,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectTimeoutNanos,
Expand All @@ -93,6 +99,7 @@ public final class JdkHttpSender implements HttpSender {
configureClient(sslContext, connectTimeoutNanos),
endpoint,
compressionEnabled,
exportAsJson,
contentType,
timeoutNanos,
headerSupplier,
Expand All @@ -111,7 +118,7 @@ private static HttpClient configureClient(

@Override
public void send(
Consumer<OutputStream> marshaler,
Marshaler marshaler,
int contentLength,
Consumer<Response> onResponse,
Consumer<Throwable> onError) {
Expand All @@ -121,7 +128,7 @@ public void send(
try {
return sendInternal(marshaler);
} catch (IOException e) {
throw new IllegalStateException(e);
throw new UncheckedIOException(e);
}
},
executorService)
Expand All @@ -136,7 +143,7 @@ public void send(
}

// Visible for testing
HttpResponse<byte[]> sendInternal(Consumer<OutputStream> marshaler) throws IOException {
HttpResponse<byte[]> sendInternal(Marshaler marshaler) throws IOException {
long startTimeNanos = System.nanoTime();
HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder().uri(uri).timeout(Duration.ofNanos(timeoutNanos));
Expand All @@ -151,12 +158,10 @@ HttpResponse<byte[]> sendInternal(Consumer<OutputStream> marshaler) throws IOExc
if (compressionEnabled) {
requestBuilder.header("Content-Encoding", "gzip");
try (GZIPOutputStream gzos = new GZIPOutputStream(os)) {
marshaler.accept(gzos);
} catch (IOException e) {
throw new IllegalStateException(e);
write(marshaler, gzos);
}
} else {
marshaler.accept(os);
write(marshaler, os);
}

ByteBufferPool byteBufferPool = threadLocalByteBufPool.get();
Expand Down Expand Up @@ -211,6 +216,14 @@ HttpResponse<byte[]> sendInternal(Consumer<OutputStream> marshaler) throws IOExc
throw exception;
}

private void write(Marshaler marshaler, OutputStream os) throws IOException {
if (exportAsJson) {
marshaler.writeJsonTo(os);
} else {
marshaler.writeBinaryTo(os);
}
}

private HttpResponse<byte[]> sendRequest(
HttpRequest.Builder requestBuilder, ByteBufferPool byteBufferPool) throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public final class JdkHttpSenderProvider implements HttpSenderProvider {
public HttpSender createSender(
String endpoint,
boolean compressionEnabled,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectTimeout,
Expand All @@ -39,6 +40,7 @@ public HttpSender createSender(
return new JdkHttpSender(
endpoint,
compressionEnabled,
exportAsJson,
contentType,
timeoutNanos,
connectTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.testing.junit5.server.mock.MockWebServerExtension;
import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.net.http.HttpClient;
Expand All @@ -22,10 +30,12 @@
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
Expand All @@ -35,6 +45,8 @@
@MockitoSettings(strictness = Strictness.LENIENT)
class JdkHttpSenderTest {

@RegisterExtension static final MockWebServerExtension server = new MockWebServerExtension();

private final HttpClient realHttpClient =
HttpClient.newBuilder().connectTimeout(Duration.ofMillis(10)).build();
@Mock private HttpClient mockHttpClient;
Expand All @@ -54,6 +66,7 @@ void setup() throws IOException, InterruptedException {
"http://10.255.255.1", // Connecting to a non-routable IP address to trigger connection
// timeout
false,
false,
"text/plain",
Duration.ofSeconds(10).toNanos(),
Collections::emptyMap,
Expand All @@ -63,9 +76,23 @@ void setup() throws IOException, InterruptedException {
.build());
}

@Test
void exportAsJson() {
HttpExporter<Marshaler> exporter =
new HttpExporterBuilder<>("jdk", "test", server.httpUri().toASCIIString())
.exportAsJson()
.build();

server.enqueue(HttpResponse.of(HttpStatus.OK));

CompletableResultCode result = exporter.export(new NoOpMarshaler(), 0);
result.join(1, TimeUnit.MINUTES);
Assertions.assertThat(result.isSuccess()).isTrue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we use awaitility for this, rather than result.join?

Something like await().untilAsserted(() -> Assertions.assertThat(result.isSuccess()).isTrue());

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in cases like this where the future is expected to resolve quickly and deterministically, its ok to use join. For example, we use it all over the place in AbstractHttpTelemetryExporterTest.

With that said, I'm actually not sure what this new test has to do with this change so I've gone ahead and removed it.

}

@Test
void sendInternal_RetryableConnectTimeoutException() throws IOException, InterruptedException {
assertThatThrownBy(() -> sender.sendInternal(marshaler -> {}))
assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler()))
.isInstanceOf(HttpConnectTimeoutException.class);

verify(mockHttpClient, times(2)).send(any(), any());
Expand All @@ -75,7 +102,7 @@ void sendInternal_RetryableConnectTimeoutException() throws IOException, Interru
void sendInternal_RetryableIoException() throws IOException, InterruptedException {
doThrow(new IOException("error!")).when(mockHttpClient).send(any(), any());

assertThatThrownBy(() -> sender.sendInternal(marshaler -> {}))
assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler()))
.isInstanceOf(IOException.class)
.hasMessage("error!");

Expand All @@ -86,7 +113,7 @@ void sendInternal_RetryableIoException() throws IOException, InterruptedExceptio
void sendInternal_NonRetryableException() throws IOException, InterruptedException {
doThrow(new SSLException("unknown error")).when(mockHttpClient).send(any(), any());

assertThatThrownBy(() -> sender.sendInternal(marshaler -> {}))
assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler()))
.isInstanceOf(IOException.class)
.hasMessage("unknown error");

Expand All @@ -99,6 +126,7 @@ void connectTimeout() {
new JdkHttpSender(
"http://localhost",
true,
false,
"text/plain",
1,
TimeUnit.SECONDS.toNanos(10),
Expand All @@ -112,4 +140,15 @@ void connectTimeout() {
httpClient ->
assertThat(httpClient.connectTimeout().get()).isEqualTo(Duration.ofSeconds(10)));
}

private static class NoOpMarshaler extends Marshaler {

@Override
public int getBinarySerializedSize() {
return 0;
}

@Override
protected void writeTo(Serializer output) throws IOException {}
}
}
1 change: 1 addition & 0 deletions exporters/sender/okhttp/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
implementation("com.squareup.okhttp3:okhttp")

compileOnly("io.grpc:grpc-stub")
compileOnly("com.fasterxml.jackson.core:jackson-core")

testImplementation("com.linecorp.armeria:armeria-junit5")
}
Loading