Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial implementation of the Streaming API

This pull request provides a framework for exchanging a very large
stream between handlers, typically between a decoder and an inbound
handler (or between a handler that writes a message and an encoder that
encodes that message).

For example, an HTTP decoder, previously, generates multiple
micro-messages to decode an HTTP message (i.e. HttpRequest +
HttpChunks). With the streaming API, The HTTP decoder can simply
generate a single HTTP message whose content is a Stream. And then the
inbound handler can consume the Stream via the buffer you created when
you begin to read the stream. If you create a buffer whose capacity is
bounded, you can handle a very large stream without allocating a lot of
memory. If you just want to wait until the whole content is ready, you
can also do that with an unbounded buffer.

The streaming API also supports a limited form of communication between
a producer (i.e. decoder) and a consumer. A producer can abort the
stream if the stream is not valid anymore. A consumer can choose to
reject or discard the stream, where rejection is for unrecoverable
failure and discard is for recoverable failure.

P.S. Special thanks to @jpinner for the initial input.
  • Loading branch information...
commit 3144a5a9b6f11e418e2441e59d78d37cfd9b114e 1 parent 476524d
@trustin trustin authored
View
497 buffer/src/main/java/io/netty/buffer/stream/AbstractStream.java
@@ -0,0 +1,497 @@
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer.stream;
+
+import io.netty.buffer.Buf;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
+import java.util.Random;
+
+public abstract class AbstractStream<T extends Buf> implements Stream<T> {
+
+ private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractStream.class);
+
+ private static final Random random = new Random();
+
+ private static final ThreadLocal<Boolean> IN_CONSUMER = new ThreadLocal<Boolean>() {
+ @Override
+ protected Boolean initialValue() {
+ return Boolean.FALSE;
+ }
+ };
+
+ /**
+ * Generate a random string that can be used for correlating a group of log messages.
+ */
+ private static String nextLogKey() {
+ return Long.toHexString(random.nextInt() & 0xFFFFFFFFL);
+ }
+
+ private final StreamProducerContextImpl producerCtx;
+ private StreamConsumerContextImpl consumerCtx;
+ private Runnable invokeStreamConsumedTask;
+
+ /** 0 - init, 1 - accepted, 2 - discarded, 3 - rejected, 4 - closed */
+ int state;
+ T buffer;
+
+ @SuppressWarnings("unchecked")
+ protected AbstractStream(EventExecutor executor, StreamProducer<? super T> producer) {
+ producerCtx = new StreamProducerContextImpl(executor, producer);
+ }
+
+ T buffer() {
+ T buffer = this.buffer;
+ if (buffer == null) {
+ fail();
+ }
+ return buffer;
+ }
+
+ @Override
+ public void accept(EventExecutor executor, StreamConsumer<? super T> consumer) {
+ if (executor == null) {
+ throw new NullPointerException("executor");
+ }
+ if (consumer == null) {
+ throw new NullPointerException("handler");
+ }
+
+ if (state != 0) {
+ fail();
+ }
+
+ StreamConsumerContextImpl consumerCtx = new StreamConsumerContextImpl(executor, consumer);
+
+ @SuppressWarnings("unchecked")
+ StreamConsumer<T> h = (StreamConsumer<T>) consumer;
+ try {
+ buffer = h.newStreamBuffer(consumerCtx);
+ } catch (Throwable t) {
+ PlatformDependent.throwException(t);
+ }
+
+ this.consumerCtx = consumerCtx;
+ state = 1;
+
+ fireStreamAccepted();
+ }
+
+ private void fireStreamAccepted() {
+ EventExecutor e = producerCtx.executor;
+ if (e.inEventLoop()) {
+ invokeStreamAccepted();
+ } else {
+ e.execute(new Runnable() {
+ @Override
+ public void run() {
+ invokeStreamAccepted();
+ }
+ });
+ }
+ }
+
+ private void invokeStreamAccepted() {
+ StreamProducerContextImpl producerCtx = this.producerCtx;
+ try {
+ producerCtx.producer.streamAccepted(producerCtx);
+ } catch (Throwable t) {
+ safeAbort(producerCtx, t);
+ return;
+ }
+
+ if (consumerCtx.nextCalled) {
+ consumerCtx.invokeStreamConsumed();
+ }
+ }
+
+ void safeAbort(StreamProducerContextImpl producerCtx, Throwable cause) {
+ try {
+ producerCtx.abort(cause);
+ } catch (Throwable t) {
+ if (logger.isWarnEnabled()) {
+ String key = nextLogKey();
+ logger.warn("[{}] Failed to auto-abort a stream.", key, t);
+ logger.warn("[{}] .. when invoked with the following cause:", key, cause);
+ }
+ }
+ }
+
+ @Override
+ public void discard() {
+ StreamConsumerContextImpl consumerCtx = this.consumerCtx;
+ if (consumerCtx == null) {
+ fail();
+ }
+ consumerCtx.discard();
+ }
+
+ @Override
+ public void reject(Throwable cause) {
+ StreamConsumerContextImpl consumerCtx = this.consumerCtx;
+ if (consumerCtx == null) {
+ fail();
+ }
+ consumerCtx.reject(cause);
+ }
+
+ private void fail() {
+ switch (state) {
+ case 0:
+ throw new IllegalStateException("stream not accepted yet");
+ case 1:
+ throw new IllegalStateException("stream accepted already");
+ case 2:
+ throw new IllegalStateException("stream discarded already");
+ case 3:
+ throw new IllegalStateException("stream rejected already");
+ case 4:
+ throw new IllegalStateException("stream closed already");
+ default:
+ throw new Error();
+ }
+ }
+
+ private final class StreamProducerContextImpl implements StreamProducerContext<T> {
+
+ final EventExecutor executor;
+ final StreamProducer<T> producer;
+ boolean invokedStreamOpen;
+ Runnable invokeStreamUpdatedTask;
+
+ @SuppressWarnings("unchecked")
+ StreamProducerContextImpl(EventExecutor executor, StreamProducer<? super T> producer) {
+ if (executor == null) {
+ throw new NullPointerException("executor");
+ }
+ if (producer == null) {
+ throw new NullPointerException("producer");
+ }
+
+ this.executor = executor;
+ this.producer = (StreamProducer<T>) producer;
+ }
+
+ @Override
+ public EventExecutor executor() {
+ return executor;
+ }
+
+ @Override
+ public T buffer() {
+ return AbstractStream.this.buffer();
+ }
+
+ @Override
+ public StreamProducerContext<T> update() {
+ if (state != 1) {
+ fail();
+ }
+
+ fireStreamUpdated();
+ return this;
+ }
+
+ private void fireStreamUpdated() {
+ EventExecutor e = consumerCtx.executor;
+ if (e.inEventLoop()) {
+ invokeStreamUpdated();
+ } else {
+ Runnable task = invokeStreamUpdatedTask;
+ if (task == null) {
+ invokeStreamUpdatedTask = task = new Runnable() {
+ @Override
+ public void run() {
+ invokeStreamUpdated();
+ }
+ };
+ }
+ e.execute(task);
+ }
+ }
+
+ private void invokeStreamUpdated() {
+ StreamConsumerContextImpl consumerCtx = AbstractStream.this.consumerCtx;
+ StreamConsumer<T> consumer = consumerCtx.consumer;
+ if (consumerCtx.singleThreaded) {
+ IN_CONSUMER.set(Boolean.TRUE);
+ }
+ try {
+ if (!invokedStreamOpen) {
+ invokedStreamOpen = true;
+ try {
+ consumer.streamOpen(consumerCtx);
+ } catch (Throwable t) {
+ safeAbort(producerCtx, new StreamConsumerException(t));
+ return;
+ }
+ }
+
+ try {
+ consumer.streamUpdated(consumerCtx);
+ } catch (Throwable t) {
+ safeAbort(producerCtx, new StreamConsumerException(t));
+ }
+ } finally {
+ if (consumerCtx.singleThreaded) {
+ IN_CONSUMER.set(Boolean.FALSE);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ if (state == 1) {
+ state = 4;
+ fireStreamClosed();
+ }
+ }
+
+ private void fireStreamClosed() {
+ EventExecutor e = consumerCtx.executor;
+ if (e.inEventLoop()) {
+ invokeStreamClosed();
+ } else {
+ e.execute(new Runnable() {
+ @Override
+ public void run() {
+ invokeStreamClosed();
+ }
+ });
+ }
+ }
+
+ private void invokeStreamClosed() {
+ StreamConsumerContextImpl consumerCtx = AbstractStream.this.consumerCtx;
+ try {
+ consumerCtx.consumer.streamClosed(consumerCtx);
+ } catch (Throwable t) {
+ logger.warn("StreamConsumer.streamClosed() raised an exception.", t);
+ }
+ }
+
+ @Override
+ public void abort(Throwable cause) {
+ if (cause == null) {
+ throw new NullPointerException("cause");
+ }
+
+ if (state != 1) {
+ fail();
+ }
+
+ fireStreamAborted(cause);
+ }
+
+ private void fireStreamAborted(final Throwable cause) {
+ StreamConsumerContextImpl consumerCtx = AbstractStream.this.consumerCtx;
+ EventExecutor e = consumerCtx.executor;
+ if (e.inEventLoop()) {
+ invokeStreamAborted(cause);
+ } else {
+ e.execute(new Runnable() {
+ @Override
+ public void run() {
+ invokeStreamAborted(cause);
+ }
+ });
+ }
+ }
+
+ private void invokeStreamAborted(Throwable cause) {
+ StreamConsumerContextImpl consumerCtx = AbstractStream.this.consumerCtx;
+ try {
+ consumerCtx.consumer.streamAborted(consumerCtx, cause);
+ } catch (Throwable t) {
+ if (logger.isWarnEnabled()) {
+ String key = nextLogKey();
+ logger.warn("[{}] StreamConsumer.streamAborted() raised an exception.", key, t);
+ logger.warn("[{}] .. when invoked with the following cause:", key, cause);
+ }
+ } finally {
+ invokeStreamClosed();
+ }
+ }
+ }
+
+ private final class StreamConsumerContextImpl implements StreamConsumerContext<T> {
+
+ final EventExecutor executor;
+ final StreamConsumer<T> consumer;
+ final boolean singleThreaded;
+ boolean nextCalled;
+
+ @SuppressWarnings("unchecked")
+ StreamConsumerContextImpl(EventExecutor executor, StreamConsumer<? super T> consumer) {
+ if (executor == null) {
+ throw new NullPointerException("executor");
+ }
+ if (consumer == null) {
+ throw new NullPointerException("consumer");
+ }
+ this.executor = executor;
+ this.consumer = (StreamConsumer<T>) consumer;
+ singleThreaded = executor == producerCtx.executor;
+ }
+
+ @Override
+ public EventExecutor executor() {
+ return executor;
+ }
+
+ @Override
+ public T buffer() {
+ return AbstractStream.this.buffer();
+ }
+
+ @Override
+ public void discard() {
+ switch (state) {
+ case 2:
+ return;
+ case 3:
+ case 4:
+ fail();
+ }
+
+ T buffer = AbstractStream.this.buffer;
+ if (buffer != null) {
+ buffer.release();
+ }
+
+ state = 2;
+
+ fireStreamDiscarded();
+ }
+
+ private void fireStreamDiscarded() {
+ EventExecutor e = producerCtx.executor;
+ if (e.inEventLoop()) {
+ invokeStreamDiscarded();
+ } else {
+ e.execute(new Runnable() {
+ @Override
+ public void run() {
+ invokeStreamDiscarded();
+ }
+ });
+ }
+ }
+
+ private void invokeStreamDiscarded() {
+ StreamProducerContextImpl producerCtx = AbstractStream.this.producerCtx;
+ try {
+ producerCtx.producer.streamDiscarded(producerCtx);
+ } catch (Throwable t) {
+ logger.warn("StreamProducer.streamDiscarded() raised an exception.", t);
+ }
+ }
+
+ @Override
+ public void reject(Throwable cause) {
+ if (cause == null) {
+ throw new NullPointerException("cause");
+ }
+
+ if (state != 1) {
+ fail();
+ }
+
+ T buffer = AbstractStream.this.buffer;
+ if (buffer != null) {
+ buffer.release();
+ }
+
+ state = 3;
+
+ fireStreamRejected(cause);
+ }
+
+ private void fireStreamRejected(final Throwable cause) {
+ EventExecutor e = producerCtx.executor;
+ if (e.inEventLoop()) {
+ invokeStreamRejected(cause);
+ } else {
+ e.execute(new Runnable() {
+ @Override
+ public void run() {
+ invokeStreamRejected(cause);
+ }
+ });
+ }
+ }
+
+ private void invokeStreamRejected(Throwable cause) {
+ StreamProducerContextImpl producerCtx = AbstractStream.this.producerCtx;
+ try {
+ producerCtx.producer.streamRejected(producerCtx, cause);
+ } catch (Throwable t) {
+ if (logger.isWarnEnabled()) {
+ String key = nextLogKey();
+ logger.warn("[{}] StreamProducer.streamRejected() raised an exception.", key, t);
+ logger.warn("[{}] .. when invoked with the following cause:", key, cause);
+ }
+ }
+ }
+
+ @Override
+ public void next() {
+ if (state != 1) {
+ fail();
+ }
+
+ if (singleThreaded && IN_CONSUMER.get()) {
+ nextCalled = true;
+ } else {
+ fireStreamConsumed();
+ }
+ }
+
+ private void fireStreamConsumed() {
+ EventExecutor e = producerCtx.executor;
+ if (e.inEventLoop()) {
+ invokeStreamConsumed();
+ } else {
+ Runnable task = invokeStreamConsumedTask;
+ if (task == null) {
+ invokeStreamConsumedTask = task = new Runnable() {
+ @Override
+ public void run() {
+ invokeStreamConsumed();
+ }
+ };
+ }
+ e.execute(task);
+ }
+ }
+
+ private void invokeStreamConsumed() {
+ StreamProducerContextImpl producerCtx = AbstractStream.this.producerCtx;
+ try {
+ do {
+ consumerCtx.nextCalled = false;
+ producerCtx.producer.streamConsumed(producerCtx);
+ } while (consumerCtx.nextCalled);
+ } catch (Throwable t) {
+ reject(new StreamProducerException(t));
+ }
+ }
+ }
+}
View
26 buffer/src/main/java/io/netty/buffer/stream/Stream.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer.stream;
+
+import io.netty.buffer.Buf;
+import io.netty.util.concurrent.EventExecutor;
+
+public interface Stream<T extends Buf> {
+ void accept(EventExecutor executor, StreamConsumer<? super T> consumer);
+ void discard();
+ void reject(Throwable cause);
+}
View
28 buffer/src/main/java/io/netty/buffer/stream/StreamConsumer.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer.stream;
+
+import io.netty.buffer.Buf;
+
+public interface StreamConsumer<T extends Buf> {
+ T newStreamBuffer(StreamConsumerContext<T> ctx) throws Exception;
+
+ void streamOpen(StreamConsumerContext<T> ctx) throws Exception;
+ void streamUpdated(StreamConsumerContext<T> ctx) throws Exception;
+ void streamAborted(StreamConsumerContext<T> ctx, Throwable cause) throws Exception;
+ void streamClosed(StreamConsumerContext<T> ctx) throws Exception;
+}
View
28 buffer/src/main/java/io/netty/buffer/stream/StreamConsumerContext.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer.stream;
+
+import io.netty.buffer.Buf;
+import io.netty.util.concurrent.EventExecutor;
+
+public interface StreamConsumerContext<T extends Buf> {
+ EventExecutor executor();
+ T buffer();
+ void next();
+ void discard();
+ void reject(Throwable cause);
+}
View
36 buffer/src/main/java/io/netty/buffer/stream/StreamConsumerException.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer.stream;
+
+public class StreamConsumerException extends StreamException {
+
+ private static final long serialVersionUID = -7688455132621735741L;
+
+ public StreamConsumerException() { }
+
+ public StreamConsumerException(String message) {
+ super(message);
+ }
+
+ public StreamConsumerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public StreamConsumerException(Throwable cause) {
+ super(cause);
+ }
+}
View
36 buffer/src/main/java/io/netty/buffer/stream/StreamException.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer.stream;
+
+public class StreamException extends RuntimeException {
+
+ private static final long serialVersionUID = -8529582560484984135L;
+
+ public StreamException() { }
+
+ public StreamException(String message) {
+ super(message);
+ }
+
+ public StreamException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public StreamException(Throwable cause) {
+ super(cause);
+ }
+}
View
26 buffer/src/main/java/io/netty/buffer/stream/StreamProducer.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer.stream;
+
+import io.netty.buffer.Buf;
+
+public interface StreamProducer<T extends Buf> {
+ void streamAccepted(StreamProducerContext<T> ctx) throws Exception;
+ void streamConsumed(StreamProducerContext<T> ctx) throws Exception;
+ void streamDiscarded(StreamProducerContext<T> ctx) throws Exception;
+ void streamRejected(StreamProducerContext<T> ctx, Throwable cause) throws Exception;
+}
View
28 buffer/src/main/java/io/netty/buffer/stream/StreamProducerContext.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer.stream;
+
+import io.netty.buffer.Buf;
+import io.netty.util.concurrent.EventExecutor;
+
+public interface StreamProducerContext<T extends Buf> {
+ EventExecutor executor();
+ T buffer();
+ StreamProducerContext<T> update();
+ void close();
+ void abort(Throwable cause);
+}
View
36 buffer/src/main/java/io/netty/buffer/stream/StreamProducerException.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer.stream;
+
+public class StreamProducerException extends StreamException {
+
+ private static final long serialVersionUID = 8830744325775707415L;
+
+ public StreamProducerException() { }
+
+ public StreamProducerException(String message) {
+ super(message);
+ }
+
+ public StreamProducerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public StreamProducerException(Throwable cause) {
+ super(cause);
+ }
+}
View
20 buffer/src/main/java/io/netty/buffer/stream/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Enables exchanging a very large stream between a producer and a consumer.
+ */
+package io.netty.buffer.stream;
View
174 buffer/src/test/java/io/netty/buffer/stream/StreamTest.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer.stream;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Promise;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class StreamTest {
+
+ private static EventExecutor executorA;
+ private static EventExecutor executorB;
+
+ @BeforeClass
+ public static void setUp() {
+ executorA = new DefaultEventExecutorGroup(1).next();
+ executorB = new DefaultEventExecutorGroup(1).next();
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ executorB.shutdown();
+ executorA.shutdown();
+ }
+
+ @Test(timeout = 5000)
+ public void testSimpleWithSameExecutor() throws Exception {
+ final long size = 1048576L * 1024L * 1024L; // Transfer whooping 1 TiB of garbage
+ testSimple0(executorA, executorA, size);
+ }
+
+ @Test(timeout = 10000)
+ public void testSimpleWithDifferentExecutors() throws Exception {
+ final long size = 1048576L * 1024L * 16L; // Transfer 16 GiB of garbage
+ testSimple0(executorA, executorB, size);
+ }
+
+ private static void testSimple0(EventExecutor executorA, EventExecutor executorB, long size) throws Exception {
+ LargeByteStreamConsumer consumer = new LargeByteStreamConsumer(size);
+ Stream<ByteBuf> stream = new LargeByteStream(executorA, size);
+ stream.accept(executorB, consumer);
+
+ for (;;) {
+ if (consumer.future.await(1000)) {
+ break;
+ }
+
+ System.err.println(consumer.counter + " / " + size);
+ }
+
+ consumer.future.sync();
+ }
+
+ private static class LargeByteStream extends AbstractStream<ByteBuf> {
+ LargeByteStream(EventExecutor executor, long size) {
+ super(executor, new LargeByteStreamProducer(size));
+ }
+ }
+
+ private static class LargeByteStreamProducer implements StreamProducer<ByteBuf> {
+
+ private final long size;
+ private long counter;
+
+ LargeByteStreamProducer(long size) {
+ this.size = size;
+ }
+
+ @Override
+ public void streamAccepted(StreamProducerContext<ByteBuf> ctx) throws Exception {
+ generate(ctx);
+ }
+
+ @Override
+ public void streamConsumed(StreamProducerContext<ByteBuf> ctx) throws Exception {
+ generate(ctx);
+ }
+
+ private void generate(StreamProducerContext<ByteBuf> ctx) {
+ ByteBuf buf = ctx.buffer();
+ int chunkSize = (int) Math.min(size - counter, buf.maxWritableBytes());
+ buf.ensureWritable(chunkSize);
+ buf.writerIndex(buf.writerIndex() + chunkSize);
+ ctx.update();
+ counter += chunkSize;
+ if (counter == size) {
+ ctx.close();
+ return;
+ }
+
+ if (counter > size) {
+ throw new AssertionError("counter > size");
+ }
+ }
+
+ @Override
+ public void streamDiscarded(StreamProducerContext<ByteBuf> ctx) throws Exception {
+ throw new AssertionError("stream discarded");
+ }
+
+ @Override
+ public void streamRejected(StreamProducerContext<ByteBuf> ctx, Throwable cause) throws Exception {
+ throw new AssertionError("stream rejected", cause);
+ }
+ }
+
+ private static class LargeByteStreamConsumer implements StreamConsumer<ByteBuf> {
+
+ private final long size;
+ volatile Promise future;
+ volatile long counter;
+
+ LargeByteStreamConsumer(long size) {
+ this.size = size;
+ }
+
+ @Override
+ public ByteBuf newStreamBuffer(StreamConsumerContext<ByteBuf> ctx) throws Exception {
+ future = new DefaultPromise(ctx.executor());
+ return Unpooled.buffer(0, 65536); // Only use 64 KiB at max.
+ }
+
+ @Override
+ public void streamOpen(StreamConsumerContext<ByteBuf> ctx) throws Exception { }
+
+ @Override
+ public void streamUpdated(StreamConsumerContext<ByteBuf> ctx) throws Exception {
+ ByteBuf buf = ctx.buffer();
+ int chunkSize = buf.readableBytes();
+ buf.skipBytes(chunkSize);
+ buf.discardReadBytes();
+ counter += chunkSize;
+ if (counter == size) {
+ future.setSuccess();
+ } else if (counter > size) {
+ AssertionError error = new AssertionError("counter > size");
+ ctx.reject(error);
+ future.setFailure(error);
+ } else {
+ ctx.next();
+ }
+ }
+
+ @Override
+ public void streamAborted(StreamConsumerContext<ByteBuf> ctx, Throwable cause) throws Exception {
+ future.setFailure(cause);
+ }
+
+ @Override
+ public void streamClosed(StreamConsumerContext<ByteBuf> ctx) throws Exception {
+ future.tryFailure(new AssertionError("tryFailure() must return false."));
+ }
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.