-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Work in progress, web socket support.
- Loading branch information
1 parent
cbac266
commit 2376176
Showing
6 changed files
with
322 additions
and
34 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,8 @@ | ||
.gitignore | ||
git_push.sh | ||
# Remove this once swagger-codegen 2.2.3 is released and we update. | ||
# We want https://github.com/swagger-api/swagger-codegen/pull/5629 | ||
# in the release. | ||
# Verify the following PRs are in the release: | ||
# * https://github.com/swagger-api/swagger-codegen/pull/5629 | ||
# * https://github.com/swagger-api/swagger-codegen/pull/5648 | ||
src/main/java/io/kubernetes/client/ApiClient.java | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
188 changes: 188 additions & 0 deletions
188
util/src/main/java/io/kubernetes/client/util/WebSocketCall.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
/* | ||
* Copyright (C) 2014 Square, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
// This has been cloned from the okhttp-ws package so that I could remove | ||
// the requirement that websockets use the "GET" method | ||
package io.kubernetes.client.util; | ||
|
||
import com.squareup.okhttp.Call; | ||
import com.squareup.okhttp.Callback; | ||
import com.squareup.okhttp.OkHttpClient; | ||
import com.squareup.okhttp.Request; | ||
import com.squareup.okhttp.Response; | ||
import com.squareup.okhttp.internal.Internal; | ||
import com.squareup.okhttp.internal.Util; | ||
import com.squareup.okhttp.internal.http.StreamAllocation; | ||
import com.squareup.okhttp.internal.ws.RealWebSocket; | ||
import com.squareup.okhttp.internal.ws.WebSocketProtocol; | ||
import com.squareup.okhttp.ws.WebSocketListener; | ||
import java.io.IOException; | ||
import java.net.ProtocolException; | ||
import java.security.SecureRandom; | ||
import java.util.Collections; | ||
import java.util.Random; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.LinkedBlockingDeque; | ||
import java.util.concurrent.ThreadPoolExecutor; | ||
import okio.ByteString; | ||
|
||
import static java.util.concurrent.TimeUnit.SECONDS; | ||
|
||
public final class WebSocketCall { | ||
/** | ||
* Prepares the {@code request} to create a web socket at some point in the future. | ||
*/ | ||
public static WebSocketCall create(OkHttpClient client, Request request) { | ||
return new WebSocketCall(client, request); | ||
} | ||
|
||
private final Call call; | ||
private final Random random; | ||
private final String key; | ||
|
||
WebSocketCall(OkHttpClient client, Request request) { | ||
this(client, request, new SecureRandom()); | ||
} | ||
|
||
WebSocketCall(OkHttpClient client, Request request, Random random) { | ||
this.random = random; | ||
|
||
byte[] nonce = new byte[16]; | ||
random.nextBytes(nonce); | ||
key = ByteString.of(nonce).base64(); | ||
|
||
// Copy the client. Otherwise changes (socket factory, redirect policy, | ||
// etc.) may incorrectly be reflected in the request when it is executed. | ||
client = client.clone(); | ||
// Force HTTP/1.1 until the WebSocket over HTTP/2 version is finalized. | ||
client.setProtocols(Collections.singletonList(com.squareup.okhttp.Protocol.HTTP_1_1)); | ||
|
||
request = request.newBuilder() | ||
.header("Upgrade", "websocket") | ||
.header("Connection", "Upgrade") | ||
.header("Sec-WebSocket-Key", key) | ||
.header("Sec-WebSocket-Version", "13") | ||
.build(); | ||
|
||
call = client.newCall(request); | ||
} | ||
|
||
/** | ||
* Schedules the request to be executed at some point in the future. | ||
* | ||
* <p>The {@link OkHttpClient#getDispatcher dispatcher} defines when the request will run: | ||
* usually immediately unless there are several other requests currently being executed. | ||
* | ||
* <p>This client will later call back {@code responseCallback} with either an HTTP response or a | ||
* failure exception. If you {@link #cancel} a request before it completes the callback will not | ||
* be invoked. | ||
* | ||
* @throws IllegalStateException when the call has already been executed. | ||
*/ | ||
public void enqueue(final WebSocketListener listener) { | ||
Callback responseCallback = new Callback() { | ||
@Override public void onResponse(Response response) throws IOException { | ||
System.out.println(response); | ||
try { | ||
createWebSocket(response, listener); | ||
} catch (IOException e) { | ||
listener.onFailure(e, response); | ||
} | ||
} | ||
|
||
@Override public void onFailure(Request request, IOException e) { | ||
listener.onFailure(e, null); | ||
} | ||
}; | ||
// TODO call.enqueue(responseCallback, true); | ||
Internal.instance.callEnqueue(call, responseCallback, true); | ||
} | ||
|
||
/** Cancels the request, if possible. Requests that are already complete cannot be canceled. */ | ||
public void cancel() { | ||
call.cancel(); | ||
} | ||
|
||
private void createWebSocket(Response response, WebSocketListener listener) throws IOException { | ||
if (response.code() != 101) { | ||
Util.closeQuietly(response.body()); | ||
throw new ProtocolException("Expected HTTP 101 response but was '" | ||
+ response.code() | ||
+ " " | ||
+ response.message() | ||
+ "'"); | ||
} | ||
|
||
String headerConnection = response.header("Connection"); | ||
if (!"Upgrade".equalsIgnoreCase(headerConnection)) { | ||
throw new ProtocolException( | ||
"Expected 'Connection' header value 'Upgrade' but was '" + headerConnection + "'"); | ||
} | ||
String headerUpgrade = response.header("Upgrade"); | ||
if (!"websocket".equalsIgnoreCase(headerUpgrade)) { | ||
throw new ProtocolException( | ||
"Expected 'Upgrade' header value 'websocket' but was '" + headerUpgrade + "'"); | ||
} | ||
String headerAccept = response.header("Sec-WebSocket-Accept"); | ||
String acceptExpected = Util.shaBase64(key + WebSocketProtocol.ACCEPT_MAGIC); | ||
if (!acceptExpected.equals(headerAccept)) { | ||
throw new ProtocolException("Expected 'Sec-WebSocket-Accept' header value '" | ||
+ acceptExpected | ||
+ "' but was '" | ||
+ headerAccept | ||
+ "'"); | ||
} | ||
|
||
StreamAllocation streamAllocation = Internal.instance.callEngineGetStreamAllocation(call); | ||
RealWebSocket webSocket = StreamWebSocket.create( | ||
streamAllocation, response, random, listener); | ||
|
||
listener.onOpen(webSocket, response); | ||
|
||
while (webSocket.readMessage()) { | ||
} | ||
} | ||
|
||
// Keep static so that the WebSocketCall instance can be garbage collected. | ||
private static class StreamWebSocket extends RealWebSocket { | ||
static RealWebSocket create(StreamAllocation streamAllocation, Response response, | ||
Random random, WebSocketListener listener) { | ||
String url = response.request().urlString(); | ||
ThreadPoolExecutor replyExecutor = | ||
new ThreadPoolExecutor(1, 1, 1, SECONDS, new LinkedBlockingDeque<Runnable>(), | ||
Util.threadFactory(String.format("OkHttp %s WebSocket", url), true)); | ||
replyExecutor.allowCoreThreadTimeOut(true); | ||
|
||
return new StreamWebSocket(streamAllocation, random, replyExecutor, listener, url); | ||
} | ||
|
||
private final StreamAllocation streamAllocation; | ||
private final ExecutorService replyExecutor; | ||
|
||
private StreamWebSocket(StreamAllocation streamAllocation, | ||
Random random, ExecutorService replyExecutor, WebSocketListener listener, String url) { | ||
super(true /* is client */, streamAllocation.connection().source, | ||
streamAllocation.connection().sink, random, replyExecutor, listener, url); | ||
this.streamAllocation = streamAllocation; | ||
this.replyExecutor = replyExecutor; | ||
} | ||
|
||
@Override protected void close() throws IOException { | ||
replyExecutor.shutdown(); | ||
streamAllocation.noNewStreams(); | ||
streamAllocation.streamFinished(streamAllocation.stream()); | ||
} | ||
} | ||
} |
Oops, something went wrong.