From 16b5b8905ec003d289e7cb2c829d0d3ced5bb961 Mon Sep 17 00:00:00 2001 From: guohao Date: Mon, 28 Dec 2020 19:04:14 +0800 Subject: [PATCH] Add grpc error code --- .../remoting/netty4/Http2WireProtocol.java | 8 +- dubbo-rpc/dubbo-rpc-triple/pom.xml | 4 - .../dubbo/rpc/protocol/tri/GrpcElf.java | 148 ------------------ .../dubbo/rpc/protocol/tri/GrpcStatus.java | 17 ++ .../dubbo/rpc/protocol/tri/Http2Request.java | 89 +++++------ .../dubbo/rpc/protocol/tri/Marshaller.java | 11 +- .../rpc/protocol/tri/TripleConstant.java | 11 ++ .../tri/TripleHttp2FrameListener.java | 102 ++++++------ .../rpc/protocol/tri/TripleHttp2Protocol.java | 9 +- .../protocol/tri/TripleInvokerResolver.java | 2 + .../dubbo/rpc/protocol/tri/TripleUtil.java | 15 ++ 11 files changed, 153 insertions(+), 263 deletions(-) delete mode 100644 dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcElf.java create mode 100644 dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java create mode 100644 dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java create mode 100644 dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/netty4/Http2WireProtocol.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/netty4/Http2WireProtocol.java index a5130186b04..73acf1bff95 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/netty4/Http2WireProtocol.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/netty4/Http2WireProtocol.java @@ -1,21 +1,17 @@ package org.apache.dubbo.remoting.netty4; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPipeline; -import io.netty.handler.codec.http2.DefaultHttp2Connection; -import io.netty.handler.codec.http2.Http2Connection; -import io.netty.handler.codec.http2.Http2ConnectionHandler; -import io.netty.handler.codec.http2.Http2FrameListener; import io.netty.handler.codec.http2.Http2FrameLogger; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import static io.netty.handler.logging.LogLevel.DEBUG; +import static io.netty.handler.logging.LogLevel.INFO; public abstract class Http2WireProtocol implements WireProtocol { public static final Http2FrameLogger CLIENT_LOGGER = new Http2FrameLogger(DEBUG, "H2_CLIENT"); - public static final Http2FrameLogger SERVER_LOGGER = new Http2FrameLogger(DEBUG, "H2_SERVER"); + public static final Http2FrameLogger SERVER_LOGGER = new Http2FrameLogger(INFO, "H2_SERVER"); private static final Set handlers = ConcurrentHashMap.newKeySet(); private final ProtocolDetector detector = new Http2ProtocolDetector(); diff --git a/dubbo-rpc/dubbo-rpc-triple/pom.xml b/dubbo-rpc/dubbo-rpc-triple/pom.xml index 0777dd118ec..883941e0d12 100644 --- a/dubbo-rpc/dubbo-rpc-triple/pom.xml +++ b/dubbo-rpc/dubbo-rpc-triple/pom.xml @@ -35,10 +35,6 @@ dubbo-rpc-api ${project.parent.version} - - io.grpc - grpc-stub - com.google.protobuf protobuf-java diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcElf.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcElf.java deleted file mode 100644 index 059277842c2..00000000000 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcElf.java +++ /dev/null @@ -1,148 +0,0 @@ -package org.apache.dubbo.rpc.protocol.tri; - -import io.netty.util.AsciiString; - -public class GrpcElf { - public static final AsciiString GRPC_STATUS = AsciiString.cached("grpc-status"); - public static final AsciiString GRPC_MESSAGE = AsciiString.cached("grpc-message"); - public static final AsciiString GRPC_ENCODING = AsciiString.cached("grpc-encoding"); - public static final AsciiString GRPC_TIMEOUT = AsciiString.cached("grpc-timeout"); - public static final AsciiString GRPC_ACCEPT_ENCODING = AsciiString.cached("grpc-accept-encoding"); - - public static final AsciiString APPLICATION_GRPC = AsciiString.cached("application/grpc"); - public static final AsciiString TEXT_MIME = AsciiString.cached("text/plain; encoding=utf-8"); - public static final AsciiString GRPC_JSON = AsciiString.cached("application/grpc+json"); - public static final AsciiString GRPC_PROTO = AsciiString.cached("application/grpc+proto"); - - - /** - * Indicates whether or not the given value is a valid gRPC content-type. - */ - public static boolean isGrpcContentType(CharSequence contentType) { - if (contentType == null) { - return false; - } - - if (APPLICATION_GRPC.length() > contentType.length()) { - return false; - } - - if (APPLICATION_GRPC.contentEquals(contentType)) { - return true; - } - if (!AsciiString.of(contentType).startsWith(APPLICATION_GRPC)) { - // Not a gRPC content-type. - return false; - } - - if (contentType.length() == APPLICATION_GRPC.length()) { - // The strings match exactly. - return true; - } - - // The contentType matches, but is longer than the expected string. - // We need to support variations on the content-type (e.g. +proto, +json) as defined by the - // gRPC wire spec. - char nextChar = contentType.charAt(APPLICATION_GRPC.length()); - return nextChar == '+' || nextChar == ';'; - } - - /** - * All error codes identified by the HTTP/2 spec. Used in GOAWAY and RST_STREAM frames. - */ - //public enum Http2Error { - // /** - // * Servers implementing a graceful shutdown of the connection will send {@code GOAWAY} with - // * {@code NO_ERROR}. In this case it is important to indicate to the application that the - // * request should be retried (i.e. {@link Status#UNAVAILABLE}). - // */ - // NO_ERROR(0x0, Status.UNAVAILABLE), - // PROTOCOL_ERROR(0x1, Status.INTERNAL), - // INTERNAL_ERROR(0x2, Status.INTERNAL), - // FLOW_CONTROL_ERROR(0x3, Status.INTERNAL), - // SETTINGS_TIMEOUT(0x4, Status.INTERNAL), - // STREAM_CLOSED(0x5, Status.INTERNAL), - // FRAME_SIZE_ERROR(0x6, Status.INTERNAL), - // REFUSED_STREAM(0x7, Status.UNAVAILABLE), - // CANCEL(0x8, Status.CANCELLED), - // COMPRESSION_ERROR(0x9, Status.INTERNAL), - // CONNECT_ERROR(0xA, Status.INTERNAL), - // ENHANCE_YOUR_CALM(0xB, Status.RESOURCE_EXHAUSTED.withDescription("Bandwidth exhausted")), - // INADEQUATE_SECURITY(0xC, Status.PERMISSION_DENIED.withDescription("Permission denied as " - // + "protocol is not secure enough to call")), - // HTTP_1_1_REQUIRED(0xD, Status.UNKNOWN); - // - // // Populate a mapping of code to enum value for quick look-up. - // private static final Http2Error[] codeMap = buildHttp2CodeMap(); - // private final int code; - // // Status is not guaranteed to be deeply immutable. Don't care though, since that's only true - // // when there are exceptions in the Status, which is not true here. - // @SuppressWarnings("ImmutableEnumChecker") - // private final Status status; - // - // Http2Error(int code, Status status) { - // this.code = code; - // this.status = status.augmentDescription("HTTP/2 error code: " + this.name()); - // } - // - // private static Http2Error[] buildHttp2CodeMap() { - // Http2Error[] errors = Http2Error.values(); - // int size = (int) errors[errors.length - 1].code() + 1; - // Http2Error[] http2CodeMap = new Http2Error[size]; - // for (Http2Error error : errors) { - // int index = (int) error.code(); - // http2CodeMap[index] = error; - // } - // return http2CodeMap; - // } - // - // /** - // * Looks up the HTTP/2 error code enum value for the specified code. - // * - // * @param code an HTTP/2 error code value. - // * @return the HTTP/2 error code enum or {@code null} if not found. - // */ - // public static Http2Error forCode(long code) { - // if (code >= codeMap.length || code < 0) { - // return null; - // } - // return codeMap[(int) code]; - // } - // - // /** - // * Looks up the {@link Status} from the given HTTP/2 error code. This is preferred over {@code - // * forCode(code).status()}, to more easily conform to HTTP/2: - // * - // *
Unknown or unsupported error codes MUST NOT trigger any special behavior. - // * These MAY be treated by an implementation as being equivalent to INTERNAL_ERROR.
- // * - // * @param code the HTTP/2 error code. - // * @return a {@link Status} representing the given error. - // */ - // public static Status statusForCode(long code) { - // Http2Error error = forCode(code); - // if (error == null) { - // // This "forgets" the message of INTERNAL_ERROR while keeping the same status code. - // Status.Code statusCode = INTERNAL_ERROR.status().getCode(); - // return Status.fromCodeValue(statusCode.value()) - // .withDescription("Unrecognized HTTP/2 error code: " + code); - // } - // - // return error.status(); - // } - // - // /** - // * Gets the code for this error used on the wire. - // */ - // public long code() { - // return code; - // } - // - // /** - // * Gets the {@link Status} associated with this HTTP/2 code. - // */ - // public Status status() { - // return status; - // } - //} -} diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java new file mode 100644 index 00000000000..3edd8b7b6d8 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java @@ -0,0 +1,17 @@ +package org.apache.dubbo.rpc.protocol.tri; + +/** + * See https://github.com/grpc/grpc/blob/master/doc/statuscodes.md + */ +enum GrpcStatus { + OK(0), + NOT_FOUND(5), + UNIMPLEMENTED(12), + INTERNAL(13); + + final int code; + + GrpcStatus(int code){ + this.code=code; + } +} diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http2Request.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http2Request.java index 4b547fbbdf8..89083d7bfd3 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http2Request.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http2Request.java @@ -2,32 +2,30 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Stream; public class Http2Request { - private int streamId; - private ByteBuf commulation; + private final int streamId; + private final String path; private volatile Http2Headers headers; + private CompositeByteBuf pending; private Http2Stream http2Stream; private Http2Connection.PropertyKey streamKey; - private String marshaller; - private ByteBufAllocator allocator; - private byte[] content; + private int bytesToRead; + private final ByteBufAllocator alloc; - public Http2Request(int streamId, Http2Stream http2Stream - , Http2Headers headers - //, AbstractHttp2CodecHandler http2CodecHandler - , Http2Connection.PropertyKey streamKey, String marshaller, ByteBufAllocator allocator) { + public Http2Request(int streamId, String path, Http2Stream http2Stream, Http2Headers headers, + Http2Connection.PropertyKey streamKey, ByteBufAllocator allocator) { this.streamId = streamId; + this.path = path; this.http2Stream = http2Stream; this.headers = headers; - //this.http2CodecHandler = http2CodecHandler; + this.alloc=allocator; this.streamKey = streamKey; - this.marshaller = marshaller; - this.allocator = allocator; - this.commulation = allocator.buffer(); + this.pending = allocator.compositeBuffer(); } public Http2Headers getHeaders() { @@ -35,51 +33,44 @@ public Http2Headers getHeaders() { } public ByteBuf getData() { - return commulation; + return pending; } public int getStreamId() { return streamId; } - public byte[] content() { - if (content != null) { - return content; - } - this.content = new byte[commulation.readableBytes()]; - commulation.readBytes(content); - commulation.release(); - return content; + public void appendData(ByteBuf data) { + pending.addComponent(true, data); } - public void cumulate(ByteBuf byteBuf) { - commulation = cumulate(allocator, commulation, byteBuf); - } + public ByteBuf getAvailableTrunk(){ - public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { - final ByteBuf buffer; - if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() - || cumulation.refCnt() > 1 || cumulation.isReadOnly()) { - // Expand cumulation (by replace it) when either there is not more room in the buffer - // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or - // duplicate().retain() or if its read-only. - // - // See: - // - https://github.com/netty/netty/issues/2327 - // - https://github.com/netty/netty/issues/1764 - buffer = expandCumulation(alloc, cumulation, in.readableBytes()); - } else { - buffer = cumulation; - } - buffer.writeBytes(in); - return buffer; - } + int type = pending.readUnsignedByte(); +// if ((type & RESERVED_MASK) != 0) { +// throw Status.INTERNAL.withDescription( +// "gRPC frame header malformed: reserved bits not zero") +// .asRuntimeException(); +// } +// compressedFlag = (type & COMPRESSED_FLAG_MASK) != 0; - private ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) { - ByteBuf oldCumulation = cumulation; - cumulation = alloc.buffer(oldCumulation.readableBytes() + readable); - cumulation.writeBytes(oldCumulation); - oldCumulation.release(); - return cumulation; + // Update the required length to include the length of the frame. + this.bytesToRead = pending.readInt(); +/* if (requiredLength < 0 || requiredLength > maxInboundMessageSize) { + throw Status.RESOURCE_EXHAUSTED.withDescription( + String.format("gRPC message exceeds maximum size %d: %d", + maxInboundMessageSize, requiredLength)) + .asRuntimeException(); + }*/ + return tryRead(); + + } + private ByteBuf tryRead(){ + if(bytesToRead>0&&pending.readableBytes()>=bytesToRead){ + final ByteBuf ready = alloc.buffer(); + pending.readBytes(ready,bytesToRead); + return ready; + } + return null; } } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Marshaller.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Marshaller.java index 11b960c8762..159073ae80d 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Marshaller.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Marshaller.java @@ -1,9 +1,5 @@ package org.apache.dubbo.rpc.protocol.tri; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; - import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.MessageLite; import com.google.protobuf.Parser; @@ -11,14 +7,15 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufInputStream; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + public class Marshaller { public static Marshaller marshaller = new Marshaller(); public Object unmarshaller(Class requestClass, ByteBuf in) { - in.readByte(); - final int len = in.readInt(); - System.out.println(len); final Parser parser = ProtoUtil.getParser(requestClass); Object result = null; try { diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java new file mode 100644 index 00000000000..456de2b348d --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java @@ -0,0 +1,11 @@ +package org.apache.dubbo.rpc.protocol.tri; + +public interface TripleConstant { + String STATUS_KEY = "grpc-status"; + String MESSAGE_KEY = "grpc-message"; + String CONTENT_TYPE_KEY = "content-type"; + String CONTENT_PROTO = "application/grpc+proto"; + + String APPLICATION_GRPC = "application/grpc"; + +} diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameListener.java index 001e53021e6..dbde4605a2c 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameListener.java @@ -1,8 +1,6 @@ package org.apache.dubbo.rpc.protocol.tri; import org.apache.dubbo.common.extension.ExtensionLoader; -import org.apache.dubbo.remoting.netty4.StreamData; -import org.apache.dubbo.remoting.netty4.StreamHeader; import org.apache.dubbo.rpc.AppResponse; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Result; @@ -11,19 +9,21 @@ import org.apache.dubbo.rpc.model.MethodDescriptor; import org.apache.dubbo.rpc.model.ServiceRepository; -import io.grpc.Status; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http2.DefaultHttp2Headers; import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2FrameAdapter; +import io.netty.handler.codec.http2.Http2FrameWriter; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Stream; -import io.netty.util.AsciiString; import java.util.concurrent.CompletionStage; import java.util.function.Function; @@ -34,10 +34,14 @@ public class TripleHttp2FrameListener extends Http2FrameAdapter { private static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L; private static final InvokerResolver serviceContainer = ExtensionLoader.getExtensionLoader(InvokerResolver.class).getDefaultExtension(); private final Http2Connection connection; + private final Http2FrameWriter frameWriter; + private final Http2ConnectionEncoder encoder; protected Http2Connection.PropertyKey streamKey = null; - public TripleHttp2FrameListener(Http2Connection connection) { + public TripleHttp2FrameListener(Http2Connection connection, Http2FrameWriter frameWriter, Http2ConnectionEncoder encoder) { this.connection = connection; + this.frameWriter = frameWriter; + this.encoder = encoder; } @Override @@ -51,7 +55,6 @@ public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exce @Override public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { - System.out.println("onDataRead:" + streamId); Http2Stream stream = connection.stream(streamId); Http2Request request = stream == null ? null : (Http2Request) stream.getProperty(streamKey); @@ -60,11 +63,12 @@ public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int return data.readableBytes() + padding; } - request.cumulate(data); + request.appendData(data); int processed = data.readableBytes() + padding; - if (endOfStream) { - Invocation invocation = buildInvocation(request.getHeaders(), request.getData()); + final ByteBuf trunk = request.getAvailableTrunk(); + if(trunk!=null){ + Invocation invocation = buildInvocation(request.getHeaders(), trunk); // TODO add version/group support //TODO add method not found / service not found err Result result = serviceContainer.resolve("io.grpc.examples.helloworld.IGreeter").invoke(invocation); @@ -72,23 +76,21 @@ public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int future.whenComplete((appResult, t) -> { try { - if (t == null) { - AppResponse response = (AppResponse) appResult; - if (!response.hasException()) { - Http2Headers http2Headers = new DefaultHttp2Headers() - .status(OK.codeAsText()) - .set(HttpHeaderNames.CONTENT_TYPE, GrpcElf.GRPC_PROTO); - StreamHeader streamHeader = new StreamHeader(streamId, http2Headers, false); - ctx.channel().write(streamHeader); - - ByteBuf byteBuf = Marshaller.marshaller.marshaller(ctx.alloc(), response.getValue()); - StreamData streamData = new StreamData(false, streamId, byteBuf); - ctx.channel().write(streamData); - final Http2Headers trailers = new DefaultHttp2Headers() - .setInt(GrpcElf.GRPC_STATUS, Status.Code.OK.value()); - ctx.channel().write(new StreamHeader(streamId, trailers, true)); - } - } else { + if (t != null) { + // TODO add exception response + return; + } + AppResponse response = (AppResponse) appResult; + if (!response.hasException()) { + Http2Headers http2Headers = new DefaultHttp2Headers() + .status(OK.codeAsText()) + .set(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO); + encoder.writeHeaders(ctx, streamId, http2Headers, 0, false, ctx.newPromise()); + ByteBuf byteBuf = Marshaller.marshaller.marshaller(ctx.alloc(), response.getValue()); + encoder.writeData(ctx, streamId, byteBuf, 0, false, ctx.newPromise()); + final Http2Headers trailers = new DefaultHttp2Headers() + .setInt(TripleConstant.STATUS_KEY, GrpcStatus.OK.code); + encoder.writeHeaders(ctx, streamId, trailers, 0, true, ctx.newPromise()); } } catch (Exception e) { e.printStackTrace(); @@ -125,54 +127,58 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers onHeadersRead(ctx, streamId, headers, padding, endStream); } + private void responsePlainTextError(ChannelHandlerContext ctx, int streamId, int code, int statusCode, String msg) { + Http2Headers headers = new DefaultHttp2Headers(true) + .status("" + code) + .setInt(TripleConstant.STATUS_KEY, statusCode) + .set(TripleConstant.MESSAGE_KEY, msg) + .set(TripleConstant.CONTENT_TYPE_KEY, "text/plain; encoding=utf-8"); + encoder.writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise()); + ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg); + encoder.writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise()); + } + @Override public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream) throws Http2Exception { - System.out.println("onHeadersRead" + streamId); + if (!HttpMethod.POST.asciiName().contentEquals(headers.method())) { + responsePlainTextError(ctx, streamId, HttpResponseStatus.METHOD_NOT_ALLOWED.code(), GrpcStatus.INTERNAL.code, + String.format("Method '%s' is not supported", headers.method())); + return; + } if (headers.path() == null) { - System.out.println("Expected path but is missing"); + responsePlainTextError(ctx, streamId, HttpResponseStatus.NOT_FOUND.code(), GrpcStatus.UNIMPLEMENTED.code, "Expected path but is missing"); return; } final String path = headers.path().toString(); if (path.charAt(0) != '/') { - System.out.println("Expected path but is missing1"); + responsePlainTextError(ctx, streamId, HttpResponseStatus.NOT_FOUND.code(), GrpcStatus.UNIMPLEMENTED.code, + String.format("Expected path to start with /: %s", path)); return; } final CharSequence contentType = HttpUtil.getMimeType(headers.get(HttpHeaderNames.CONTENT_TYPE)); if (contentType == null) { - System.out.println("Expected path but is missing2"); + responsePlainTextError(ctx, streamId, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(), GrpcStatus.INTERNAL.code, + "Content-Type is missing from the request"); return; } - if (!GrpcElf.isGrpcContentType(contentType)) { - System.out.println("Expected path but is missing3"); + final String contentString = contentType.toString(); + if (!TripleUtil.supportContentType(contentString)) { + responsePlainTextError(ctx, streamId, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(), GrpcStatus.INTERNAL.code, + String.format("Content-Type '%s' is not supported", contentString)); return; } - if (!HttpMethod.POST.asciiName().equals(headers.method())) { - System.out.println("Expected path but is missing4"); - return; - } - - String marshaller; - if (AsciiString.contentEquals(contentType, GrpcElf.APPLICATION_GRPC) || AsciiString.contentEquals(contentType, GrpcElf.GRPC_PROTO)) { - marshaller = "protobuf"; - } else if (AsciiString.contentEquals(contentType, GrpcElf.GRPC_JSON)) { - marshaller = "protobuf-json"; - } else { - System.out.println("Expected path but is missing5"); - return; - } Http2Stream http2Stream = connection.stream(streamId); if (streamKey == null) { streamKey = connection.newKey(); } - Http2Request request = new Http2Request(streamId, http2Stream, headers, streamKey, marshaller, - ctx.alloc()); + Http2Request request = new Http2Request(streamId, path, http2Stream, headers, streamKey, ctx.alloc()); http2Stream.setProperty(streamKey, request); } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java index b9f0da79812..20650cdab4e 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java @@ -7,8 +7,13 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http2.DefaultHttp2Connection; +import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; +import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2ConnectionHandler; +import io.netty.handler.codec.http2.Http2FrameWriter; +import io.netty.handler.codec.http2.Http2OutboundFrameLogger; @Activate public class TripleHttp2Protocol extends Http2WireProtocol { @@ -23,10 +28,12 @@ public void close() { public void configServerPipeline(ChannelHandlerContext ctx) { final ChannelPipeline p = ctx.pipeline(); final Http2Connection connection = new DefaultHttp2Connection(true); + final Http2FrameWriter frameWriter= new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(),Http2WireProtocol.SERVER_LOGGER); + final Http2ConnectionEncoder encoder=new DefaultHttp2ConnectionEncoder(connection,frameWriter); final Http2ConnectionHandler handler = new DubboConnectionHandlerBuilder() .connection(connection) .frameLogger(SERVER_LOGGER) - .frameListener(new TripleHttp2FrameListener(connection)) + .frameListener(new TripleHttp2FrameListener(connection, frameWriter, encoder)) .build(); p.addLast(handler); diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerResolver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerResolver.java index 2b113dbe110..cb4c86f659a 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerResolver.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerResolver.java @@ -7,6 +7,8 @@ import java.util.concurrent.ConcurrentHashMap; public class TripleInvokerResolver implements InvokerResolver { + // path0: com.foo.Bar + // path1: com.foo.Bar2:version private final ConcurrentHashMap> path2Invoker = new ConcurrentHashMap<>(); private final Set ignoreSet = ConcurrentHashMap.newKeySet(); diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java new file mode 100644 index 00000000000..0481701fe71 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java @@ -0,0 +1,15 @@ +package org.apache.dubbo.rpc.protocol.tri; + +public class TripleUtil { + + /** + * must starts from application/grpc + */ + public static boolean supportContentType(String contentType) { + if (contentType == null) { + return false; + } + + return contentType.startsWith(TripleConstant.APPLICATION_GRPC); + } +}