Skip to content

Commit

Permalink
Wk/taco 131 just some names (#207)
Browse files Browse the repository at this point in the history
* TACO-131 trying to clarify h1 h2 message property names

* TACO-131 - consolidating h1h2 message types

* TACO-131 - PR feedback - fix some javadoc mistakes

* TACO-131 - segmented it is Chris
  • Loading branch information
manimaul authored Apr 11, 2018
1 parent 6a0b629 commit 72fa94e
Show file tree
Hide file tree
Showing 45 changed files with 381 additions and 322 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
public abstract class DefaultFullRequest implements FullRequest {

@Override
public boolean startOfStream() {
public boolean startOfMessage() {
return true;
}

@Override
public boolean endOfMessage() {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@
@ToString
public abstract class DefaultFullResponse implements FullResponse {

@Override
public boolean endOfMessage() {
return true;
}

@Override
public boolean startOfMessage() {
return true;
}

public abstract ByteBuf body();

public abstract HttpResponseStatus status();
Expand All @@ -23,8 +33,6 @@ public abstract class DefaultFullResponse implements FullResponse {

public abstract TraceInfo httpTraceInfo();

public abstract boolean endOfStream();

/** Not intended to be called. */
@Override
public String version() {
Expand All @@ -47,12 +55,9 @@ public DefaultFullResponse build() {
if (!httpTraceInfo().isPresent()) {
httpTraceInfo(new TraceInfo(headers()));
}
endOfStream(true);
return autoBuild();
}

abstract Builder endOfStream(boolean endOfStream);

abstract Optional<TraceInfo> httpTraceInfo();

abstract DefaultFullResponse autoBuild();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,25 @@
@UnstableApi
@AutoValue
@ToString
public abstract class DefaultStreamingData implements StreamingData {
public abstract class DefaultSegmentedData implements SegmentedData {
public abstract ByteBuf content();

public abstract boolean endOfStream();
public abstract boolean endOfMessage();

public abstract Headers trailingHeaders();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder content(ByteBuf content);

public abstract Builder endOfStream(boolean endOfStream);
public abstract Builder endOfMessage(boolean isEnd);

public abstract Builder trailingHeaders(Headers headers);

public abstract DefaultStreamingData build();
public abstract DefaultSegmentedData build();
}

public static Builder builder() {
return new AutoValue_DefaultStreamingData.Builder().trailingHeaders(new DefaultHeaders());
return new AutoValue_DefaultSegmentedData.Builder().trailingHeaders(new DefaultHeaders());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@
import java.util.Optional;
import lombok.ToString;

/** Value class for representing a streaming outgoing HTTP1/2 Request, for use in a client. */
/** Value class for representing a segmented outgoing HTTP1/2 Request, for use in a client. */
@UnstableApi
@AutoValue
@ToString
public abstract class DefaultStreamingRequest implements StreamingRequest, Traceable {
public abstract class DefaultSegmentedRequest implements SegmentedRequest {

@Override
public boolean startOfStream() {
public boolean startOfMessage() {
return true;
}

@Override
public boolean endOfMessage() {
return false;
}

public abstract HttpMethod method();

public abstract String path();
Expand Down Expand Up @@ -64,9 +69,9 @@ public Builder host(String host) {

abstract Optional<TraceInfo> httpTraceInfo();

abstract DefaultStreamingRequest autoBuild();
abstract DefaultSegmentedRequest autoBuild();

public DefaultStreamingRequest build() {
public DefaultSegmentedRequest build() {
if (!httpTraceInfo().isPresent() && headers().isPresent()) {
httpTraceInfo(new TraceInfo(headers().get()));
}
Expand All @@ -75,6 +80,6 @@ public DefaultStreamingRequest build() {
}

public static Builder builder() {
return new AutoValue_DefaultStreamingRequest.Builder().streamId(-1);
return new AutoValue_DefaultSegmentedRequest.Builder().streamId(-1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,27 @@
import java.util.Optional;
import lombok.ToString;

/** Value class for representing a streaming outgoing HTTP1/2 Response, for use in a server. */
/** Value class for representing a segmented outgoing HTTP1/2 Response, for use in a server. */
@UnstableApi
@AutoValue
@ToString
public abstract class DefaultStreamingResponse implements StreamingResponse {
public abstract class DefaultSegmentedResponse implements SegmentedResponse {

public abstract HttpResponseStatus status();

public abstract Headers headers();

public abstract TraceInfo httpTraceInfo();

public abstract boolean endOfStream();
@Override
public boolean startOfMessage() {
return true;
}

@Override
public boolean endOfMessage() {
return false;
}

/** Not intended to be called. */
@Override
Expand All @@ -34,24 +42,21 @@ public abstract static class Builder {

public abstract Builder httpTraceInfo(TraceInfo traceInfo);

public DefaultStreamingResponse build() {
public DefaultSegmentedResponse build() {
if (!httpTraceInfo().isPresent()) {
httpTraceInfo(new TraceInfo(headers()));
}
endOfStream(true);
return autoBuild();
}

abstract Headers headers();

abstract Builder endOfStream(boolean endOfStream);

abstract Optional<TraceInfo> httpTraceInfo();

abstract DefaultStreamingResponse autoBuild();
abstract DefaultSegmentedResponse autoBuild();
}

public static Builder builder() {
return new AutoValue_DefaultStreamingResponse.Builder();
return new AutoValue_DefaultSegmentedResponse.Builder();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import com.xjeffrose.xio.core.internal.UnstableApi;
import com.xjeffrose.xio.http.internal.FullHttp1Response;
import com.xjeffrose.xio.http.internal.Http1Response;
import com.xjeffrose.xio.http.internal.Http1StreamingData;
import com.xjeffrose.xio.http.internal.Http1SegmentedData;
import com.xjeffrose.xio.http.internal.SegmentedHttp1Response;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
Expand Down Expand Up @@ -49,13 +49,13 @@ Response wrapResponse(ChannelHandlerContext ctx, HttpObject msg) {
setChannelResponse(ctx, response);
return response;
} else if (msg instanceof HttpResponse) {
Response response = new Http1Response((HttpResponse) msg);
Response response = new SegmentedHttp1Response((HttpResponse) msg);
setChannelResponse(ctx, response);
return response;
} else if (msg instanceof HttpContent) {
Response response =
new StreamingResponseData(
getChannelResponse(ctx), new Http1StreamingData((HttpContent) msg));
new SegmentedResponseData(
getChannelResponse(ctx), new Http1SegmentedData((HttpContent) msg));
return response;
}
// TODO(CK): throw an exception?
Expand Down Expand Up @@ -113,8 +113,8 @@ HttpRequest buildRequest(ChannelHandlerContext ctx, Request request) {
}
}

HttpContent buildContent(ChannelHandlerContext ctx, StreamingData data) {
if (data.endOfStream()) {
HttpContent buildContent(ChannelHandlerContext ctx, SegmentedData data) {
if (data.endOfMessage()) {
LastHttpContent last = new DefaultLastHttpContent(data.content());
if (data.trailingHeaders() != null) {
last.trailingHeaders().add(data.trailingHeaders().http1Headers(true, true));
Expand All @@ -130,8 +130,8 @@ HttpContent buildContent(ChannelHandlerContext ctx, StreamingData data) {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
log.debug("write: msg={}", msg);
if (msg instanceof StreamingData) {
ctx.write(buildContent(ctx, (StreamingData) msg), promise);
if (msg instanceof SegmentedData) {
ctx.write(buildContent(ctx, (SegmentedData) msg), promise);
} else if (msg instanceof Request) {
log.debug("writing request {}", msg);
ctx.write(buildRequest(ctx, (Request) msg), promise);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,13 @@ public void onRequest(Request request) {
*
* @param data The StreamingData object that the client has sent
*/
public void onRequestData(StreamingData data) {
public void onRequestData(SegmentedData data) {
if (initialRequest == null) {
log.error("Received StreamingData without a current Request, dropping data: {}", data);
return;
}

if (data.endOfStream()) {
if (data.endOfMessage()) {
initialRequest.requestFinished = true;
}
}
Expand Down Expand Up @@ -141,9 +141,9 @@ public void onResponse(Response response) {
*
* @param data The StreamingData object the server is about to send
*/
public void onResponseData(StreamingData data) {
public void onResponseData(SegmentedData data) {
if (initialRequest != null) {
if (data.endOfStream()) {
if (data.endOfMessage()) {
initialRequest.responseFinished = true;
}
} else {
Expand Down
26 changes: 13 additions & 13 deletions xio-core/src/main/java/com/xjeffrose/xio/http/Http1ServerCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import com.xjeffrose.xio.core.internal.UnstableApi;
import com.xjeffrose.xio.http.internal.FullHttp1Request;
import com.xjeffrose.xio.http.internal.Http1Request;
import com.xjeffrose.xio.http.internal.Http1StreamingData;
import com.xjeffrose.xio.http.internal.Http1SegmentedData;
import com.xjeffrose.xio.http.internal.SegmentedHttp1Request;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
Expand Down Expand Up @@ -52,18 +52,18 @@ private void wrapRequest(ChannelHandlerContext ctx, HttpObject msg) {
request = new FullHttp1Request((FullHttpRequest) msg);
session.onRequest(request);
} else if (msg instanceof HttpRequest) {
request = new Http1Request((HttpRequest) msg);
request = new SegmentedHttp1Request((HttpRequest) msg);
session.onRequest(request);
} else if (msg instanceof HttpContent) {
StreamingData data = new Http1StreamingData((HttpContent) msg);
SegmentedData data = new Http1SegmentedData((HttpContent) msg);
session.onRequestData(data);
Request sessionRequest = session.currentRequest();
if (sessionRequest == null) {
// We don't have a sessionRequest so we can't construct a StreamingRequestData.
// We don't have a sessionRequest so we can't construct a SegmentedRequestData.
// Don't log as session.onRequestData should have logged.
return;
}
request = new StreamingRequestData(sessionRequest, data);
request = new SegmentedRequestData(sessionRequest, data);
} else {
log.error("Dropping unsupported http object: {}", msg);
return;
Expand Down Expand Up @@ -157,15 +157,15 @@ private void buildResponse(ChannelHandlerContext ctx, Response response, Channel
}

/**
* Translate the StreamingData object into a netty HttpContent and fire write on the next handler.
* Translate the SegmentedData object into a netty HttpContent and fire write on the next handler.
*/
private void buildContent(ChannelHandlerContext ctx, StreamingData data, ChannelPromise promise) {
private void buildContent(ChannelHandlerContext ctx, SegmentedData data, ChannelPromise promise) {
Http1MessageSession session = setDefaultMessageSession(ctx);
try {
session.onResponseData(data);
HttpObject obj;

if (data.endOfStream()) {
if (data.endOfMessage()) {
LastHttpContent last = new DefaultLastHttpContent(data.content());
if (data.trailingHeaders() != null) {
last.trailingHeaders().add(data.trailingHeaders().http1Headers(true, false));
Expand All @@ -176,7 +176,7 @@ private void buildContent(ChannelHandlerContext ctx, StreamingData data, Channel
}

ChannelFuture future = ctx.write(obj, promise);
if (session.closeConnection() && data.endOfStream()) {
if (session.closeConnection() && data.endOfMessage()) {
future.addListener(ChannelFutureListener.CLOSE);
}
} finally {
Expand All @@ -185,14 +185,14 @@ private void buildContent(ChannelHandlerContext ctx, StreamingData data, Channel
}

/**
* Handles instances of StreamingData and Response, all other types are forwarded to the next
* Handles instances of SegmentedData and Response, all other types are forwarded to the next
* handler.
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (msg instanceof StreamingData) {
buildContent(ctx, (StreamingData) msg, promise);
if (msg instanceof SegmentedData) {
buildContent(ctx, (SegmentedData) msg, promise);
} else if (msg instanceof Response) {
buildResponse(ctx, (Response) msg, promise);
} else {
Expand Down
Loading

0 comments on commit 72fa94e

Please sign in to comment.