Controlled poll from queue #97

Closed
benwimpory opened this issue May 5, 2017 · 18 comments

@benwimpory

More of a question than an issue: how would I go about using the contents of an Agrona queue element to control consumption?

The idea is to offer items to a queue to preserve their order, kick off async enrichment of the items, and then only consume each item once its enrichment has completed. I'd use putOrderedObject to issue a store-store fence for the enrichments so that they are visible on the poll.

I suspect there is a more elegant way to approach this problem, but the extension to the OneToOne queue below seems to work. Thoughts, improvements and suggestions appreciated.

    @SuppressWarnings("unchecked")
    public E poll(final Predicate<E> predicate)
    {
        final Object[] buffer = this.buffer;
        final long currentHead = head;
        final long elementOffset = sequenceToBufferOffset(currentHead, capacity - 1);
        final Object e = UNSAFE.getObjectVolatile(buffer, elementOffset);

        // Consume the head element only when it is present and the predicate accepts it.
        if (null != e && predicate.test((E)e))
        {
            UNSAFE.putOrderedObject(buffer, elementOffset, null);
            UNSAFE.putOrderedLong(this, HEAD_OFFSET, currentHead + 1);
            return (E)e;
        }

        return null;
    }
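
To make the intended usage concrete, here is a minimal single-threaded sketch of the gating idea. It is not Agrona code: `Order`, its `enriched` flag and `GatedQueue` are hypothetical names, and an `ArrayDeque` stands in for the concurrent queue. The `volatile` flag is only an assumption about how enrichment completion might be published; the point is that the head element blocks consumption until it is ready, so order is preserved.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical item type: 'enriched' is set once the async enrichment finishes.
final class Order
{
    final String id;
    volatile boolean enriched;

    Order(final String id)
    {
        this.id = id;
    }
}

// Hypothetical single-threaded stand-in for the queue extension (not an Agrona class).
final class GatedQueue<E>
{
    private final Queue<E> items = new ArrayDeque<>();

    boolean offer(final E e)
    {
        return items.offer(e);
    }

    // Remove and return the head only if it exists and the predicate accepts it.
    E poll(final Predicate<E> predicate)
    {
        final E head = items.peek();
        if (null != head && predicate.test(head))
        {
            return items.poll();
        }

        return null;
    }
}
```

With two orders queued, `poll(o -> o.enriched)` keeps returning null while the first order is not enriched, even if the second one already is.
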
@tmontgomery
Contributor

It might be better to think of it in terms of how Aeron's log buffer controlled poll works.

https://github.com/real-logic/Aeron/blob/master/aeron-client/src/main/java/io/aeron/Image.java#L268

The key is to think of it more as a drain than a poll, with the handler returning ABORT, BREAK, or COMMIT. This gives a great deal of control for only a little more work.

@benwimpory
Author

Agreed, thinking of this as a drain makes much more sense.

The draft code below works but, as per the TODO, I am wondering under what circumstances you would need ABORT given that 'head' is only advanced on COMMIT?

This is in contrast to Aeron's Image class, which needs to undo fragmentsRead and resultingOffset when reading an element goes wrong.

    public int controlledPoll(final ControlledElementHandler<E> elementHandler, final int elementLimit)
    {
        final Object[] buffer = this.buffer;
        final long startHead = head;
        long currentHead = head;

        try
        {
            do
            {
                final long elementOffset = sequenceToBufferOffset(currentHead, capacity - 1);
                final Object e = UNSAFE.getObjectVolatile(buffer, elementOffset);

                if (null == e)
                {
                    return 0;
                }

                ControlledElementHandler.Action action = elementHandler.onElement((E) e);

                if (action == BREAK)
                {
                    break;
                }
                else if (action == ABORT)
                {
                    // TODO Given currentHead only advanced on commit, do we need ABORT ?
                }
                else if (action == COMMIT)
                {
                    UNSAFE.putOrderedObject(buffer, elementOffset, null);
                    
                    // Update position (locally and to publisher thread)
                    currentHead += 1;
                    UNSAFE.putOrderedLong(this, HEAD_OFFSET, currentHead);
                }

            }
            while ((currentHead - startHead) < elementLimit);
        }
        catch (final Throwable t)
        {
            // TODO Add error handler
            //errorHandler.onError(t);
        }

        return (int) (currentHead - startHead);
    }

@tmontgomery
Contributor

tmontgomery commented May 8, 2017

BREAK and ABORT are different in terms of advancement. One advances. The other does not.

Here is a breakdown of how ControlledFragmentHandler uses them.

  • ABORT the current polling/drain operation and do not advance for this element.
  • BREAK from the current polling/drain operation and commit the advance as of the end of the current element being handled.
  • COMMIT continue processing and advance to next element.

Also, there is one more option to consider. Not sure how I feel about it to be honest....

  • CONTINUE processing until element limit or no elements exist with implied commit at end of poll/drain.

This is somewhat like a skip ahead.

@mjpt777 and @RichardWarburton thoughts?
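
As a rough illustration of the four actions listed above, a handler contract for a queue drain might look like the sketch below. `ControlledElementHandler` is the name used in the draft code in this thread, but its definition is never shown, so the shape here is an assumption modelled on the descriptions above rather than an existing Agrona type.

```java
// Hypothetical handler contract for a controlled drain; modelled on the
// action descriptions in this thread, not taken from the Agrona code base.
@FunctionalInterface
interface ControlledElementHandler<E>
{
    enum Action
    {
        // Stop the drain and do not advance past the current element.
        ABORT,
        // Stop the drain and commit the advance up to the end of the current element.
        BREAK,
        // Keep draining and publish the advance immediately.
        COMMIT,
        // Keep draining; the advance is published when the drain finishes.
        CONTINUE
    }

    Action onElement(E element);
}
```

A handler for the enrichment use case could then return ABORT for an element that is not yet ready and COMMIT or CONTINUE otherwise.
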

@mjpt777
Contributor

mjpt777 commented May 9, 2017

CONTINUE is an example of natural/smart batching, i.e. update the position on completion of the batch but continue processing. It is not skip ahead like controlledPeek on Image.

You need to update current head in a finally block to handle exceptions.

@tmontgomery
Contributor

Sorry. Poor choice of words. Was thinking of "skip" meaning go ahead and deliver everything. Not to skip delivery.

@benwimpory
Author

Thanks for the responses from @tmontgomery and @mjpt777 on this thread :-)

I've updated the method following your feedback.

  • COMMIT keeps advance in position, immediately commits
  • BREAK keeps advance in position, stops processing further elements
  • ABORT rolls back position advance, stops processing elements
  • CONTINUE keeps the position advance, defers commit until finally

As per @mjpt777, the currentHead is now published in the finally block so all successful advancements are published even if there is an exception in the element handler.

The previous version also did a return 0 if the element was null. It now breaks the loop so any prior advances are published.

Let me know if the version below looks better. I'm happy to do a PR if you want this added to the core library.

    public int controlledPoll(final ControlledElementHandler<E> elementHandler, final int elementLimit)
    {
        final Object[] buffer = this.buffer;
        final long startHead = head;
        long currentHead = head;

        try
        {
            do
            {
                final long elementOffset = sequenceToBufferOffset(currentHead, capacity - 1);
                final Object e = UNSAFE.getObjectVolatile(buffer, elementOffset);

                if (null == e)
                {
                    break;
                }
                
                ControlledElementHandler.Action action = elementHandler.onElement((E) e);
                currentHead += 1;

                if (action == BREAK)
                {
                    UNSAFE.putOrderedObject(buffer, elementOffset, null);
                    break;
                }
                else if (action == ABORT)
                {
                    currentHead -= 1;
                    break;
                }
                else if (action == COMMIT)
                {
                    UNSAFE.putOrderedObject(buffer, elementOffset, null);
                    // Commit head immediately
                    UNSAFE.putOrderedLong(this, HEAD_OFFSET, currentHead);
                }
                else if (action == CONTINUE)
                {
                    UNSAFE.putOrderedObject(buffer, elementOffset, null);
                }
            }
            while ((currentHead - startHead) < elementLimit);
        }
        catch (final Throwable t)
        {
            // TODO Add error handler
            //errorHandler.onError(t);
        }
        finally
        {
            // Commit position
            UNSAFE.putOrderedLong(this, HEAD_OFFSET, currentHead);
        }
        return (int) (currentHead - startHead);
    }

@mjpt777
Contributor

mjpt777 commented May 9, 2017

Checking the action with a switch statement would allow this to be expressed more elegantly and would make it more efficient.

@benwimpory
Author

benwimpory commented May 9, 2017

Introducing a switch-case means introducing a loop label for the do-while, as we still need to break out of the loop from within the case statements.

It's interesting that the JVM doesn't optimise if-elseif on the same variable these days. I'll have to dump the bytecode sometime to see the difference.

Latest version...

    public int controlledPoll(final ControlledElementHandler<E> elementHandler, final int elementLimit)
    {
        final Object[] buffer = this.buffer;
        final long startHead = head;
        long currentHead = head;

        try
        {
            loop:
            do
            {
                final long elementOffset = sequenceToBufferOffset(currentHead, capacity - 1);
                final Object e = UNSAFE.getObjectVolatile(buffer, elementOffset);

                if (null == e)
                {
                    break;
                }

                ControlledElementHandler.Action action = elementHandler.onElement((E) e);
                currentHead += 1;

                switch (action)
                {
                    case BREAK:
                    {
                        UNSAFE.putOrderedObject(buffer, elementOffset, null);
                        break loop;
                    }
                    case ABORT:
                    {
                        currentHead -= 1;
                        break loop;
                    }
                    case COMMIT:
                    {
                        UNSAFE.putOrderedObject(buffer, elementOffset, null);
                        // Commit head immediately
                        UNSAFE.putOrderedLong(this, HEAD_OFFSET, currentHead);
                        break;
                    }
                    case CONTINUE:
                    {
                        UNSAFE.putOrderedObject(buffer, elementOffset, null);
                        break;
                    }
                }
            }
            while ((currentHead - startHead) < elementLimit);
        }
        catch (final Throwable t)
        {
            // TODO Add error handler
            //errorHandler.onError(t);
        }
        finally
        {
            // Commit position
            UNSAFE.putOrderedLong(this, HEAD_OFFSET, currentHead);
        }
        return (int) (currentHead - startHead);
    }

@mjpt777
Contributor

mjpt777 commented May 10, 2017

I'm not sure this is correct. It would be better to just return after having set up the appropriate state for the finally clause to record in either the ABORT or BREAK case.

@benwimpory
Author

Yes, you are right.

  • Changed the method name to controlledDrain().
  • Removed long startHead and replaced it with int elementsRead.
  • Removed the use of break loop; and replaced it with return elementsRead.
  • Factored out (1) the nulling of elements and (2) the position commit into private methods.
  • Added errorHandler as a class-level field.

Are there any concerns about elementLimit being large and preventing the method from returning? My thinking was that you should eventually hit a BREAK or ABORT, and that callers should be mindful of the risks of calling with a large elementLimit.

Thanks again for all the feedback.

@SuppressWarnings("unchecked")
public int controlledDrain(final ControlledElementHandler<E> elementHandler, final int elementLimit)
{
    final Object[] buffer = this.buffer;
    long currentHead = head;
    int elementsRead = 0;

    try
    {
        do
        {
            final long elementOffset = sequenceToBufferOffset(currentHead, capacity - 1);
            final Object e = UNSAFE.getObjectVolatile(buffer, elementOffset);

            if (null == e)
            {
                return elementsRead;
            }

            final ControlledElementHandler.Action action = elementHandler.onElement((E) e);
            currentHead += 1;
            elementsRead += 1;

            switch (action)
            {
                case BREAK:
                {
                    clearElement(buffer, elementOffset);
                    return elementsRead;
                }
                case ABORT:
                {
                    currentHead -= 1;
                    elementsRead -= 1;
                    return elementsRead;
                }
                case COMMIT:
                {
                    clearElement(buffer, elementOffset);
                    updatePosition(currentHead);
                    break;
                }
                case CONTINUE:
                {
                    clearElement(buffer, elementOffset);
                    break;
                }
            }
        }
        while (elementsRead < elementLimit);
    }
    catch (final Throwable t)
    {
        errorHandler.onError(t);
    }
    finally
    {
        updatePosition(currentHead);
    }
    return elementsRead;
}

private void updatePosition(final long currentHead)
{
    UNSAFE.putOrderedLong(this, HEAD_OFFSET, currentHead);
}

private void clearElement(final Object[] buffer, final long elementOffset)
{
    UNSAFE.putOrderedObject(buffer, elementOffset, null);
}
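
For anyone who wants to experiment with these semantics outside Agrona, here is a minimal self-contained sketch of the same drain. It is an approximation under stated assumptions: `SketchQueue`, `Handler` and `headPosition` are made-up names, `AtomicReferenceArray` and `AtomicLong` replace UNSAFE (with `lazySet` as the standard-library counterpart of the putOrdered calls), the capacity is assumed to be a power of two, and handler exceptions propagate to the caller instead of going to an errorHandler. It also checks for ABORT before advancing rather than undoing the advance afterwards.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical single-producer/single-consumer queue; not the Agrona implementation.
final class SketchQueue<E>
{
    enum Action { ABORT, BREAK, COMMIT, CONTINUE }

    interface Handler<E>
    {
        Action onElement(E element);
    }

    private final AtomicReferenceArray<E> buffer;
    private final int mask;
    private final AtomicLong head = new AtomicLong();
    private final AtomicLong tail = new AtomicLong();

    // Assumption: capacity is a power of two so that masking gives the slot index.
    SketchQueue(final int capacity)
    {
        buffer = new AtomicReferenceArray<>(capacity);
        mask = capacity - 1;
    }

    boolean offer(final E e)
    {
        final long currentTail = tail.get();
        final int index = (int)currentTail & mask;
        if (null != buffer.get(index))
        {
            return false; // slot not yet consumed, so the queue is full
        }

        buffer.lazySet(index, e);
        tail.lazySet(currentTail + 1);
        return true;
    }

    long headPosition()
    {
        return head.get();
    }

    int controlledDrain(final Handler<E> handler, final int elementLimit)
    {
        long currentHead = head.get();
        int elementsRead = 0;

        try
        {
            while (elementsRead < elementLimit)
            {
                final int index = (int)currentHead & mask;
                final E e = buffer.get(index);
                if (null == e)
                {
                    break; // nothing more to read
                }

                final Action action = handler.onElement(e);
                if (Action.ABORT == action)
                {
                    break; // leave the element in place for a later drain
                }

                buffer.lazySet(index, null);
                ++currentHead;
                ++elementsRead;

                if (Action.BREAK == action)
                {
                    break; // element consumed, stop here
                }
                if (Action.COMMIT == action)
                {
                    head.lazySet(currentHead); // publish immediately
                }
            }
        }
        finally
        {
            // Publish whatever was consumed, even if the handler threw.
            head.lazySet(currentHead);
        }

        return elementsRead;
    }
}
```

Because the position is published in the finally block, elements consumed before a handler exception stay consumed, while the element whose handler threw remains at the head of the queue.
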

@mjpt777
Contributor

mjpt777 commented May 12, 2017

Looks better. What about padding records?

@benwimpory
Author

Apologies for the late response to this question.

I confess I am slightly confused by what you mean by "padding records". I always think of padding as filling up a buffer or data structure to a known size.

I did wonder if you meant exiting a long-running drain loop caused by a large elementLimit by changing the publisher to periodically offer a padding record, which forces the element handler to return BREAK.

Could you clarify?

@mjpt777
Contributor

mjpt777 commented May 24, 2017

If a record does not fit between the current position and the end of the buffer, then a padding record is inserted and the producer's record is added at the beginning of the buffer as it wraps around.

@benwimpory
Author

Thanks for the clarification.

I have been extending the OneToOneArrayQueue, which I don't believe adds padding records. I'll check the code tomorrow to be sure.

Are you referring to the ring buffers?

@mjpt777
Contributor

mjpt777 commented May 24, 2017

Yes, the RingBuffers add padding records when a wrap occurs and the inserted record does not fit. Look at the other code.

https://github.com/real-logic/Agrona/blob/master/agrona/src/main/java/org/agrona/concurrent/ringbuffer/OneToOneRingBuffer.java#L118

https://github.com/real-logic/Agrona/blob/master/agrona/src/main/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBuffer.java#L360

In both cases the read also needs to account for the padding record.
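
The wrap rule described above can be sketched in a few lines. This is a simplified illustration, not the ring buffer code linked above: `PaddingSketch` and its methods are hypothetical, the capacity is assumed to be a power of two, and record headers, alignment and the free-space check against the consumer position are all left out.

```java
// Hypothetical helper illustrating when a padding record is needed on wrap.
final class PaddingSketch
{
    // Bytes of padding to insert before a record of recordLength can be written
    // contiguously; zero when the record fits before the end of the buffer.
    static int paddingFor(final long tail, final int capacity, final int recordLength)
    {
        final int index = (int)(tail & (capacity - 1)); // capacity assumed power of two
        final int toBufferEnd = capacity - index;

        return recordLength > toBufferEnd ? toBufferEnd : 0;
    }

    // Tail position after the write, including any padding that had to be inserted.
    static long tailAfterWrite(final long tail, final int capacity, final int recordLength)
    {
        return tail + paddingFor(tail, capacity, recordLength) + recordLength;
    }
}
```

A controlled drain over such a buffer would have to advance past padding in the same way: the padded bytes count towards the consumed position but should not be handed to the handler as a message.
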

@benwimpory
Author

Apologies for the lack of response on this. I will come back to this thread with the RingBuffer controlled drain extension, but probably not until mid-August. Are you OK to keep this issue open?

@mjpt777
Contributor

mjpt777 commented Jul 12, 2017

No rush this end.

@mjpt777
Contributor

mjpt777 commented Jan 24, 2018

I'll close this issue due to no progress. It can be reopened if further progress is made.

mjpt777 closed this as completed Jan 24, 2018