Skip to content

Commit

Permalink
TACO-131 - propagation of streamid for proxied requests (#210)
Browse files Browse the repository at this point in the history
  • Loading branch information
manimaul authored Apr 16, 2018
1 parent 72fa94e commit 7b254e6
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,21 @@ public boolean startOfMessage() {
return true;
}

@Override
public abstract ByteBuf body();

@Override
public abstract HttpResponseStatus status();

@Override
public abstract Headers headers();

@Override
public abstract TraceInfo httpTraceInfo();

@Override
public abstract int streamId();

/** Not intended to be called. */
@Override
public String version() {
Expand All @@ -49,7 +56,7 @@ public abstract static class Builder {

public abstract Builder httpTraceInfo(TraceInfo span);

abstract Headers headers();
public abstract Builder streamId(int streamId);

public DefaultFullResponse build() {
if (!httpTraceInfo().isPresent()) {
Expand All @@ -58,12 +65,24 @@ public DefaultFullResponse build() {
return autoBuild();
}

abstract Headers headers();

abstract Optional<TraceInfo> httpTraceInfo();

abstract DefaultFullResponse autoBuild();
}

public static Builder from(FullResponse other) {
return builder()
.body(other.body())
.headers(other.headers())
.httpTraceInfo(other.httpTraceInfo())
.streamId(other.streamId())
.status(other.status())
.body(other.body());
}

public static Builder builder() {
return new AutoValue_DefaultFullResponse.Builder();
return new AutoValue_DefaultFullResponse.Builder().streamId(Message.H1_STREAM_ID_NONE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
@ToString
public abstract class DefaultSegmentedResponse implements SegmentedResponse {

@Override
public abstract HttpResponseStatus status();

@Override
public abstract Headers headers();

@Override
public abstract TraceInfo httpTraceInfo();

@Override
Expand All @@ -28,6 +31,9 @@ public boolean endOfMessage() {
return false;
}

@Override
public abstract int streamId();

/** Not intended to be called. */
@Override
public String version() {
Expand All @@ -42,6 +48,8 @@ public abstract static class Builder {

public abstract Builder httpTraceInfo(TraceInfo traceInfo);

public abstract Builder streamId(int streamId);

public DefaultSegmentedResponse build() {
if (!httpTraceInfo().isPresent()) {
httpTraceInfo(new TraceInfo(headers()));
Expand All @@ -56,7 +64,15 @@ public DefaultSegmentedResponse build() {
abstract DefaultSegmentedResponse autoBuild();
}

public static Builder from(SegmentedResponse other) {
return builder()
.status(other.status())
.headers(other.headers())
.httpTraceInfo(other.httpTraceInfo())
.streamId(other.streamId());
}

public static Builder builder() {
return new AutoValue_DefaultSegmentedResponse.Builder();
return new AutoValue_DefaultSegmentedResponse.Builder().streamId(Message.H1_STREAM_ID_NONE);
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,27 @@
package com.xjeffrose.xio.http;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.AttributeKey;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ProxyBackendHandler extends ChannelInboundHandlerAdapter {
public class ProxyBackendHandler extends ChannelDuplexHandler {

private static AttributeKey<Integer> CHANNEL_STREAM_ID_KEY =
AttributeKey.newInstance("xio_channel_stream_id");

private static void setChannelStreamId(ChannelHandlerContext ctx, int streamId) {
ctx.channel().attr(CHANNEL_STREAM_ID_KEY).set(streamId);
}

@Nullable
private static Integer getChannelStreamId(ChannelHandlerContext ctx) {
return ctx.channel().attr(CHANNEL_STREAM_ID_KEY).get();
}

private final ChannelHandlerContext frontend;
private boolean needFlush = false;
Expand All @@ -25,21 +39,31 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (msg instanceof Request) {
setChannelStreamId(ctx, ((Request) msg).streamId());
}
ctx.write(msg, promise);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("RawBackendHandler[{}] channelRead: {}", this, msg);
if (msg instanceof Response) {
msg = responseWithPreservedStreamId(ctx, (Response) msg);
}
frontend
.write(msg)
.addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
if (f.cause() != null) {
// TODO(CK): move this into a logger class
log.error("Write Error!", f.cause());
}
}
});
(ChannelFutureListener)
f -> {
if (f.cause() != null) {
// TODO(CK): move this into a logger class
log.error("Write Error!", f.cause());
}
});
needFlush = true;
}

Expand All @@ -66,4 +90,19 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
log.debug("RawBackendHandler[{}] exceptionCaught: {}", this, cause);
ctx.close();
}

@Nullable
public Response responseWithPreservedStreamId(ChannelHandlerContext ctx, Response response) {
Integer streamId = getChannelStreamId(ctx);
if (streamId != null && streamId != Message.H1_STREAM_ID_NONE) {
if (response instanceof FullResponse) {
return DefaultFullResponse.from((FullResponse) response).streamId(streamId).build();
} else if (response instanceof SegmentedResponse) {
return DefaultSegmentedResponse.from((SegmentedResponse) response)
.streamId(streamId)
.build();
}
}
return response;
}
}

0 comments on commit 7b254e6

Please sign in to comment.