Skip to content

Commit

Permalink
Add HttpDeframer for decoding HttpObjects on top of Reactive Streams (
Browse files Browse the repository at this point in the history
#2981)

Motivation:

By adding a skeletal `Processor<HttpData, T>` implementation, we can easily get certain data
that is scattered across the data frames without individual queueing and assembling.
As a result, we can make `ArmeriaMessageDeframer` focus on decoding framed data to a gRPC message and remove resource management logic.

Modifications:

- Add `HttpDeframer` that implements Reactive Streams' `Processor` that consumes `HttpObject`s and produces deframed objects
- Add `HttpDeframerHandler` that can define how to decode `HttpObject`s
- Add `HttpDeframerInput` used to read a stream of `HttpData`s
- Add `HttpDeframerOutput` that holds decoded data.
- Rename `ArmeriaMessageDeframer` to `ArmeriaMessageDeframerHandler`
  - Refactor `ArmeriaMessageDeframerHandler` to implement `HttpDeframerHandler`
- Refactor `ArmeriaClientCall` and `ArmeriaServerCall` to implement `Subscriber` instead of the custom `ArmeriaMessageDeframer.Listener`
- Make `HttpStreamDeframerHandler` extend `ArmeriaMessageDeframerHandler` for handling gRPC headers and trailers.
- Add `HttpDeframerTckTest` for verifying Reactive Streams interops.

Result:

- You can now use `HttpDeframer` to conveniently decode a stream of `HttpObject` to N objects
- Revamp internal gRPC implementations to focus on encode and decode gRPC wire format.
- Fixes #2925
  • Loading branch information
ikhoon committed Nov 19, 2020
1 parent 39e97e2 commit bd09b15
Show file tree
Hide file tree
Showing 39 changed files with 2,795 additions and 1,739 deletions.
@@ -0,0 +1,191 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.stream;

import static com.google.common.base.Preconditions.checkArgument;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;

final class ByteBufDeframerInput implements HttpDeframerInput {

private final ByteBufAllocator alloc;
private final Queue<ByteBuf> queue;

ByteBufDeframerInput(ByteBufAllocator alloc) {
this.alloc = alloc;
queue = new ArrayDeque<>();
}

void add(ByteBuf byteBuf) {
if (byteBuf.isReadable()) {
queue.add(byteBuf);
} else {
byteBuf.release();
}
}

@Override
public int readableBytes() {
if (queue.isEmpty()) {
return 0;
}

int readableBytes = 0;
for (ByteBuf buf : queue) {
readableBytes += buf.readableBytes();
}
return readableBytes;
}

@Override
public byte readByte() {
final ByteBuf buf = queue.peek();

if (buf == null) {
throw newEndOfInputException();
}

final int readableBytes = buf.readableBytes();
final byte value = buf.readByte();
if (readableBytes == 1) {
queue.remove();
buf.release();
}
return value;
}

@Override
public int readInt() {
final ByteBuf firstBuf = queue.peek();
if (firstBuf == null) {
throw newEndOfInputException();
}

final int readableBytes = firstBuf.readableBytes();
if (readableBytes >= 4) {
final int value = firstBuf.readInt();
if (readableBytes == 4) {
queue.remove();
firstBuf.release();
}
return value;
}

return readIntSlow();
}

private int readIntSlow() {
int value = 0;
int remaining = 4;
for (final Iterator<ByteBuf> it = queue.iterator(); it.hasNext();) {
final ByteBuf buf = it.next();
final int readableBytes = buf.readableBytes();
final int readSize = Math.min(remaining, readableBytes);

value <<= 8 * readSize;
switch (readSize) {
case 1:
value |= buf.readUnsignedByte();
break;
case 2:
value |= buf.readUnsignedShort();
break;
case 3:
value |= buf.readUnsignedMedium();
break;
default:
throw new Error(); // Should not reach here.
}
if (readSize == readableBytes) {
it.remove();
buf.release();
}
remaining -= readSize;
if (remaining == 0) {
return value;
}
}

throw newEndOfInputException();
}

@Override
public ByteBuf readBytes(int length) {
checkArgument(length > 0, "length %s (expected: length > 0)", length);
final ByteBuf firstBuf = queue.peek();
if (firstBuf == null) {
throw newEndOfInputException();
}

final int readableBytes = firstBuf.readableBytes();
if (readableBytes == length) {
return queue.remove();
}
if (readableBytes > length) {
return firstBuf.readRetainedSlice(length);
}

return readBytesSlow(length);
}

private ByteBuf readBytesSlow(int length) {
final ByteBuf value = alloc.buffer(length);
int remaining = length;
for (final Iterator<ByteBuf> it = queue.iterator(); it.hasNext();) {
final ByteBuf buf = it.next();
final int readableBytes = buf.readableBytes();
assert readableBytes > 0 : buf;

final int readSize = Math.min(remaining, readableBytes);
value.writeBytes(buf, readSize);
if (readableBytes == readSize) {
it.remove();
buf.release();
}

remaining -= readSize;
if (remaining == 0) {
return value;
}
}

ReferenceCountUtil.release(value);
throw newEndOfInputException();
}

private static IllegalStateException newEndOfInputException() {
return new IllegalStateException("end of deframer input");
}

@Override
public void close() {
for (;;) {
final ByteBuf buf = queue.poll();
if (buf != null) {
buf.release();
} else {
break;
}
}
}
}
Expand Up @@ -114,7 +114,7 @@ public final boolean isEmpty() {
}

@Override
final SubscriptionImpl subscribe(SubscriptionImpl subscription) {
SubscriptionImpl subscribe(SubscriptionImpl subscription) {
if (!subscriptionUpdater.compareAndSet(this, null, subscription)) {
final SubscriptionImpl oldSubscription = this.subscription;
assert oldSubscription != null;
Expand Down Expand Up @@ -215,7 +215,7 @@ final long demand() {
}

@Override
final void request(long n) {
void request(long n) {
final SubscriptionImpl subscription = this.subscription;
// A user cannot access subscription without subscribing.
assert subscription != null;
Expand All @@ -241,7 +241,7 @@ private void doRequest(long n) {
}

@Override
final void cancel() {
void cancel() {
if (setState(State.OPEN, State.CLEANUP) || setState(State.CLOSED, State.CLEANUP)) {
// It the state was CLOSED, close() or close(cause) has been called before cancel() or abort()
// is called. We just ignore the previously pushed event and deal with CANCELLED_CLOSE.
Expand Down

0 comments on commit bd09b15

Please sign in to comment.