Skip to content

Commit

Permalink
Initial implementation of the Streaming API
Browse files Browse the repository at this point in the history
This pull request provides a framework for exchanging a very large
stream between handlers, typically between a decoder and an inbound
handler (or between a handler that writes a message and an encoder that
encodes that message).

For example, an HTTP decoder, previously, generates multiple
micro-messages to decode an HTTP message (i.e. HttpRequest +
HttpChunks). With the streaming API, The HTTP decoder can simply
generate a single HTTP message whose content is a Stream. And then the
inbound handler can consume the Stream via the buffer you created when
you begin to read the stream. If you create a buffer whose capacity is
bounded, you can handle a very large stream without allocating a lot of
memory. If you just want to wait until the whole content is ready, you
can also do that with an unbounded buffer.

The streaming API also supports a limited form of communication between
a producer (i.e. decoder) and a consumer. A producer can abort the
stream if the stream is not valid anymore. A consumer can choose to
reject or discard the stream, where rejection is for unrecoverable
failure and discard is for recoverable failure.

P.S. Special thanks to @jpinner for the initial input.
  • Loading branch information
trustin committed Mar 10, 2013
1 parent b4bf565 commit 32efba3
Show file tree
Hide file tree
Showing 10 changed files with 915 additions and 0 deletions.
497 changes: 497 additions & 0 deletions buffer/src/main/java/io/netty/buffer/stream/AbstractStream.java

Large diffs are not rendered by default.

26 changes: 26 additions & 0 deletions buffer/src/main/java/io/netty/buffer/stream/Stream.java
@@ -0,0 +1,26 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.netty.buffer.stream;

import io.netty.buffer.Buf;
import io.netty.util.concurrent.EventExecutor;

public interface Stream<T extends Buf> {
void accept(EventExecutor executor, StreamConsumer<? super T> consumer);
void discard();
void reject(Throwable cause);
}
28 changes: 28 additions & 0 deletions buffer/src/main/java/io/netty/buffer/stream/StreamConsumer.java
@@ -0,0 +1,28 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.netty.buffer.stream;

import io.netty.buffer.Buf;

public interface StreamConsumer<T extends Buf> {
T newStreamBuffer(StreamConsumerContext<T> ctx) throws Exception;

void streamOpen(StreamConsumerContext<T> ctx) throws Exception;
void streamUpdated(StreamConsumerContext<T> ctx) throws Exception;
void streamAborted(StreamConsumerContext<T> ctx, Throwable cause) throws Exception;
void streamClosed(StreamConsumerContext<T> ctx) throws Exception;
}
@@ -0,0 +1,28 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.netty.buffer.stream;

import io.netty.buffer.Buf;
import io.netty.util.concurrent.EventExecutor;

public interface StreamConsumerContext<T extends Buf> {
EventExecutor executor();
T buffer();
void next();
void discard();
void reject(Throwable cause);
}
@@ -0,0 +1,36 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.netty.buffer.stream;

public class StreamConsumerException extends StreamException {

private static final long serialVersionUID = -7688455132621735741L;

public StreamConsumerException() { }

public StreamConsumerException(String message) {
super(message);
}

public StreamConsumerException(String message, Throwable cause) {
super(message, cause);
}

public StreamConsumerException(Throwable cause) {
super(cause);
}
}
36 changes: 36 additions & 0 deletions buffer/src/main/java/io/netty/buffer/stream/StreamException.java
@@ -0,0 +1,36 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.netty.buffer.stream;

public class StreamException extends RuntimeException {

private static final long serialVersionUID = -8529582560484984135L;

public StreamException() { }

public StreamException(String message) {
super(message);
}

public StreamException(String message, Throwable cause) {
super(message, cause);
}

public StreamException(Throwable cause) {
super(cause);
}
}
26 changes: 26 additions & 0 deletions buffer/src/main/java/io/netty/buffer/stream/StreamProducer.java
@@ -0,0 +1,26 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.netty.buffer.stream;

import io.netty.buffer.Buf;

public interface StreamProducer<T extends Buf> {
void streamAccepted(StreamProducerContext<T> ctx) throws Exception;
void streamConsumed(StreamProducerContext<T> ctx) throws Exception;
void streamDiscarded(StreamProducerContext<T> ctx) throws Exception;
void streamRejected(StreamProducerContext<T> ctx, Throwable cause) throws Exception;
}
@@ -0,0 +1,28 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.netty.buffer.stream;

import io.netty.buffer.Buf;
import io.netty.util.concurrent.EventExecutor;

public interface StreamProducerContext<T extends Buf> {
EventExecutor executor();
T buffer();
StreamProducerContext<T> update();
void close();
void abort(Throwable cause);
}
@@ -0,0 +1,36 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.netty.buffer.stream;

public class StreamProducerException extends StreamException {

private static final long serialVersionUID = 8830744325775707415L;

public StreamProducerException() { }

public StreamProducerException(String message) {
super(message);
}

public StreamProducerException(String message, Throwable cause) {
super(message, cause);
}

public StreamProducerException(Throwable cause) {
super(cause);
}
}

0 comments on commit 32efba3

Please sign in to comment.