Skip to content

Commit

Permalink
fix fabric8io#4201: moving consumeLines out of the clients
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Nov 29, 2022
1 parent 009974f commit 6f31d2a
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,16 +252,6 @@ public DerivedClientBuilder newBuilder() {
return this.builder.copy(this);
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest request, AsyncBody.Consumer<String> consumer) {
return sendAsync(request, () -> {
AsyncBodySubscriber<String> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromLineSubscriber(subscriber);
BodyHandler<AsyncBody> handlerAdapter = new BodyHandlerAdapter(subscriber, handler);
return new HandlerAndAsyncBody<>(handlerAdapter, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<>(r.response, r.asyncBody));
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest request,
AsyncBody.Consumer<List<ByteBuffer>> consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.eclipse.jetty.websocket.client.WebSocketClient;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -76,29 +75,6 @@ public DerivedClientBuilder newBuilder() {
return builder.copy(this);
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(
HttpRequest originalRequest, AsyncBody.Consumer<String> consumer) {
final var request = toStandardHttpRequest(originalRequest);
final var future = new JettyAsyncResponseListener(request) {

final StringBuilder builder = new StringBuilder();

@Override
protected void onContent(ByteBuffer content) throws Exception {
for (char c : StandardCharsets.UTF_8.decode(content).array()) {
if (c == '\n') {
consumer.consume(builder.toString(), this);
builder.setLength(0);
} else {
builder.append(c);
}
}
}
}.listen(newRequest(request));
return interceptResponse(request.toBuilder(), future, r -> consumeLines(r, consumer));
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(
HttpRequest originalRequest, AsyncBody.Consumer<List<ByteBuffer>> consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,21 +214,6 @@ public DerivedClientBuilder newBuilder() {
return new OkHttpClientBuilderImpl(httpClient.newBuilder(), this.factory, this.config);
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(
HttpRequest request, AsyncBody.Consumer<String> consumer) {
Function<BufferedSource, AsyncBody> handler = s -> new OkHttpAsyncBody<String>(consumer, s) {
@Override
protected String process(BufferedSource source) throws IOException {
// this should probably be strict instead
// when non-strict if no newline is present, this will create a truncated string from
// what is available. However as strict it will be blocking.
return source.readUtf8Line();
}
};
return sendAsync(request, handler);
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(
HttpRequest request, AsyncBody.Consumer<List<ByteBuffer>> consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -184,41 +182,6 @@ void testAsyncBody() throws Exception {
assertEquals(byteCount, consumed.get(10, TimeUnit.SECONDS));
}

@Test
void testConsumeLines() throws Exception {
server.expect().withPath("/async").andReturn(200, "hello\nworld\nlines\n").always();

ArrayList<String> strings = new ArrayList<>();
CompletableFuture<Void> consumed = new CompletableFuture<>();

CompletableFuture<HttpResponse<AsyncBody>> responseFuture = client.getHttpClient().consumeLines(
client.getHttpClient().newHttpRequestBuilder().uri(URI.create(client.getConfiguration().getMasterUrl() + "async"))
.build(),
(value, asyncBody) -> {
strings.add(value);
asyncBody.consume();
});

responseFuture.whenComplete((r, t) -> {
if (t != null) {
consumed.completeExceptionally(t);
}
if (r != null) {
r.body().consume();
r.body().done().whenComplete((v, ex) -> {
if (ex != null) {
consumed.completeExceptionally(ex);
} else {
consumed.complete(null);
}
});
}
});

consumed.get(10, TimeUnit.SECONDS);
assertEquals(Arrays.asList("hello", "world", "lines"), strings);
}

@DisplayName("Supported response body types")
@ParameterizedTest(name = "{index}: {0}")
@ValueSource(classes = { String.class, byte[].class, Reader.class, InputStream.class })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,6 @@ default <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request, Cl
return HttpResponse.SupportedResponses.from(type).sendAsync(request, this);
}

/**
* Send a request and consume the lines of the response body using the same logic as {@link BufferedReader} to
* break up the lines.
*
* @param request the HttpRequest to send
* @param consumer the response body consumer
* @return the future which will be ready after the headers have been read
*/
CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest request, AsyncBody.Consumer<String> consumer);

/**
* Send a request and consume the bytes of the resulting response body
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,10 @@

import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

public abstract class AbstractAsyncBodyTest {

Expand All @@ -50,72 +45,6 @@ static void afterAll() {

protected abstract HttpClient.Factory getHttpClientFactory();

@Test
@DisplayName("Lines are processed and consumed only after the consume() invocation")
public void consumeLinesProcessedAfterConsume() throws Exception {
try (final HttpClient client = getHttpClientFactory().newBuilder().build()) {
server.expect().withPath("/consume-lines")
.andReturn(200, "This is the response body\n")
.always();
final StringBuffer responseText = new StringBuffer();
final HttpResponse<AsyncBody> asyncBodyResponse = client.consumeLines(
client.newHttpRequestBuilder().uri(server.url("/consume-lines")).build(),
(value, asyncBody) -> {
responseText.append(value);
asyncBody.consume();
})
.get(10L, TimeUnit.SECONDS);
assertThat(responseText).isEmpty();
asyncBodyResponse.body().consume();
asyncBodyResponse.body().done().get(10L, TimeUnit.SECONDS);
assertThat(responseText).contains("This is the response body");
}
}

@Test
@DisplayName("Lines are not processed when cancel() invocation")
public void consumeLinesNotProcessedIfCancelled() throws Exception {
try (final HttpClient client = getHttpClientFactory().newBuilder().build()) {
server.expect().withPath("/cancel")
.andReturn(200, "This would be the response body")
.always();
final StringBuffer responseText = new StringBuffer();
final HttpResponse<AsyncBody> asyncBodyResponse = client
.consumeLines(client.newHttpRequestBuilder()
.uri(server.url("/cancel")).build(), (value, asyncBody) -> {
responseText.append(value);
asyncBody.consume();
})
.get(10L, TimeUnit.SECONDS);
asyncBodyResponse.body().cancel();
asyncBodyResponse.body().consume();
final CompletableFuture<Void> doneFuture = asyncBodyResponse.body().done();
assertThrows(CancellationException.class, () -> doneFuture.get(10L, TimeUnit.SECONDS));
assertThat(responseText).isEmpty();
}
}

@Test
@DisplayName("Lines are processed completely")
public void consumeLinesProcessesAllLines() throws Exception {
try (final HttpClient client = getHttpClientFactory().newBuilder().build()) {
server.expect().withPath("/consume-lines")
.andReturn(200, "This is the response body\nWith\nMultiple\n lines\n")
.always();
final List<String> receivedLines = new ArrayList<>();
final HttpResponse<AsyncBody> asyncBodyResponse = client.consumeLines(
client.newHttpRequestBuilder().uri(server.url("/consume-lines")).build(),
(value, asyncBody) -> {
receivedLines.add(value);
asyncBody.consume();
})
.get(10L, TimeUnit.SECONDS);
asyncBodyResponse.body().consume();
asyncBodyResponse.body().done().get(10L, TimeUnit.SECONDS);
assertThat(receivedLines).containsExactly("This is the response body", "With", "Multiple", " lines");
}
}

@Test
@DisplayName("Bytes are processed and consumed only after the consume() invocation")
public void consumeBytesProcessedAfterConsume() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,41 +115,6 @@ public CompletableFuture<Boolean> afterFailure(BasicBuilder builder, HttpRespons
}
}

@Test
@DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.consumeLines")
public void afterHttpFailureReplacesResponseInConsumeLines() throws Exception {
// Given
server.expect().withPath("/intercepted-url").andReturn(200, "This works\n").once();
final HttpClient.Builder builder = getHttpClientFactory().newBuilder()
.addOrReplaceInterceptor("test", new Interceptor() {
@Override
public CompletableFuture<Boolean> afterFailure(BasicBuilder builder, HttpResponse<?> response) {
builder.uri(URI.create(server.url("/intercepted-url")));
return CompletableFuture.completedFuture(true);
}
});
final CompletableFuture<String> result = new CompletableFuture<>();
// When
try (HttpClient client = builder.build()) {
final HttpResponse<AsyncBody> asyncR = client.consumeLines(
client.newHttpRequestBuilder().uri(server.url("/not-found")).build(), (s, ab) -> {
result.complete(s);
ab.consume();
})
.get(10L, TimeUnit.SECONDS);
asyncR.body().consume();
asyncR.body().done().whenComplete((v, t) -> {
if (t != null) {
result.completeExceptionally(t);
} else {
result.complete(null);
}
});
// Then
assertThat(result.get(10L, TimeUnit.SECONDS)).isEqualTo("This works");
}
}

@Test
@DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.consumeBytes")
public void afterHttpFailureReplacesResponseInConsumeBytes() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -66,8 +68,18 @@ public WatchHTTPManager(final HttpClient client,
protected synchronized void start(URL url, Map<String, String> headers) {
HttpRequest.Builder builder = client.newHttpRequestBuilder().url(url);
headers.forEach(builder::header);
call = client.consumeLines(builder.build(), (s, a) -> {
onMessage(s);
StringBuffer buffer = new StringBuffer();
call = client.consumeBytes(builder.build(), (b, a) -> {
for (ByteBuffer content : b) {
for (char c : StandardCharsets.UTF_8.decode(content).array()) {
if (c == '\n') {
onMessage(buffer.toString());
buffer.setLength(0);
} else {
buffer.append(c);
}
}
}
a.consume();
});
call.whenComplete((response, t) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void testReconnectOnException() throws MalformedURLException, InterruptedExcepti
BaseOperation baseOperation = Mockito.mock(BaseOperation.class);
Mockito.when(baseOperation.getNamespacedUrl()).thenReturn(new URL("http://localhost"));
CompletableFuture<HttpResponse<AsyncBody>> future = new CompletableFuture<>();
Mockito.when(client.consumeLines(Mockito.any(), Mockito.any())).thenReturn(future);
Mockito.when(client.consumeBytes(Mockito.any(), Mockito.any())).thenReturn(future);

CountDownLatch reconnect = new CountDownLatch(1);
WatchHTTPManager<HasMetadata, KubernetesResourceList<HasMetadata>> watch = new WatchHTTPManager(client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@
import io.fabric8.kubernetes.client.dsl.Watchable;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import io.fabric8.kubernetes.client.utils.Serialization;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import java.net.HttpURLConnection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static io.fabric8.kubernetes.client.Watcher.Action.BOOKMARK;
import static io.fabric8.kubernetes.client.Watcher.Action.DELETED;
Expand Down Expand Up @@ -315,11 +318,12 @@ public void onClose(WatcherException cause) {
}

private static WatchEvent outdatedEvent() {
return new WatchEventBuilder().withType(Watcher.Action.ERROR.name()).withStatusObject(
new StatusBuilder().withCode(HttpURLConnection.HTTP_GONE)
.withMessage(
"410: The event in requested index is outdated and cleared (the requested history has been cleared [3/1]) [2]")
.build())
return new WatchEventBuilder().withType(Watcher.Action.ERROR.name())
.withStatusObject(
new StatusBuilder().withCode(HttpURLConnection.HTTP_GONE)
.withMessage(
"410: The event in requested index is outdated and cleared (the requested history has been cleared [3/1]) [2]")
.build())
.build();
}

Expand Down Expand Up @@ -425,4 +429,44 @@ public void onClose(WatcherException cause) {
// ensure that the exception does not inhibit further message processing
assertTrue(latch.await(10, TimeUnit.SECONDS));
}

@Test
void testHttpWatch() throws InterruptedException {
// Given

// trigger the usage of the http watch
server.expect()
.withPath("/api/v1/namespaces/test/pods?allowWatchBookmarks=true&watch=true")
.andReturn(200, null)
.once();

String dummyEvent = Serialization.asJson(new WatchEventBuilder().withType("MODIFIED")
.withObject(new PodBuilder().withNewMetadata().endMetadata().build())
.build()) + "\n";

// build a response that is large enough to span multiple messages
// there's potentially a corner case here with utf multi-byte that is unhandled
// if that happens we'll see an exception from the decode
server.expect()
.withPath("/api/v1/namespaces/test/pods?allowWatchBookmarks=true&watch=true")
.andReturn(200, Collections.nCopies(200, dummyEvent).stream().collect(Collectors.joining()))
.once();

CountDownLatch latch = new CountDownLatch(200);

client.pods().watch(new Watcher<Pod>() {

@Override
public void eventReceived(Action action, Pod resource) {
latch.countDown();
}

@Override
public void onClose(WatcherException cause) {
}
});

// ensure that the exception does not inhibit further message processing
assertTrue(latch.await(10, TimeUnit.SECONDS));
}
}

0 comments on commit 6f31d2a

Please sign in to comment.