
ByteBuffers/Strings with onNext(T item) and request(int n)? #270

Closed

gregw opened this issue May 28, 2015 · 39 comments

@gregw

gregw commented May 28, 2015

What types and aggregations should be used when using reactive streams for byte or character IO?

So for example, if I wanted a Subscriber&lt;ByteBuffer&gt;, does request(n) refer to the number of byte buffers (of unknown size) or to the number of bytes in one or more byte buffers? What if my subscriber wanted to control both the number of ByteBuffers and their total size?

I really don't want to do Subscriber&lt;Byte&gt; unless there was a way to have an onNext(byte[] item) to receive bytes in bulk.

Ditto for String and characters, or arrays of anything

@viktorklang

Hi @gregw,

The demand-element relation is explained here: https://github.com/reactive-streams/reactive-streams-jvm#subscriber-controlled-queue-bounds

Sadly, Java (or the JDK rather) does not have a standard representation for an immutable chunk of bytes (just as String is a representation for an immutable chunk of Unicode code points). An acceptable solution for your use case may be to send asReadOnlyBuffer() versions of ByteBuffer?
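For illustration, a minimal sketch of that suggestion: the publisher keeps the mutable backing buffer to itself and signals only a read-only view downstream. The class and variable names here are illustrative, not part of any API discussed above.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ReadOnlyViewExample {
    public static void main(String[] args) {
        // The publisher keeps the mutable backing buffer to itself...
        ByteBuffer backing = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));

        // ...and hands a read-only view downstream, so a misbehaving
        // subscriber cannot scribble over the shared bytes.
        ByteBuffer view = backing.asReadOnlyBuffer();

        System.out.println(view.isReadOnly() + " " + view.remaining()); // prints "true 5"
    }
}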

@gregw

gregw commented May 29, 2015

Hi @viktorklang ,

Thanks for the response... firstly, is this the right place to discuss this kind of thing? Please point me to the appropriate forum if not.

The link you gave makes it clear that the value passed in request(int n) will translate to the invocation count of onNext(), which is exactly the problem I don't know how to get around.

If I have a Subscriber&lt;ByteBuffer&gt; and do a request(1), then I do know that onNext() will be called once and only once, but I don't know if it will be called with a ByteBuffer that contains a single byte or one that contains 4GB! Considering that onNext has to fully consume the content passed, that is a big problem.

What we are trying to do is to provide the Flow semantics directly from the Jetty server's asynchronous IO streams, so that developers don't have to deal with the complexities of the Servlet API's async callbacks.

@abersnaze

I put together the https://github.com/ReactiveX/RxJavaString/ project to help with some of the issues with splitting and encoding across different chunks of UTF8 bytes. Not sure any of it would be directly useful for ByteBuffers. If you want, maybe that project would be a good place for ByteBuffer-specific operations; if not, then maybe it could be used as a template for your own custom operators.

@gregw

gregw commented May 29, 2015

George,

I can see that such concerns as splitting/encoding of byte streams that contain UTF8 characters will indeed be important for solutions in this space. I'm new to these APIs, but my initial thought is that I would expect to see something like:

class UTF8Processor implements Flow.Processor<ByteBuffer,CharSequence>

which could be used to convert byte collections to character sequences, and such a class would indeed need to handle the problem of multi-byte characters splitting over byte buffers.
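A minimal sketch of the decoding step such a processor would wrap (not the Processor plumbing itself): java.nio.charset.CharsetDecoder stops at an incomplete multi-byte sequence, and the caller has to carry the remainder over into the next chunk. The buffer contents below are illustrative.

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;

public class Utf8SplitDecode {
    public static void main(String[] args) {
        // "é" is 0xC3 0xA9 in UTF-8; split it across two incoming chunks.
        ByteBuffer first = ByteBuffer.wrap(new byte[] { 'a', (byte) 0xC3 });
        ByteBuffer second = ByteBuffer.wrap(new byte[] { (byte) 0xA9, 'b' });

        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
        CharBuffer out = CharBuffer.allocate(16);

        // With endOfInput = false the decoder stops before the dangling 0xC3
        // and leaves it in the input buffer; the processor must carry it over.
        decoder.decode(first, out, false);
        ByteBuffer carryOver = ByteBuffer.allocate(first.remaining() + second.remaining());
        carryOver.put(first).put(second).flip();

        decoder.decode(carryOver, out, true);
        decoder.flush(out);

        out.flip();
        System.out.println(out); // prints "aéb"
    }
}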

However, looking at your code I can't see that you've addressed the issue of what unit to express back pressure control in. I can see your onNext(byte[]) method doing the conversion, but I can't see it calling Subscription.request to communicate that it is able to receive more bytes?

It is that question I need resolved so I can get over the initial hump and start using these APIs and perhaps trying out things like integration with RxJava.

cheers


@abersnaze

You'll either have to have the Producer generate reasonably sized ByteBuffers or have an intermediate step that slices them into smaller parts.

Might I also suggest you take a look at https://github.com/ReactiveX/RxNetty.

@gregw

gregw commented May 29, 2015

Ah I've seen that this has been briefly discussed before in #47 and closed because it was too hard to solve simply!

Sorry, but that seems like a bit of a cop-out to me and will make the simplest of processors impossible to write asynchronously.

For example, if I want to write a processor that does UTF-8 conversion of ByteBuffer to CharSequence, then it is possible for onNext to be called with a massive memory-mapped file as a ByteBuffer, which will never ever fit into memory. So the processor will have no choice but to block and slice off a buffer of characters at a time, feeding them to its downstream subscriber until all have been consumed.

It is not even possible to write a non-blocking fragmenting Processor whose only role is to chop a big buffer into smaller ones that might avoid this problem downstream.

I don't think this is too hard a problem, and I can see two basic approaches:

  1. do a bit of work around defining ownership of an Item beyond the return of onNext(), i.e. if a processor was able to hold onto a reference to a ByteBuffer item and return from onNext(), then at least asynchronous processing could be achieved.

  2. Add a request(long items, long size) method to the Subscription, with the semantics that a call to request(long n) is equivalent to request(n, Long.MAX_VALUE). There would also need to be a generic way of determining the size of an arbitrary T item. If a producer had an Item that was too large for the requested size, it could either: wait for more size to be requested; slice the item if it knows how; or simply call onError(new InsufficientSizeException). (See the sketch at the end of this comment.)

It would be a real pity if this semantic is put into the too-hard basket. The whole point of APIs like this is that async is hard and needs to be simplified. Having a single async Flow abstraction that can handle bytes, chars, frames, objects, more complex objects etc. while keeping the same semantics and backpressure mechanism would be awesome!
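A purely hypothetical sketch of the second option above; nothing like this exists in the Reactive Streams spec, and all names are invented for illustration:

// Hypothetical only: NOT part of the Reactive Streams spec.
interface SizedSubscription {
    // Equivalent to request(n, Long.MAX_VALUE) below.
    void request(long items);

    // Demand at most `items` onNext invocations whose total "size"
    // (bytes, chars, ...) does not exceed `size`.
    void request(long items, long size);
}

// A publisher would also need a generic way to measure an arbitrary item:
interface Sizer<T> {
    long sizeOf(T item);   // for ByteBuffer this could simply be remaining()
}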

@gregw

gregw commented May 29, 2015

@abersnaze

Surely at one end of a Producer chain there might be a user who is not going to be constrained by reasonable sizes. They are just going to memory-map the biggest file they can find as a ByteBuffer, feed it into a Flow, and want it to work.

You can write a Processor that slices large items into smaller items, but I can't see how it can be written asynchronously because the large item will arrive in an onNext call which can't return until the entire item has been consumed.

OK, so maybe this can be done in a Producer, but to do so would require another asynchronous API, and isn't the whole point of this exercise to give a nice portable async reactive API that users can know, love and use? If we have to switch to another API when we hit uncontrolled user code, then it kind of defeats the point! No?

Also, even if the initial Producer creates right-sized chunks, who is to say that they are right-sized all the way through a chain of Producers and Subscribers? If you are to use the Processor model then it needs to be able to asynchronously slice big items into small items. The current API does not allow this in a non-blocking way, so the model will fail for an arbitrary chain of Processors with differing size limitations within the chain.

@akarnokd

Here are two examples of splitting potentially oversized byte[]/ByteBuffers into smaller ones:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import rx.Observable;

public class ByteArraySplitting {
    public static void main(String[] args) {
        int maxSize = 300;

        Observable.just(new byte[1024]).repeat(2)
        .flatMap(a -> {
            int capacity = (a.length + maxSize - 1) / maxSize;
            List<byte[]> chunks = new ArrayList<>(capacity);
            int i = 0;
            int remaining = a.length;

            while (remaining != 0) {
                int len = Math.min(maxSize, remaining);
                byte[] chunk = Arrays.copyOfRange(a, i, i + len);

                chunks.add(chunk);
                i += len;
                remaining -= len;
            }
            return Observable.from(chunks);
        })
        .subscribe(b -> System.out.println(b.length));


        ByteBuffer bb = ByteBuffer.allocate(1024);
        bb.put(new byte[1024]);
        bb.flip();

        Observable.just(bb)
        .flatMap(a -> {
            int capacity = (a.limit() + maxSize - 1) / maxSize;
            List<ByteBuffer> chunks = new ArrayList<>(capacity);
            int i = 0;
            int remaining = a.limit() - a.position();

            while (remaining != 0) {
                int len = Math.min(maxSize, remaining);
                ByteBuffer chunk = a.slice();

                chunk.position(i);
                chunk.limit(i + len);

                chunks.add(chunk);
                i += len;
                remaining -= len;
            }
            return Observable.from(chunks);
        })
        .subscribe(b -> System.out.println(b.position() + " .. " + b.limit()));
    }
}

Cutting down on the size is straightforward, but if you want to combine smaller chunks up to the limit and split larger chunks down to the limit, that is more complicated.

@gregw

gregw commented May 29, 2015

@akarnokd

I can't relate the examples you've given to an asynchronous implementation of a Flow.Processor<ByteBuffer,ByteBuffer>, where the big buffers will arrive in an onNext(ByteBuffer item) call and have to be fed to another Subscriber, who may only have done a request(1), and another request call may be some time in coming. Thus once the big buffer has been split, the processor can only call its subscriber's onNext(ByteBuffer) with a small buffer and then has to wait until another call to request(n) is made - hence it becomes blocking?

@abersnaze

In RxJava all of the non-blocking backpressure logic is in the last line, return Observable.from(chunks);. The chunks are computed eagerly, but they are parcelled out as request(n) calls come in.

Here is the implementation of from:

    private static final class IterableProducer<T> implements Producer {
        private final Subscriber<? super T> o;
        private final Iterator<? extends T> it;

        private volatile long requested = 0;
        @SuppressWarnings("rawtypes")
        private static final AtomicLongFieldUpdater<IterableProducer> REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(IterableProducer.class, "requested");

        private IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {
            this.o = o;
            this.it = it;
        }

        @Override
        public void request(long n) {
            if (REQUESTED_UPDATER.get(this) == Long.MAX_VALUE) {
                // already started with fast-path
                return;
            }
            if (n == Long.MAX_VALUE) {
                REQUESTED_UPDATER.set(this, n);
                // fast-path without backpressure
                while (it.hasNext()) {
                    if (o.isUnsubscribed()) {
                        return;
                    }
                    o.onNext(it.next());
                }
                if (!o.isUnsubscribed()) {
                    o.onCompleted();
                }
            } else if(n > 0) {
                // backpressure is requested
                long _c = BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER, this, n);
                if (_c == 0) {
                    while (true) {
                        /*
                         * This complicated logic is done to avoid touching the volatile `requested` value
                         * during the loop itself. If it is touched during the loop the performance is impacted significantly.
                         */
                        long r = requested;
                        long numToEmit = r;
                        while (it.hasNext() && --numToEmit >= 0) {
                            if (o.isUnsubscribed()) {
                                return;
                            }
                            o.onNext(it.next());

                        }

                        if (!it.hasNext()) {
                            if (!o.isUnsubscribed()) {
                                o.onCompleted();
                            }
                            return;
                        }
                        if (REQUESTED_UPDATER.addAndGet(this, -r) == 0) {
                            // we're done emitting the number requested so return
                            return;
                        }

                    }
                }
            }

        }
    }

@gregw

gregw commented May 29, 2015

@abersnaze OK I'm going to need to play with your code a bit more to see if I can understand how you are avoiding blocking. Is this code against:

<dependency>
    <groupId>com.netflix.rxjava</groupId>
    <artifactId>rxjava-core</artifactId>
    <version>0.20.7</version>
</dependency>

@akarnokd

@gregw You are better off with the latest:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.0.11</version>
</dependency>

@abersnaze's example seems to be an older one because it has a concurrency bug in its fast-path.

@gregw

gregw commented May 29, 2015

Thanks @akarnokd and @abersnaze, I've got the code running and understand it better (I'm still getting my head around that code style).

I can see that example can be kind of thought of as asynchronous, as it holds the chunks until a request(n) call comes in from a subscriber, in which case it hands out n chunks. If the subscriber was remote or truly asynchronous (which it is not in this example), then there would be no thread waiting to hand out the next chunk.

However, this is essentially just a producer that slices a big buffer into lots of smaller buffers and hands them out as requested. This is not the Flow.Processor case that I'm concerned with where the big buffer has itself been delivered by an asynchronous callback.

Firstly, precomputing the chunks may not always be possible. It is OK for ByteBuffers that can be sliced, but if it was being converted to a CharSequence, then precomputing would triple the memory commitment, which is kind of against the whole purpose of slicing it up in the first place.

Even if you solve this by taking a single slice as demanded, you still need to hold onto the source ByteBuffer while all the slices are taken. If that source ByteBuffer has been provided by an onNext call from a truly async source, then I don't see how this can work. It works in this example because the ByteBuffer is not recycled/reused for the duration of the test. But in real usage, the ByteBuffer passed in an onNext call is likely to be reused immediately after the onNext returns - thus any slices taken subsequently will point to the wrong data.

Anyway, let me get my head a bit more into this style of code and I'll write a unit test that demonstrates the async semantic I'm trying to achieve... this could take me a few days.

@abersnaze

In Rx we like to use Observable.from because it hides some complicated logic, but it requires data to be precomputed. We've been experimenting with an AbstractOnSubscribe that handles the backpressure and delegates to a closure to produce just enough data when needed.

@gregw

gregw commented May 29, 2015

@abersnaze It is not so much the use of Observable.from that is the issue for me. If the task is slicing a ByteBuffer into small slices, then it is fit for purpose. The problem is not pre-slicing versus slicing as you go; the problem is the scope of the big buffer that you are slicing from. If that big buffer is passed in from an onNext call as part of a chain of processors, then that onNext call cannot return while slices are current (as it backs them) or there is more slicing to do. Thus that onNext call becomes blocking, not asynchronous.

@akarnokd

You can apply observeOn before the flatMap, or subscribeOn after from, so the stream gains asynchrony and your source is not held back that much.
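For instance (a sketch against RxJava 1.x; Schedulers.computation() is just one possible choice of scheduler, and the flatMap body stands in for the slicing logic from the earlier example):

import java.nio.ByteBuffer;

import rx.Observable;
import rx.schedulers.Schedulers;

public class AsyncSplitExample {
    public static void main(String[] args) throws InterruptedException {
        ByteBuffer bb = ByteBuffer.allocate(1024);

        Observable.just(bb)
            // Hop to another thread here, so the upstream source is released quickly.
            .observeOn(Schedulers.computation())
            .flatMap(a -> Observable.just(a))   // the slicing flatMap from the earlier example goes here
            .subscribe(b -> System.out.println("received on " + Thread.currentThread().getName()));

        Thread.sleep(500);   // crude wait for the asynchronous pipeline in this toy demo
    }
}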

@viktorklang

Sorry for the late reply, I didn't have the cycles to read through the whole thread, so forgive me if this has already been said:

There is absolutely no problem going from Collection&lt;T&gt; as element type to T, nor the other way around;
demand is signalled on a per-element basis, so if you want to limit the size of the Collection then you should expose that when creating the Publisher:

someSource.strictChunkedPublisher(size = N).subscribe(subscriber)

the reason for this is simple—since the Collection has already been created, it is not occupying variable subscriber buffer space.

Perhaps you want to allow for "read as much as you have but don't send me more than N per chunk":

someSource.lenientChunkedPublisher(size = N).subscribe(subscriber)

The reasons for not messing around with specifying byte sizes include, but are not limited to:

A) Given a type T what is its size?
B) Given type T and a max size of N, what do you do with Ts larger than N?
C) Do you allow for sending less than N?
D) What happens with the last data if it is less than N?
E) What does "demand N of size X" mean in practice?

Given that we can create types that reflect intent. (You could create a ByteBuf256 if you want to be explicit about capacity and optimization opportunities).

Given that we can transform chunks into their contents in a non-blocking and backpressured way (by peeling off elements and emitting their contents as requested)

Given that we can take elements and chunk them (classic windowing).

===>

What would the benefits, if any, be of changing the API to include support for chunking?
(Benchmarks are encouraged.)

(Worth noting is that currently the RS spec allows for implementations that push ~175 million elements a second (point-to-point).)

@rkuhn

rkuhn commented Jun 2, 2015

There was a previous discussion that might also be of interest: #47

@jbrisbin

jbrisbin commented Jun 2, 2015

FWIW we're dealing with this issue in the reactive-ipc project. [1]

The problem really becomes one of translation. By that I mean: a Subscriber has capacity for X bytes but can't request bytes directly from the Publisher, only buffers. That means the Subscriber has to communicate to the Publisher (dynamically, and not at CTOR/creation time) its current capacity, in a unit that the Publisher understands but that does not overrun the capacity of the Subscriber.

In short, I think there has to be additional information, similar to what TimeUnit does for the long associated with time: either a DemandUnit-type abstraction or some other way to set an additional limit beyond just the long associated with request(long).

[1] - reactive-ipc/reactive-ipc-jvm#28
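A purely hypothetical sketch of that TimeUnit analogy; neither of these types exists in the Reactive Streams spec or in reactive-ipc, they are invented here for illustration:

// Hypothetical only: invented for illustration, not part of any spec.
enum DemandUnit { ITEMS, BYTES, CHARS }

interface UnitAwareSubscription {
    // request(10, DemandUnit.ITEMS)   would mean the same as a plain request(10);
    // request(8192, DemandUnit.BYTES) would mean "whole items totalling at most 8 KiB".
    void request(long n, DemandUnit unit);
}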

@gregw

gregw commented Jun 2, 2015

@viktorklang

I understand that this is not an issue in a Publisher, as it can fragment a large collection it already has in memory easily enough. The use-case I'm concerned with is when the memory limitation is in a Processor. Two examples come to mind:

  1. an async output stream is internally implemented with Reactive Streams that use Processors to send the stream over an HTTP2 connection. So you will have Processors that compress the data, fragment it into HTTP2 frames, and yet another Processor that TLS-encrypts those frames. At each of these steps there will be either a maximum or optimum fragment size to deal with for compression, protocol frame size or encryption processing, and the origin Publisher does not know what they are and has no means of discovering them. If the protocol frame size is smaller than the size of data published, then the Processors will have to block until the back pressure from their subscribers clears.

To illustrate this problem I've written an example (against the reactive streams API rather than rxJava), that includes a long README about the problems I can't get my head around:

https://github.com/jetty-project/jetty-reactive/tree/FragmentingProcessor

Again, it is the Processors that I'm most concerned with. The origin publisher and ultimate subscriber have a lot more flexibility to deal with these kinds of issues. If an ecosystem of utility processors is to be developed (e.g. compressors, encryptors, encoders, etc.) then they need to be able to operate asynchronously within the API provided. That is what I'm not currently seeing.

@gregw

gregw commented Jun 2, 2015

@rkuhn yes #47 is essentially the same issue.... problem is that it was closed as being too hard to fix!

I'm evaluating this API against a real asynchronous use-case that is in need of a better API (Servlet async IO), and I think it reasonable that an API proposed for Java 9 can handle such use-cases at least as well as callbacks. Hence I'd like to open the discussion again to see if there are techniques within the API as proposed that can help, or if there are some possible changes to the API. Without one or the other, reactive streams do not look suitable for these use-cases.

See the README in https://github.com/jetty-project/jetty-reactive/tree/FragmentingProcessor for more detailed description of the use-cases I'm concerned with.

cheers

@gregw

gregw commented Jun 2, 2015

@viktorklang oops, I didn't give my second example.... never mind, it is the same as the first one.

@viktorklang

@jbrisbin

The problem really becomes one of translation. By that I mean: a Subscriber has capacity for X bytes but can't request bytes directly from the Publisher, only buffers. That means the Subscriber has to communicate to the Publisher (dynamically, and not at CTOR/creation time) its current capacity, in a unit that the Publisher understands but that does not overrun the capacity of the Subscriber.

The memory has already been allocated for the chunk, so I don't see how the Subscriber's buffer would be overrun? (The Subscriber's buffer is in Ts, no matter if T is composite or not)

@jbrisbin

jbrisbin commented Jun 2, 2015

@viktorklang Surely you don't think the only significant measure of a subscriber's capacity is an often unmeasurable (at the least unknowable) impact on RAM usage? There are other kinds of constraints that have nothing to do with RAM usage.

@viktorklang

@jbrisbin

@viktorklang Surely you don't think the only significant measure of a subscriber's capacity is an often unmeasurable (at the least unknowable) impact on RAM usage? There are other kinds of constraints that have nothing to do with RAM usage.

Hmm, how do you mean in this case?

To illustrate my point, let's say we have a ByteBuf256 type which, unsurprisingly, holds up to 256 bytes.
Now, as a Subscriber<ByteBuf256> the question is what your demand pattern ought to look like: if you only want to buffer at most 256 bytes you issue a demand of 1 and wait for the next piece of data before issuing more. I.e. your input buffer is of size 1 and your element type can carry at most 256 bytes.
Am I missing something?
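A minimal sketch of that demand pattern against the org.reactivestreams interfaces; ByteBuf256 is the illustrative type from the comment above, defined here only so the sketch compiles:

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// The illustrative 256-byte element type from the comment above.
final class ByteBuf256 {
    final byte[] data = new byte[256];
}

class OneAtATimeSubscriber implements Subscriber<ByteBuf256> {
    private Subscription subscription;

    @Override public void onSubscribe(Subscription s) {
        subscription = s;
        s.request(1);               // input buffer of size 1 => at most 256 bytes in flight
    }

    @Override public void onNext(ByteBuf256 chunk) {
        process(chunk);             // consume the <= 256 bytes
        subscription.request(1);    // only now ask for the next chunk
    }

    @Override public void onError(Throwable t) { t.printStackTrace(); }
    @Override public void onComplete() { }

    private void process(ByteBuf256 chunk) { /* application logic */ }
}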

@gregw

gregw commented Jun 2, 2015

Actually.... I think I'm about to answer my own question.....

A Processor that receives a ByteBuffer in onNext that is larger than it can process without blocking (due to lack of backpressure) can just keep a reference to it and return from the onNext call... provided that:

  1. It didn't have more than request(1) outstanding, so the Publisher will not call onNext again
  2. The Publisher understands the contract that just because onNext() returns does not mean that the passed buffer has been fully handled and can be reused.
  3. The Publisher will wait (threadlessly) for a subsequent request(1) to inform it that processing of its last onNext has now completed and the passed buffer may be recycled.
  4. The Publisher will wait (threadlessly) for a request(1) before calling onCompleted(), so that it does not try to complete while a previously passed buffer is still being processed.

Hmmm I'll give that a go in my repo tomorrow (1am here now) and report back if that approach solves my issues.

@viktorklang

@gregw

I'm opportunistically answering your answer instead of your previous comments (let me know if I should)

Actually.... I think I'm about to answer my own question.....

A Processor that receives a ByteBuffer in onNext that is larger than it can process without blocking (due to lack of backpressure) can just keep a reference to it and return from the onNext call... provided that:

  1. It didn't have more than request(1) outstanding, so the Publisher will not call onNext again

If a processor decides to issue more demand upstream than is signalled downstream, it (by design) needs to be willing to buffer that itself. This does not entail any blocking in the "parking a Thread"-sense of the word.

  2. The Publisher understands the contract that just because onNext() returns does not mean that the passed buffer has been fully handled and can be reused.

Exactly. Think of onNext as sending the element asynchronously to the Subscriber, it could technically "live" on another machine. This is why the term signal is used in the spec.

  3. The Publisher will wait (threadlessly) for a subsequent request(1) to inform it that processing of its last onNext has now completed and the passed buffer may be recycled.

No, this is faulty. request is never to be assumed to be an Ack. As per the spec the Subscriber is free to issue requests whenever it likes. (barring things like cancellation, onComplete/onError etc).

  4. The Publisher will wait (threadlessly) for a request(1) before calling onCompleted(), so that it does not try to complete while a previously passed buffer is still being processed.

See previous answer.

Hmmm I'll give that a go in my repo tomorrow (1am here now) and report back if that approach solves my issues.

What you'll need to do in this case is to control the pipeline:

YourPublisher -> CustomProcessor<ByteBuffer, ByteBuffer> -> YourSubscriber

So you can collect the ByteBuffers as they are passed through.
Keep in mind that anything in between could in theory retain a reference to your buffer and mess around with it after they should.

Does that help?
Let me know if there's anything I can clarify further or if I have misunderstood something.

@gregw

gregw commented Jun 2, 2015

@viktorklang regarding your comment to @jbrisbin
Consider a GzipInflatorProcessor implements Processor<byte[],byte[]>

Its subscriber calls request(1) on the inflator, so it calls request(1) on its publisher, which responds by calling inflator.onNext(byte[]) with a 256-byte array.

But it is an inflator, so that 256-byte array can expand to an almost arbitrarily large byte[]. There are several possibilities:

  1. inflator just inflates to a single huge byte[] and has unlimited memory commitment.
  2. inflator iterates, inflating chunks of byte[], and blocks while waiting for request(n) calls so that it can call onNext with each chunk on its subscriber
  3. inflator returns from onNext without having completely inflated the 256 bytes. The processor knows that it cannot mess with that byte array until it sees a request(n>0) from the inflator (as per my previous comment)
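Setting the RS plumbing aside, the incremental behaviour that options 2 and 3 rely on is available from java.util.zip.Inflater, which can hand back output in bounded chunks. A minimal sketch (using zlib framing via Deflater/Inflater rather than actual gzip, to keep it short):

import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class IncrementalInflate {
    public static void main(String[] args) throws DataFormatException {
        // Compress 1 MB of zeros into a small compressed input chunk.
        byte[] raw = new byte[1024 * 1024];
        Deflater deflater = new Deflater();
        deflater.setInput(raw);
        deflater.finish();
        byte[] compressed = new byte[raw.length];
        int clen = deflater.deflate(compressed);
        deflater.end();

        // Inflate incrementally: each call produces at most one bounded chunk,
        // so a processor never needs the fully inflated data in memory at once.
        Inflater inflater = new Inflater();
        inflater.setInput(compressed, 0, clen);
        byte[] chunk = new byte[4096];
        long total = 0;
        while (!inflater.finished()) {
            int n = inflater.inflate(chunk);   // chunk[0..n) would be emitted downstream as demanded
            if (n == 0 && inflater.needsInput()) {
                break;                         // would wait for the next compressed chunk here
            }
            total += n;
        }
        inflater.end();
        System.out.println("inflated " + total + " bytes in 4 KB chunks");
    }
}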

@jbrisbin

jbrisbin commented Jun 2, 2015

@gregw FWIW, in Reactor we had to implement this "remainder" functionality for our Codec facility. That sounds very similar.

@gregw

gregw commented Jun 2, 2015

@viktorklang replying to your message that crossed mine.
See my inflator example to understand why I'm concerned with collecting buffers as they pass through. That can result in an arbitrary memory commitment.

Surely a Processor is a Subscriber, so it is free to withhold its subsequent request(n) calls until it has finished handling a previous onNext call.

I understand that using a request(1) as an ack is not exactly correct, but I do not see how else a publisher can recycle a buffer it has previously passed to onNext. It's not acceptable to say that it can't recycle it as servers will just not scale if they create that kind of garbage for each message.

So without the request-as-ack, how else can a Publisher (or Processor) recycle a buffer?

@viktorklang

@gregw

@viktorklang regarding your comment to @jbrisbin

Consider a GzipInflatorProcessor implements Processor<byte[],byte[]>

Its subscriber calls request(1) on the inflator, so it calls request(1) on its publisher, which responds by calling inflator.onNext(byte[]) with a 256-byte array.

But it is an inflator, so that 256-byte array can expand to an almost arbitrarily large byte[]. There are several possibilities:

  1. inflator just inflates to a single huge byte[] and has unlimited memory commitment.
  2. inflator iterates, inflating chunks of byte[], and blocks while waiting for request(n) calls so that it can call onNext with each chunk on its subscriber
  3. inflator returns from onNext without having completely inflated the 256 bytes. The processor knows that it cannot mess with that byte array until it sees a request(n>0) from the inflator (as per my previous comment)

I'll try simplify the example:

Let's say that you have a processor that given a T can produce an unbounded number of Ts.

Now, if the routine that possibly creates an unbounded number of Ts doesn't have a means to control the number of Ts it produces, nothing Reactive Streams can do will ever be able to fix that. It is equivalent to a Publisher that cannot be controlled -> you'll have to decide to buffer or drop.

@viktorklang replying to your message that crossed mine.
See my inflator example to understand why I'm concerned with collecting buffers as they pass through. That can result in an arbitrary memory commitment.

Surely a Processor is a Subscriber, so it is free to withhold its subsequent request(n) calls until it has finished handling a previous onNext call.

Sure, but only if you control the implementation, i.e. you know more about the pipeline than the RS spec can know. I'm talking from the general perspective of what you can and can't assume w.r.t. arbitrary Publishers, Subscribers and Processors given the spec and TCK.

I understand that using a request(1) as an ack is not exactly correct, but I do not see how else a publisher can recycle a buffer it has previously passed to onNext. It's not acceptable to say that it can't recycle it as servers will just not scale if they create that kind of garbage for each message.

So without the request as ack, how else can a Publisher (or Processer) recycle a buffer?

The safest strategy is to add a collector stage at the end that ferries the buffer back to the publisher.

Does that make sense?
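A minimal sketch of that collector idea, assuming the application controls both ends of the pipeline; the pool type, sizes and names here are invented for illustration:

import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// The publisher draws buffers from a pool it owns; the terminal "collector"
// subscriber returns each buffer once it has fully consumed it.
class BufferPool {
    private final BlockingQueue<ByteBuffer> free = new ArrayBlockingQueue<>(16);

    BufferPool() {
        for (int i = 0; i < 16; i++) {
            free.add(ByteBuffer.allocateDirect(8192));
        }
    }

    ByteBuffer acquire() throws InterruptedException {
        return free.take();      // the publisher stops emitting when the pool runs dry
    }

    void release(ByteBuffer buffer) {
        buffer.clear();
        free.add(buffer);        // called by the collector stage in its onNext
    }
}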

@gregw

gregw commented Jun 2, 2015

@viktorklang
I think a collector that ferries the buffer back to the publisher is also dependent on controlling the implementation beyond what the RS spec tells you. You won't be able to develop utility processors, as they will not know how to return the buffers passed to them from arbitrary publishers (which may be several processors removed).

However, it is probably a better solution than the request-as-ack idea, as it at least allows for request(n>1) to be sent to the Publisher to avoid possible latency issues.

I'll experiment further.

@viktorklang

@gregw

You'll be able to hook in any processor you like as long as you control where the buffers are injected and where they are collected. Or?

Let me know what your experimentation reveals, this is an interesting topic.
Looking forward to see what comes out of it.

@gregw

gregw commented Jun 3, 2015

OK I think I'm understanding RS better and will close this issue.

The problem that I was having is that I wanted to implement a Processor that was async, bounded-memory and acknowledging (so callers could recycle buffers). I've worked out that you can have only 2 of those 3 attributes. I've been able to implement an async & bounded-memory processor so long as I don't attempt to use the return of onNext or a subsequent request(n) call to indicate acknowledgement (so that buffers can be recycled or the demo exits after completion).

You can see the results of this revelation here.

While I think acknowledgement is an important attribute, it is not the subject of this issue and I'll move the discussion of it to the list. My demo achieves acknowledgement only by going outside the standard RS API.

Two other things I'll note of interest here: firstly, the demo contains an abstract IteratingProcessor that is based on the Jetty IteratingCallback. This is a useful pattern to avoid arbitrarily deep recursion (onNext(item) calls request(n), which calls onNext(item), which calls request(n)...), which is a common problem with async frameworks.
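For a flavour of that pattern (not Jetty's IteratingProcessor itself, just a sketch of the general loop-instead-of-recursion idea, with emit()/hasNext() standing in for the real item production):

import java.util.concurrent.atomic.AtomicLong;

// Sketch of the trampoline/iteration idea: a re-entrant request(n) made from
// inside emit() only adds demand; it never starts a nested emission loop.
abstract class IteratingEmitter {
    private final AtomicLong demand = new AtomicLong();
    private final AtomicLong wip = new AtomicLong();   // "work in progress" guard

    void request(long n) {
        demand.addAndGet(n);
        if (wip.getAndIncrement() != 0) {
            return;                                    // the outer loop below is already running
        }
        do {
            while (demand.get() > 0 && hasNext()) {
                emit();                                // may itself call request(n) re-entrantly
                demand.decrementAndGet();
            }
        } while (wip.decrementAndGet() != 0);          // pick up demand added while looping
    }

    abstract boolean hasNext();

    abstract void emit();
}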

The other is that I've noticed that RS is similar to a scheduling strategy that we've implemented in Jetty called Eat What You Kill. EWYK achieves mechanical sympathy by implementing the ethos that a thread should not produce a task unless it is able to immediately consume it. This avoids queues and dispatch delays, and allows tasks to be consumed with hot caches.

RS style encourages similar mechanical sympathies by requiring publishers not to publish until there is demand that originates from the ultimate subscriber. This can allow a single thread to call through a chain of onNext calls, keeping its cache hot while avoiding queues and dispatch delays.

@gregw gregw closed this as completed Jun 3, 2015
@plevart

plevart commented Jun 3, 2015

@gregw

Just passing by and looking at your problem. What if you copied the 256-byte buffer, when you received it in your GzipInflatorProcessor, into a private buffer and then decompressed incrementally from the private copy as the downstream requests more? That way you don't retain a reference to the buffer passed to you. Surely this will mean more memory copying, but that's an alternative if you don't want to manage byte[] array ownership. You treat byte[] arrays as value types. Does that make any sense?

@viktorklang

Hi @gregw,

I can't see any tests in the project; I'd strongly recommend applying the TCK to your processor, as it will assist you in ironing out potential spec violations.

Speaking of the spec, I'd love to get your feedback on it: are there things that need improvement, what was good, etc.?

@gregw

gregw commented Jun 3, 2015

@plevart Sure, copying data gets around some of the issues, and copying 256 bytes in that example is not a big deal. But copying data passed in that can be of arbitrary size is not a good general solution, as it is hard to enforce memory bounds.

cheers

@gregw

gregw commented Jun 3, 2015

@viktorklang Thanks for the suggestion of applying the TCK. I think it is probably a bit early for our experiments, but we will certainly look at it soon.

With regards to the spec, it is also a bit early for me to say anything too definitive... especially as I think I've just spent the best part of a week barking up the wrong tree with it :) Although I guess that might mean that some of the documentation and examples could be improved.

But now that I've adjusted my head to think more in line with how the API is intended to be used, I can give you some initial feedback. It is definitely a lot simpler to work with than other async APIs. While I think there is a lot of good in the Servlet async IO API, it is a very tricky API to use, and we are very interested in wrapping it as RS to make it easier to use. That is where our focus has shifted in our experimental repo.

The other thing I like is the mechanical sympathy that results from RS, with a single thread able to call all the way through a chain of onNext calls with a hot CPU cache and no dispatch. The IteratingProcessor model we've implemented should be good at stopping any recursion problems and is much simpler to use than the IteratingCallback we use with our callback model.

However, we still do have some concerns with the lack of acknowledgement, so a publisher does not know if it can recycle an item, can't get exceptions in some circumstances and doesn't know if an onCompleted() has progressed to completion all the way through a chain.

But give us a few more days to get our heads more into it and also try out some more real use cases, perhaps with some other implementations. But so far its pretty good!

cheers

@viktorklang

@gregw

I assume you saw these? https://github.com/reactive-streams/reactive-streams-jvm/tree/master/examples/src/main/java/org/reactivestreams/example/unicast

You may want to inspect other implementations (implementing it is non-trivial just because the problem has inherent complexity), but the end-user will not be implementing it themselves; they will use an existing implementation. It may also be interesting for you to leverage an already existing implementation to prototype with.

Looking forward to your feedback.
