diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java index 2b13292c024..5b69b004fc6 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java @@ -21,44 +21,58 @@ import java.util.concurrent.CompletableFuture; public interface WebSocket { - + public interface Listener { - - default void onOpen(WebSocket webSocket) { } - - default void onMessage(WebSocket webSocket, String text) {} - - default void onMessage(WebSocket webSocket, ByteBuffer bytes) {} - - default void onClose(WebSocket webSocket, int code, String reason) {} - - default void onError(WebSocket webSocket, Throwable error) {} - + + default void onOpen(WebSocket webSocket) { + } + + default void onMessage(WebSocket webSocket, String text) { + } + + default void onMessage(WebSocket webSocket, ByteBuffer bytes) { + } + + default void onClose(WebSocket webSocket, int code, String reason) { + } + + default void onError(WebSocket webSocket, Throwable error) { + } + } - + public interface Builder extends BasicBuilder { - + + /** + * Builds a new WebSocket connection and waits asynchronously until the connection is opened. + * The listener onOpen callback is called before the returned future is completed. + * + * @param listener + * @return CompletableFuture which is completed after connection is opened + */ CompletableFuture buildAsync(Listener listener); @Override Builder header(String name, String value); - + @Override Builder setHeader(String k, String v); - + @Override Builder uri(URI uri); - + } /** * Send some data + * * @return true if the message was successfully enqueued. */ boolean send(ByteBuffer buffer); - + /** * Send a close message + * * @return true if the message was successfully enqueued. */ boolean sendClose(int code, String reason); @@ -68,5 +82,5 @@ public interface Builder extends BasicBuilder { * doesn't include framing overhead. */ long queueSize(); - + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java index 64bcb774a9c..20fa45122b2 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java @@ -32,16 +32,16 @@ import java.util.concurrent.CompletableFuture; class OkHttpWebSocketImpl implements WebSocket { - + static class BuilderImpl implements WebSocket.Builder { - + private Request.Builder builder = new Request.Builder(); private OkHttpClient httpClient; public BuilderImpl(OkHttpClient httpClient) { this.httpClient = httpClient; } - + @Override public Builder uri(URI uri) { builder.url(HttpUrl.get(uri)); @@ -53,8 +53,8 @@ public CompletableFuture buildAsync(Listener listener) { Request request = builder.build(); CompletableFuture future = new CompletableFuture<>(); httpClient.newWebSocket(request, new WebSocketListener() { - private volatile boolean opened; - + private volatile boolean opened; + @Override public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response response) { if (response != null) { @@ -63,10 +63,11 @@ public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response respons if (!opened) { if (response != null) { try { - future.completeExceptionally(new WebSocketHandshakeException(new OkHttpResponseImpl<>(response, null)).initCause(t)); + future.completeExceptionally( + new WebSocketHandshakeException(new OkHttpResponseImpl<>(response, null)).initCause(t)); } catch (IOException e) { // can't happen - } + } } else { future.completeExceptionally(t); } @@ -74,7 +75,7 @@ public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response respons listener.onError(new OkHttpWebSocketImpl(webSocket), t); } } - + @Override public void onOpen(okhttp3.WebSocket webSocket, Response response) { opened = true; @@ -82,25 +83,25 @@ public void onOpen(okhttp3.WebSocket webSocket, Response response) { response.close(); } OkHttpWebSocketImpl value = new OkHttpWebSocketImpl(webSocket); - future.complete(value); listener.onOpen(value); + future.complete(value); } - + @Override public void onMessage(okhttp3.WebSocket webSocket, ByteString bytes) { listener.onMessage(new OkHttpWebSocketImpl(webSocket), bytes.asByteBuffer()); } - + @Override public void onMessage(okhttp3.WebSocket webSocket, String text) { listener.onMessage(new OkHttpWebSocketImpl(webSocket), text); } - + @Override public void onClosing(okhttp3.WebSocket webSocket, int code, String reason) { listener.onClose(new OkHttpWebSocketImpl(webSocket), code, reason); } - + }); return future; } @@ -110,17 +111,17 @@ public WebSocket.Builder header(String name, String value) { builder = builder.addHeader(name, value); return this; } - + @Override public WebSocket.Builder setHeader(String k, String v) { builder = builder.header(k, v); return this; } - + } - + private okhttp3.WebSocket webSocket; - + public OkHttpWebSocketImpl(okhttp3.WebSocket webSocket) { this.webSocket = webSocket; } @@ -134,10 +135,10 @@ public boolean send(ByteBuffer buffer) { public boolean sendClose(int code, String reason) { return webSocket.close(code, reason); } - + @Override public long queueSize() { return webSocket.queueSize(); } - -} \ No newline at end of file + +}