Skip to content

Commit

Permalink
fix fabric8io#3957: Lister onOpen should be called before marking t…
Browse files Browse the repository at this point in the history
…he connection as open

(cherry picked from commit eaf7a15)
Signed-off-by: Marc Nuri <marc@marcnuri.com>
  • Loading branch information
AdrianFarmadin authored and manusa committed Apr 5, 2022
1 parent 40fa2bf commit 36dabfa
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,44 +21,58 @@
import java.util.concurrent.CompletableFuture;

public interface WebSocket {

public interface Listener {

default void onOpen(WebSocket webSocket) { }

default void onMessage(WebSocket webSocket, String text) {}

default void onMessage(WebSocket webSocket, ByteBuffer bytes) {}

default void onClose(WebSocket webSocket, int code, String reason) {}

default void onError(WebSocket webSocket, Throwable error) {}


default void onOpen(WebSocket webSocket) {
}

default void onMessage(WebSocket webSocket, String text) {
}

default void onMessage(WebSocket webSocket, ByteBuffer bytes) {
}

default void onClose(WebSocket webSocket, int code, String reason) {
}

default void onError(WebSocket webSocket, Throwable error) {
}

}

public interface Builder extends BasicBuilder {


/**
* Builds a new WebSocket connection and waits asynchronously until the connection is opened.
* The listener onOpen callback is called before the returned future is completed.
*
* @param listener
* @return CompletableFuture which is completed after connection is opened
*/
CompletableFuture<WebSocket> buildAsync(Listener listener);

@Override
Builder header(String name, String value);

@Override
Builder setHeader(String k, String v);

@Override
Builder uri(URI uri);

}

/**
* Send some data
*
* @return true if the message was successfully enqueued.
*/
boolean send(ByteBuffer buffer);

/**
* Send a close message
*
* @return true if the message was successfully enqueued.
*/
boolean sendClose(int code, String reason);
Expand All @@ -68,5 +82,5 @@ public interface Builder extends BasicBuilder {
* doesn't include framing overhead.
*/
long queueSize();

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@
import java.util.concurrent.CompletableFuture;

class OkHttpWebSocketImpl implements WebSocket {

static class BuilderImpl implements WebSocket.Builder {

private Request.Builder builder = new Request.Builder();
private OkHttpClient httpClient;

public BuilderImpl(OkHttpClient httpClient) {
this.httpClient = httpClient;
}

@Override
public Builder uri(URI uri) {
builder.url(HttpUrl.get(uri));
Expand All @@ -53,8 +53,8 @@ public CompletableFuture<WebSocket> buildAsync(Listener listener) {
Request request = builder.build();
CompletableFuture<WebSocket> future = new CompletableFuture<>();
httpClient.newWebSocket(request, new WebSocketListener() {
private volatile boolean opened;
private volatile boolean opened;

@Override
public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response response) {
if (response != null) {
Expand All @@ -63,44 +63,45 @@ public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response respons
if (!opened) {
if (response != null) {
try {
future.completeExceptionally(new WebSocketHandshakeException(new OkHttpResponseImpl<>(response, null)).initCause(t));
future.completeExceptionally(
new WebSocketHandshakeException(new OkHttpResponseImpl<>(response, null)).initCause(t));
} catch (IOException e) {
// can't happen
}
}
} else {
future.completeExceptionally(t);
}
} else {
listener.onError(new OkHttpWebSocketImpl(webSocket), t);
}
}

@Override
public void onOpen(okhttp3.WebSocket webSocket, Response response) {
opened = true;
if (response != null) {
response.close();
}
OkHttpWebSocketImpl value = new OkHttpWebSocketImpl(webSocket);
future.complete(value);
listener.onOpen(value);
future.complete(value);
}

@Override
public void onMessage(okhttp3.WebSocket webSocket, ByteString bytes) {
listener.onMessage(new OkHttpWebSocketImpl(webSocket), bytes.asByteBuffer());
}

@Override
public void onMessage(okhttp3.WebSocket webSocket, String text) {
listener.onMessage(new OkHttpWebSocketImpl(webSocket), text);
}

@Override
public void onClosing(okhttp3.WebSocket webSocket, int code, String reason) {
listener.onClose(new OkHttpWebSocketImpl(webSocket), code, reason);
}

});
return future;
}
Expand All @@ -110,17 +111,17 @@ public WebSocket.Builder header(String name, String value) {
builder = builder.addHeader(name, value);
return this;
}

@Override
public WebSocket.Builder setHeader(String k, String v) {
builder = builder.header(k, v);
return this;
}

}

private okhttp3.WebSocket webSocket;

public OkHttpWebSocketImpl(okhttp3.WebSocket webSocket) {
this.webSocket = webSocket;
}
Expand All @@ -134,10 +135,10 @@ public boolean send(ByteBuffer buffer) {
public boolean sendClose(int code, String reason) {
return webSocket.close(code, reason);
}

@Override
public long queueSize() {
return webSocket.queueSize();
}
}

}

0 comments on commit 36dabfa

Please sign in to comment.