Skip to content

Commit

Permalink
Web socket support.
Browse files Browse the repository at this point in the history
  • Loading branch information
brendanburns committed Jun 2, 2017
1 parent 8fe8403 commit 5ccc832
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.kubernetes.client.examples;

import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.WebSockets;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;

public class WebSocketsExample {
public static void main(String... args) throws ApiException, IOException {
final ApiClient client = Config.defaultClient();
WebSockets.stream(args[0], "POST", client, new WebSockets.SocketListener() {
public void open() {}
public void close() {
// Trigger shutdown of the dispatcher's executor so this process can exit cleanly.
client.getHttpClient().getDispatcher().getExecutorService().shutdown();
}
public void bytesMessage(InputStream is) {}
public void textMessage(Reader in) {
try {
BufferedReader reader = new BufferedReader(in);
for (String line = reader.readLine(); line != null; line = reader.readLine()) {
System.out.println(line);
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
});
}
}
5 changes: 3 additions & 2 deletions kubernetes/.swagger-codegen-ignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
.gitignore
git_push.sh
# Remove this once swagger-codegen 2.2.3 is released and we update.
# We want https://github.com/swagger-api/swagger-codegen/pull/5629
# in the release.
# Verify the following PRs are in the release:
# * https://github.com/swagger-api/swagger-codegen/pull/5629
# * https://github.com/swagger-api/swagger-codegen/pull/5648
src/main/java/io/kubernetes/client/ApiClient.java

10 changes: 8 additions & 2 deletions kubernetes/src/main/java/io/kubernetes/client/ApiClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1069,9 +1069,16 @@ public <T> T handleResponse(Response response, Type returnType) throws ApiExcept
* @throws ApiException If fail to serialize the request body object
*/
public Call buildCall(String path, String method, List<Pair> queryParams, Object body, Map<String, String> headerParams, Map<String, Object> formParams, String[] authNames, ProgressRequestBody.ProgressRequestListener progressRequestListener) throws ApiException {
Request request = buildRequest(path, method, queryParams, body, headerParams, formParams, authNames, progressRequestListener);

return httpClient.newCall(request);
}

public Request buildRequest(String path, String method, List<Pair> queryParams, Object body, Map<String, String> headerParams, Map<String, Object> formParams, String[] authNames, ProgressRequestBody.ProgressRequestListener progressRequestListener) throws ApiException {
updateParamsForAuth(authNames, queryParams, headerParams);

final String url = buildUrl(path, queryParams);
System.out.println(url);
final Request.Builder reqBuilder = new Request.Builder().url(url);
processHeaderParams(headerParams, reqBuilder);

Expand Down Expand Up @@ -1108,8 +1115,7 @@ public Call buildCall(String path, String method, List<Pair> queryParams, Object
} else {
request = reqBuilder.method(method, reqBody).build();
}

return httpClient.newCall(request);
return request;
}

/**
Expand Down
11 changes: 10 additions & 1 deletion util/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp</groupId>
<artifactId>okhttp-ws</artifactId>
<version>2.7.5</version>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.github.stefanbirkner/system-rules -->
<dependency>
<groupId>com.github.stefanbirkner</groupId>
<artifactId>system-rules</artifactId>
Expand All @@ -51,4 +55,9 @@
</plugin>
</plugins>
</build>
<properties>
<java.version>1.7</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
</project>
2 changes: 1 addition & 1 deletion util/src/main/java/io/kubernetes/client/util/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,4 @@ public static ApiClient defaultClient() throws IOException {
client.setBasePath("http://localhost:8080");
return client;
}
}
}
188 changes: 188 additions & 0 deletions util/src/main/java/io/kubernetes/client/util/WebSocketCall.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright (C) 2014 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// This has been cloned from the okhttp-ws package so that I could remove
// the requirement that websockets use the "GET" method
package io.kubernetes.client.util;

import com.squareup.okhttp.Call;
import com.squareup.okhttp.Callback;
import com.squareup.okhttp.OkHttpClient;
import com.squareup.okhttp.Request;
import com.squareup.okhttp.Response;
import com.squareup.okhttp.internal.Internal;
import com.squareup.okhttp.internal.Util;
import com.squareup.okhttp.internal.http.StreamAllocation;
import com.squareup.okhttp.internal.ws.RealWebSocket;
import com.squareup.okhttp.internal.ws.WebSocketProtocol;
import com.squareup.okhttp.ws.WebSocketListener;
import java.io.IOException;
import java.net.ProtocolException;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import okio.ByteString;

import static java.util.concurrent.TimeUnit.SECONDS;

public final class WebSocketCall {
/**
* Prepares the {@code request} to create a web socket at some point in the future.
*/
public static WebSocketCall create(OkHttpClient client, Request request) {
return new WebSocketCall(client, request);
}

private final Call call;
private final Random random;
private final String key;

WebSocketCall(OkHttpClient client, Request request) {
this(client, request, new SecureRandom());
}

WebSocketCall(OkHttpClient client, Request request, Random random) {
this.random = random;

byte[] nonce = new byte[16];
random.nextBytes(nonce);
key = ByteString.of(nonce).base64();

// Copy the client. Otherwise changes (socket factory, redirect policy,
// etc.) may incorrectly be reflected in the request when it is executed.
client = client.clone();
// Force HTTP/1.1 until the WebSocket over HTTP/2 version is finalized.
client.setProtocols(Collections.singletonList(com.squareup.okhttp.Protocol.HTTP_1_1));

request = request.newBuilder()
.header("Upgrade", "websocket")
.header("Connection", "Upgrade")
.header("Sec-WebSocket-Key", key)
.header("Sec-WebSocket-Version", "13")
.build();

call = client.newCall(request);
}

/**
* Schedules the request to be executed at some point in the future.
*
* <p>The {@link OkHttpClient#getDispatcher dispatcher} defines when the request will run:
* usually immediately unless there are several other requests currently being executed.
*
* <p>This client will later call back {@code responseCallback} with either an HTTP response or a
* failure exception. If you {@link #cancel} a request before it completes the callback will not
* be invoked.
*
* @throws IllegalStateException when the call has already been executed.
*/
public void enqueue(final WebSocketListener listener) {
Callback responseCallback = new Callback() {
@Override public void onResponse(Response response) throws IOException {
System.out.println(response);
try {
createWebSocket(response, listener);
} catch (IOException e) {
listener.onFailure(e, response);
}
}

@Override public void onFailure(Request request, IOException e) {
listener.onFailure(e, null);
}
};
// TODO call.enqueue(responseCallback, true);
Internal.instance.callEnqueue(call, responseCallback, true);
}

/** Cancels the request, if possible. Requests that are already complete cannot be canceled. */
public void cancel() {
call.cancel();
}

private void createWebSocket(Response response, WebSocketListener listener) throws IOException {
if (response.code() != 101) {
Util.closeQuietly(response.body());
throw new ProtocolException("Expected HTTP 101 response but was '"
+ response.code()
+ " "
+ response.message()
+ "'");
}

String headerConnection = response.header("Connection");
if (!"Upgrade".equalsIgnoreCase(headerConnection)) {
throw new ProtocolException(
"Expected 'Connection' header value 'Upgrade' but was '" + headerConnection + "'");
}
String headerUpgrade = response.header("Upgrade");
if (!"websocket".equalsIgnoreCase(headerUpgrade)) {
throw new ProtocolException(
"Expected 'Upgrade' header value 'websocket' but was '" + headerUpgrade + "'");
}
String headerAccept = response.header("Sec-WebSocket-Accept");
String acceptExpected = Util.shaBase64(key + WebSocketProtocol.ACCEPT_MAGIC);
if (!acceptExpected.equals(headerAccept)) {
throw new ProtocolException("Expected 'Sec-WebSocket-Accept' header value '"
+ acceptExpected
+ "' but was '"
+ headerAccept
+ "'");
}

StreamAllocation streamAllocation = Internal.instance.callEngineGetStreamAllocation(call);
RealWebSocket webSocket = StreamWebSocket.create(
streamAllocation, response, random, listener);

listener.onOpen(webSocket, response);

while (webSocket.readMessage()) {
}
}

// Keep static so that the WebSocketCall instance can be garbage collected.
private static class StreamWebSocket extends RealWebSocket {
static RealWebSocket create(StreamAllocation streamAllocation, Response response,
Random random, WebSocketListener listener) {
String url = response.request().urlString();
ThreadPoolExecutor replyExecutor =
new ThreadPoolExecutor(1, 1, 1, SECONDS, new LinkedBlockingDeque<Runnable>(),
Util.threadFactory(String.format("OkHttp %s WebSocket", url), true));
replyExecutor.allowCoreThreadTimeOut(true);

return new StreamWebSocket(streamAllocation, random, replyExecutor, listener, url);
}

private final StreamAllocation streamAllocation;
private final ExecutorService replyExecutor;

private StreamWebSocket(StreamAllocation streamAllocation,
Random random, ExecutorService replyExecutor, WebSocketListener listener, String url) {
super(true /* is client */, streamAllocation.connection().source,
streamAllocation.connection().sink, random, replyExecutor, listener, url);
this.streamAllocation = streamAllocation;
this.replyExecutor = replyExecutor;
}

@Override protected void close() throws IOException {
replyExecutor.shutdown();
streamAllocation.noNewStreams();
streamAllocation.streamFinished(streamAllocation.stream());
}
}
}

0 comments on commit 5ccc832

Please sign in to comment.