Skip to content

Commit

Permalink
Wk/taco 131 proxy handle h2h1 streamid (#211)
Browse files Browse the repository at this point in the history
* TACO-131 h2 message session

* TACO-131 updating http tests for http message session

* TACO-131 http sessions use the same meta object

* TACO-131 h2 interleaved request test
  • Loading branch information
manimaul authored Apr 16, 2018
1 parent 7b254e6 commit 42c9932
Show file tree
Hide file tree
Showing 17 changed files with 406 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public abstract class DefaultSegmentedData implements SegmentedData {

public abstract Headers trailingHeaders();

public abstract int streamId();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder content(ByteBuf content);
Expand All @@ -23,10 +25,14 @@ public abstract static class Builder {

public abstract Builder trailingHeaders(Headers headers);

public abstract Builder streamId(int id);

public abstract DefaultSegmentedData build();
}

public static Builder builder() {
return new AutoValue_DefaultSegmentedData.Builder().trailingHeaders(new DefaultHeaders());
return new AutoValue_DefaultSegmentedData.Builder()
.trailingHeaders(new DefaultHeaders())
.streamId(Message.H1_STREAM_ID_NONE);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.xjeffrose.xio.http;

import com.google.common.annotations.VisibleForTesting;
import com.xjeffrose.xio.http.internal.MessageMetaState;
import lombok.extern.slf4j.Slf4j;

// Fun reading!
Expand Down Expand Up @@ -44,25 +45,13 @@ public class Http1MessageSession {
@enduml
*/

public static class RequestMeta {
public final Request request;
public boolean requestFinished;
public boolean responseFinished;

RequestMeta(Request request, boolean requestFinished) {
this.request = request;
this.requestFinished = requestFinished;
responseFinished = false;
}
}

// We can only handle one request at a time, any additional requests will be ignored.
private RequestMeta initialRequest;
private MessageMetaState initialRequest;
// The client tried to send another request before the first request was responded to.
private boolean clientTriedPipeline;

@VisibleForTesting
public RequestMeta initialRequest() {
public MessageMetaState initialRequest() {
return initialRequest;
}

Expand All @@ -83,7 +72,7 @@ public Http1MessageSession() {
public void onRequest(Request request) {
if (initialRequest == null) {
boolean fullRequest = (request instanceof FullRequest);
initialRequest = new RequestMeta(request, fullRequest);
initialRequest = new MessageMetaState(request, fullRequest);
} else {
// log that the client is attempting to pipeline
if (clientTriedPipeline == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,17 @@ Response wrapResponse(ChannelHandlerContext ctx, Http2Response msg) {
if (msg.payload instanceof Http2Headers) {
Http2Headers headers = (Http2Headers) msg.payload;
if (msg.eos && headers.method() == null && headers.status() == null) {
Response response =
new SegmentedResponseData(getChannelResponse(ctx), new Http2SegmentedData(headers));
return response;
return new SegmentedResponseData(
getChannelResponse(ctx), new Http2SegmentedData(headers, msg.streamId));
} else {
Response response = wrapHeaders(headers, msg.streamId, msg.eos);
setChannelResponse(ctx, response);
return response;
}
} else if (msg.payload instanceof Http2DataFrame) {
Response response =
new SegmentedResponseData(
getChannelResponse(ctx),
new Http2SegmentedData(((Http2DataFrame) msg.payload).content(), msg.eos));
return response;
return new SegmentedResponseData(
getChannelResponse(ctx),
new Http2SegmentedData(((Http2DataFrame) msg.payload).content(), msg.eos, msg.streamId));
}
// TODO(CK): throw an exception?
return null;
Expand Down
113 changes: 113 additions & 0 deletions xio-core/src/main/java/com/xjeffrose/xio/http/Http2MessageSession.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package com.xjeffrose.xio.http;

import com.google.common.collect.Maps;
import com.xjeffrose.xio.http.internal.MessageMetaState;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

/**
* A finite state machine to track the current HTTP/2 message session. This class exists to store
* the connection specific state of the message session
*/
@Slf4j
public class Http2MessageSession {

private static final AttributeKey<Http2MessageSession> CHANNEL_MESSAGE_SESSION_KEY =
AttributeKey.newInstance("xio_channel_h2_message_session");

static Http2MessageSession contextMessageSession(ChannelHandlerContext ctx) {
Http2MessageSession session = ctx.channel().attr(CHANNEL_MESSAGE_SESSION_KEY).get();
if (session == null) {
session = new Http2MessageSession();
ctx.channel().attr(CHANNEL_MESSAGE_SESSION_KEY).set(session);
}
return session;
}

private Map<Integer, MessageMetaState> streamIdRequests = Maps.newHashMap();

private Http2MessageSession() {}

public void onRequest(Request request) {
MessageMetaState initialRequest = streamIdRequests.get(request.streamId());
if (initialRequest == null) {
if (request.startOfMessage()) {
streamIdRequests.put(
request.streamId(), new MessageMetaState(request, request.endOfMessage()));
} else {
log.error("Received an h2 message segment without a startOfMessage - request: {}", request);
}
} else {
initialRequest.requestFinished = request.endOfMessage();
}
}

public void onRequestData(SegmentedData data) {
MessageMetaState initialRequest = streamIdRequests.get(data.streamId());

if (initialRequest == null) {
log.error(
"Received an h2 message SegmentedData without a current Request, dropping data: {}",
data);
return;
}

if (data.endOfMessage()) {
initialRequest.requestFinished = true;
}
}

public void onResponse(Response response) {
MessageMetaState initialRequest = streamIdRequests.get(response.streamId());
if (initialRequest != null && response.endOfMessage()) {
initialRequest.responseFinished = true;
}
}

/**
* Called before a SegmentedData object is sent to the client as part of a Response.
*
* @param data The SegmentedData object the server is about to send
*/
public void onResponseData(SegmentedData data) {
MessageMetaState initialRequest = streamIdRequests.get(data.streamId());
if (initialRequest != null) {
if (data.endOfMessage()) {
initialRequest.responseFinished = true;
}
} else {
log.error(
"Attempted to write SegmentedData without a current Request, dropping data: {}", data);
}
}

/**
* Returns the Request object for the current session (if any).
*
* @return the current Request or null
*/
@Nullable
public Request currentRequest(int streamId) {
MessageMetaState initialRequest = streamIdRequests.get(streamId);
if (initialRequest != null) {
return initialRequest.request;
}
return null;
}

/**
* Check if the message session has completed, if so remove state and prepare for the next
* session.
*/
public void flush(int streamId) {
MessageMetaState initialRequest = streamIdRequests.get(streamId);
if (initialRequest != null
&& initialRequest.requestFinished
&& initialRequest.responseFinished) {
streamIdRequests.remove(streamId);
}
}
}
10 changes: 5 additions & 5 deletions xio-core/src/main/java/com/xjeffrose/xio/http/Http2Response.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ public Http2Response(int streamId, T payload, boolean eos) {

public static Http2Response<Http2DataFrame> build(
int streamId, Http2DataFrame data, boolean eos) {
return new Http2Response<Http2DataFrame>(streamId, data, eos);
return new Http2Response<>(streamId, data, eos);
}

public static Http2Response<Http2Headers> build(int streamId, Http2Headers headers) {
return new Http2Response<Http2Headers>(streamId, headers, false);
return new Http2Response<>(streamId, headers, false);
}

public static Http2Response<Http2Headers> build(int streamId, Http2Headers headers, boolean eos) {
return new Http2Response<Http2Headers>(streamId, headers, eos);
return new Http2Response<>(streamId, headers, eos);
}

public Http2Response newStreamId(int newId) {
return new Http2Response(newId, payload, eos);
public Http2Response<T> newStreamId(int newId) {
return new Http2Response<>(newId, payload, eos);
}
}
72 changes: 37 additions & 35 deletions xio-core/src/main/java/com/xjeffrose/xio/http/Http2ServerCodec.java
Original file line number Diff line number Diff line change
@@ -1,34 +1,24 @@
package com.xjeffrose.xio.http;

import static com.xjeffrose.xio.http.Http2MessageSession.contextMessageSession;

import com.xjeffrose.xio.core.internal.UnstableApi;
import com.xjeffrose.xio.http.internal.FullHttp2Request;
import com.xjeffrose.xio.http.internal.Http2SegmentedData;
import com.xjeffrose.xio.http.internal.SegmentedHttp2Request;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.PromiseCombiner;

@UnstableApi
public class Http2ServerCodec extends ChannelDuplexHandler {

private static final AttributeKey<Request> CHANNEL_REQUEST_KEY =
AttributeKey.newInstance("xio_channel_h2_request");

private static void setChannelRequest(ChannelHandlerContext ctx, Request request) {
ctx.channel().attr(CHANNEL_REQUEST_KEY).set(request);
}

private static Request getChannelRequest(ChannelHandlerContext ctx) {
// TODO(CK): Deal with null?
return ctx.channel().attr(CHANNEL_REQUEST_KEY).get();
}

Request wrapHeaders(Http2Headers headers, int streamId, boolean eos) {
if (eos) {
return new FullHttp2Request(headers, streamId);
Expand All @@ -38,23 +28,33 @@ Request wrapHeaders(Http2Headers headers, int streamId, boolean eos) {
}

Request wrapRequest(ChannelHandlerContext ctx, Http2Request msg) {
Http2MessageSession messageSession = contextMessageSession(ctx);
if (msg.payload instanceof Http2Headers) {
Http2Headers headers = (Http2Headers) msg.payload;
if (msg.eos && headers.method() == null && headers.status() == null) {
Request request =
new SegmentedRequestData(getChannelRequest(ctx), new Http2SegmentedData(headers));
return request;
Request initialRequest = messageSession.currentRequest(msg.streamId);
if (initialRequest != null) {
SegmentedRequestData request =
new SegmentedRequestData(
initialRequest, new Http2SegmentedData(headers, msg.streamId));
messageSession.onRequest(request);
return request;
}
} else {
Request request = wrapHeaders(headers, msg.streamId, msg.eos);
setChannelRequest(ctx, request);
messageSession.onRequest(request);
return request;
}
} else if (msg.payload instanceof Http2DataFrame) {
Request request =
new SegmentedRequestData(
getChannelRequest(ctx),
new Http2SegmentedData(((Http2DataFrame) msg.payload).content(), msg.eos));
return request;
Http2DataFrame frame = (Http2DataFrame) msg.payload;
Request initialRequest = messageSession.currentRequest(msg.streamId);
if (initialRequest != null) {
SegmentedRequestData data =
new SegmentedRequestData(
initialRequest, new Http2SegmentedData(frame.content(), msg.eos, msg.streamId));
messageSession.onRequestData(data);
return data;
}
}
// TODO(CK): throw an exception?
return null;
Expand All @@ -63,7 +63,8 @@ Request wrapRequest(ChannelHandlerContext ctx, Http2Request msg) {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof Http2Request) {
ctx.fireChannelRead(wrapRequest(ctx, (Http2Request) msg));
Http2Request request = (Http2Request) msg;
ctx.fireChannelRead(wrapRequest(ctx, request));
} else {
ctx.fireChannelRead(msg);
}
Expand All @@ -74,18 +75,19 @@ void writeResponse(ChannelHandlerContext ctx, Response response, ChannelPromise
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
}

Request request = getChannelRequest(ctx);
int streamId = request.streamId();
Http2MessageSession messageSession = contextMessageSession(ctx);
int streamId = response.streamId();
Http2Headers headers = response.headers().http2Headers();

headers.status(response.status().codeAsText());

if (response instanceof FullResponse) {
setChannelRequest(ctx, null);
if (response.body().readableBytes() > 0) {
messageSession.onResponse(response);
ByteBuf body = response.body();
if (body != null && body.readableBytes() > 0) {
PromiseCombiner combiner = new PromiseCombiner();
combiner.add(ctx.write(Http2Response.build(streamId, headers, false), ctx.newPromise()));
Http2DataFrame data = new DefaultHttp2DataFrame(response.body(), true);
Http2DataFrame data = new DefaultHttp2DataFrame(body, true);
combiner.add(ctx.write(Http2Response.build(streamId, data, true), ctx.newPromise()));
combiner.finish(promise);
} else {
Expand All @@ -94,22 +96,22 @@ void writeResponse(ChannelHandlerContext ctx, Response response, ChannelPromise
} else {
ctx.write(Http2Response.build(streamId, headers, false), promise);
}

messageSession.flush(streamId);
}

void writeContent(ChannelHandlerContext ctx, SegmentedData data, ChannelPromise promise) {
Request request = getChannelRequest(ctx);
int streamId = request.streamId();
if (data.endOfMessage()) {
setChannelRequest(ctx, null);
}
Http2MessageSession messageSession = contextMessageSession(ctx);
messageSession.onResponseData(data);

boolean dataEos = data.endOfMessage() && data.trailingHeaders().size() == 0;
Http2Response response =
Http2Response.build(streamId, new DefaultHttp2DataFrame(data.content(), dataEos), dataEos);
Http2Response.build(
data.streamId(), new DefaultHttp2DataFrame(data.content(), dataEos), dataEos);

if (data.trailingHeaders().size() != 0) {
Http2Headers headers = data.trailingHeaders().http2Headers();
Http2Response last = Http2Response.build(streamId, headers, true);
Http2Response last = Http2Response.build(data.streamId(), headers, true);
PromiseCombiner combiner = new PromiseCombiner();
combiner.add(ctx.write(response, ctx.newPromise()));
combiner.add(ctx.write(last, ctx.newPromise()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public Request buildRequest(Request request, String proxyHost, String path) {
.body(request.body())
.method(request.method())
.path(path)
.streamId(request.streamId())
.headers(request.headers())
.httpTraceInfo(request.httpTraceInfo())
.host(proxyHost)
Expand All @@ -76,6 +77,7 @@ public Request buildRequest(Request request, String proxyHost, String path) {
DefaultSegmentedRequest.builder()
.method(request.method())
.path(path)
.streamId(request.streamId())
.headers(request.headers())
.httpTraceInfo(request.httpTraceInfo())
.host(proxyHost)
Expand Down
Loading

0 comments on commit 42c9932

Please sign in to comment.