Skip to content

Commit

Permalink
Attempt to fix flaky Zipkin test
Browse files Browse the repository at this point in the history
Update `ZipkinHttpClientSenderTests` to use one mock server per test
rather than a shared static. For some reason the mock server sometimes
seems to get itself in a deadlock state which causes the client to
fail with a `HttpTimeoutException`.
  • Loading branch information
philwebb committed Mar 18, 2024
1 parent 91d150c commit b4208ed
Showing 1 changed file with 23 additions and 55 deletions.
Expand Up @@ -28,10 +28,7 @@

import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.QueueDispatcher;
import okhttp3.mockwebserver.RecordedRequest;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import zipkin2.reporter.BytesMessageSender;
Expand All @@ -52,32 +49,23 @@
*/
class ZipkinHttpClientSenderTests extends ZipkinHttpSenderTests {

private static ClearableDispatcher dispatcher;
private MockWebServer mockBackEnd;

private static MockWebServer mockBackEnd;

static String zipkinUrl;

@BeforeAll
static void beforeAll() throws IOException {
dispatcher = new ClearableDispatcher();
mockBackEnd = new MockWebServer();
mockBackEnd.setDispatcher(dispatcher);
mockBackEnd.start();
zipkinUrl = mockBackEnd.url("/api/v2/spans").toString();
}

@AfterAll
static void afterAll() throws IOException {
mockBackEnd.shutdown();
}
private String zipkinUrl;

@Override
@BeforeEach
void beforeEach() throws Exception {
this.mockBackEnd = new MockWebServer();
this.mockBackEnd.start();
this.zipkinUrl = this.mockBackEnd.url("/api/v2/spans").toString();
super.beforeEach();
clearResponses();
clearRequests();
}

@Override
void afterEach() throws IOException {
super.afterEach();
this.mockBackEnd.shutdown();
}

@Override
Expand All @@ -92,12 +80,12 @@ ZipkinHttpClientSender createSender(Encoding encoding, Duration timeout) {
ZipkinHttpClientSender createSender(HttpEndpointSupplier.Factory endpointSupplierFactory, Encoding encoding,
Duration timeout) {
HttpClient httpClient = HttpClient.newBuilder().connectTimeout(timeout).build();
return new ZipkinHttpClientSender(encoding, endpointSupplierFactory, zipkinUrl, httpClient, timeout);
return new ZipkinHttpClientSender(encoding, endpointSupplierFactory, this.zipkinUrl, httpClient, timeout);
}

@Test
void sendShouldSendSpansToZipkin() throws IOException, InterruptedException {
mockBackEnd.enqueue(new MockResponse());
this.mockBackEnd.enqueue(new MockResponse());
List<byte[]> encodedSpans = List.of(toByteArray("span1"), toByteArray("span2"));
this.sender.send(encodedSpans);
requestAssertions((request) -> {
Expand All @@ -109,7 +97,7 @@ void sendShouldSendSpansToZipkin() throws IOException, InterruptedException {

@Test
void sendShouldSendSpansToZipkinInProto3() throws IOException, InterruptedException {
mockBackEnd.enqueue(new MockResponse());
this.mockBackEnd.enqueue(new MockResponse());
List<byte[]> encodedSpans = List.of(toByteArray("span1"), toByteArray("span2"));
try (BytesMessageSender sender = createSender(Encoding.PROTO3, Duration.ofSeconds(10))) {
sender.send(encodedSpans);
Expand All @@ -127,22 +115,22 @@ void sendShouldSendSpansToZipkinInProto3() throws IOException, InterruptedExcept
*/
@Test
void sendUsesDynamicEndpoint() throws Exception {
mockBackEnd.enqueue(new MockResponse());
mockBackEnd.enqueue(new MockResponse());
try (TestHttpEndpointSupplier httpEndpointSupplier = new TestHttpEndpointSupplier(zipkinUrl)) {
this.mockBackEnd.enqueue(new MockResponse());
this.mockBackEnd.enqueue(new MockResponse());
try (TestHttpEndpointSupplier httpEndpointSupplier = new TestHttpEndpointSupplier(this.zipkinUrl)) {
try (BytesMessageSender sender = createSender((endpoint) -> httpEndpointSupplier, Encoding.JSON,
Duration.ofSeconds(10))) {
sender.send(Collections.emptyList());
sender.send(Collections.emptyList());
}
assertThat(mockBackEnd.takeRequest().getPath()).endsWith("/1");
assertThat(mockBackEnd.takeRequest().getPath()).endsWith("/2");
assertThat(this.mockBackEnd.takeRequest().getPath()).endsWith("/1");
assertThat(this.mockBackEnd.takeRequest().getPath()).endsWith("/2");
}
}

@Test
void sendShouldHandleHttpFailures() throws InterruptedException {
mockBackEnd.enqueue(new MockResponse().setResponseCode(500));
this.mockBackEnd.enqueue(new MockResponse().setResponseCode(500));
assertThatException().isThrownBy(() -> this.sender.send(Collections.emptyList()))
.withMessageContaining("Expected HTTP status 2xx, got 500");
requestAssertions((request) -> assertThat(request.getMethod()).isEqualTo("POST"));
Expand All @@ -154,7 +142,7 @@ void sendShouldCompressData() throws IOException, InterruptedException {
// This is gzip compressed 10000 times 'a'
byte[] compressed = Base64.getDecoder()
.decode("H4sIAAAAAAAA/+3BMQ0AAAwDIKFLj/k3UR8NcA8AAAAAAAAAAAADUsAZfeASJwAA");
mockBackEnd.enqueue(new MockResponse());
this.mockBackEnd.enqueue(new MockResponse());
this.sender.send(List.of(toByteArray(uncompressed)));
requestAssertions((request) -> {
assertThat(request.getMethod()).isEqualTo("POST");
Expand All @@ -168,35 +156,15 @@ void sendShouldCompressData() throws IOException, InterruptedException {
void shouldTimeout() throws IOException {
try (BytesMessageSender sender = createSender(Encoding.JSON, Duration.ofMillis(1))) {
MockResponse response = new MockResponse().setResponseCode(200).setHeadersDelay(100, TimeUnit.MILLISECONDS);
mockBackEnd.enqueue(response);
this.mockBackEnd.enqueue(response);
assertThatIOException().isThrownBy(() -> sender.send(Collections.emptyList()))
.withMessageContaining("timed out");
}
}

private void requestAssertions(Consumer<RecordedRequest> assertions) throws InterruptedException {
RecordedRequest request = mockBackEnd.takeRequest();
RecordedRequest request = this.mockBackEnd.takeRequest();
assertThat(request).satisfies(assertions);
}

private static void clearRequests() throws InterruptedException {
RecordedRequest request;
do {
request = mockBackEnd.takeRequest(0, TimeUnit.SECONDS);
}
while (request != null);
}

private static void clearResponses() {
dispatcher.clear();
}

private static final class ClearableDispatcher extends QueueDispatcher {

void clear() {
getResponseQueue().clear();
}

}

}

0 comments on commit b4208ed

Please sign in to comment.