[Messenger] allow processing messages in batches #43354

Merged
merged 1 commit into from Oct 30, 2021

nicolas-grekas
Member

@nicolas-grekas nicolas-grekas commented Oct 6, 2021

Q               A
Branch?         5.4
Bug fix?        no
New feature?    yes
Deprecations?   no
Tickets         #36910
License         MIT
Doc PR          -

This replaces #42873 as it proposes an alternative approach to handling messages in batch.

BatchHandlerInterface says it all: if a handler implements this interface, it should expect a new optional $ack argument when __invoke() is called. When $ack is not provided, __invoke() is expected to handle the message synchronously as usual. When $ack is provided, __invoke() is expected to buffer the message together with its $ack function and to return the number of pending messages in the batch.

Batch handlers are responsible for deciding when they flush their buffers, calling the $ack functions while doing so.

Best reviewed ignoring whitespace.

Here is what a batch handler might look like:

class MyBatchHandler implements BatchHandlerInterface
{
    use BatchHandlerTrait;

    public function __invoke(MyMessage $message, Acknowledger $ack = null)
    {
        return $this->handle($message, $ack);
    }

    private function process(array $jobs): void
    {
        foreach ($jobs as [$message, $ack]) {
            try {
                // [...] compute $result from $message
                $ack->ack($result);
            } catch (\Throwable $e) {
                $ack->nack($e);
            }
        }
    }
}

The size of the batch is controlled by BatchHandlerTrait::shouldFlush() (defaults to 10).

The transport is acknowledged in batch, after the bus has returned from dispatching (unlike what is done in #42873). This is especially important when considering transactions, since we don't want to ack unless the transaction committed successfully.

By default, pending batches are flushed when the worker is idle and when it stops.
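
To make the buffering contract concrete, here is a minimal, self-contained sketch of it. SketchAck and SketchBatchHandler are hypothetical stand-ins written for illustration, not the Messenger classes; strtoupper() is a placeholder for the real work and the batch size of 3 is arbitrary.

```php
<?php

// Stand-in for an acknowledger (hypothetical): records the outcome of one message.
final class SketchAck
{
    public bool $acked = false;
    public $result = null;
    public ?\Throwable $error = null;

    public function ack($result = null): void
    {
        $this->acked = true;
        $this->result = $result;
    }

    public function nack(\Throwable $error): void
    {
        $this->error = $error;
    }
}

// Hypothetical handler: buffers [message, ack] pairs and flushes once $batchSize is reached.
final class SketchBatchHandler
{
    private array $jobs = [];
    private int $batchSize;

    public function __construct(int $batchSize = 10)
    {
        $this->batchSize = $batchSize;
    }

    // Returns the number of messages still pending after this call.
    public function __invoke(string $message, SketchAck $ack): int
    {
        $this->jobs[] = [$message, $ack];

        if (\count($this->jobs) >= $this->batchSize) {
            $this->flush();
        }

        return \count($this->jobs);
    }

    public function flush(): void
    {
        $jobs = $this->jobs;
        $this->jobs = []; // empty the buffer before processing the batch

        foreach ($jobs as [$message, $ack]) {
            try {
                $ack->ack(strtoupper($message)); // placeholder for the real work
            } catch (\Throwable $e) {
                $ack->nack($e);
            }
        }
    }
}

$handler = new SketchBatchHandler(3);
$acks = [new SketchAck(), new SketchAck(), new SketchAck()];

$pending = [];
foreach (['a', 'b', 'c'] as $i => $message) {
    $pending[] = $handler($message, $acks[$i]);
}
```

In this sketch nothing is acknowledged until the third message arrives; at that point the whole buffer is processed and the pending count drops back to zero.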

@lyrixx
Member

lyrixx commented Oct 6, 2021

Could you add a full example?

@nicolas-grekas
Member Author

nicolas-grekas commented Oct 15, 2021

This PR is ready, tests included 🎉

@fabpot
Member

fabpot commented Oct 19, 2021

@lyrixx I think you were very interested in this feature :)

@chalasr
Member

chalasr commented Oct 19, 2021

This misses a CHANGELOG entry

@nicolas-grekas
Member Author

CHANGELOG added

Member

@chalasr chalasr left a comment

LGTM

@yched

yched commented Oct 19, 2021

I'm super interested in this feature as well, and the PR looks great. I'm just wondering whether it will allow "grab whatever pending messages exist and process them in a batch (N max, but don't wait for more if there are less than N)". Will the --time-limit worker option trigger a batch flush?

@nicolas-grekas
Member Author

don't wait for more if there are less than N

Yes, PR just updated. Batches will be flushed when the worker is idle or when it is stopped.

@nicolas-grekas
Member Author

PR updated to inform handlers if flushing is called while the worker is idle (in which case the handler can decide not to flush) or if the flush happens while the worker is stopping.
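
A rough sketch of how a handler could use that information, assuming the signal arrives as a boolean $force argument to flush() (as the $this->flush(true) calls in the diff suggest). SketchIdleAwareHandler, its add() method and the $minIdleBatch threshold are hypothetical names used only for illustration.

```php
<?php

// Hypothetical handler; only the flush decision is sketched here.
final class SketchIdleAwareHandler
{
    private array $buffer = [];
    private int $minIdleBatch;

    /** Batches that were actually processed, kept for inspection. */
    public array $flushed = [];

    public function __construct(int $minIdleBatch = 2)
    {
        $this->minIdleBatch = $minIdleBatch;
    }

    public function add(string $message): int
    {
        $this->buffer[] = $message;

        return \count($this->buffer);
    }

    // Assumption: $force === false means the worker is idle (flushing is optional),
    // $force === true means the worker is stopping (flushing is required).
    public function flush(bool $force): void
    {
        if (!$this->buffer) {
            return; // nothing pending
        }

        if (!$force && \count($this->buffer) < $this->minIdleBatch) {
            return; // idle and the batch is still small: keep waiting
        }

        $this->flushed[] = $this->buffer;
        $this->buffer = [];
    }
}

$handler = new SketchIdleAwareHandler(2);
$handler->add('a');

$handler->flush(false); // idle with a single buffered message: skipped
$flushedWhileIdle = \count($handler->flushed);

$handler->flush(true); // stopping: flushed regardless of batch size
```

Whether skipping an idle flush is a good idea depends on how long messages may sit in the buffer; the threshold here is only an example.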

Member

@lyrixx lyrixx left a comment

I left some comments, but I have not finished reviewing.

src/Symfony/Component/Messenger/Handler/Acknowledger.php (outdated)
Comment on lines 43 to 51
$result = null !== $ack ? 0 : null;
$this->jobs[] = [$this->schedule($message), $ack ?? $ack = new Acknowledger(get_debug_type($this))];

if (null !== $result && !$this->shouldFlush()) {
    return \count($this->jobs);
}
$this->flush(true);

return $result ?? $ack->getResult();
Member

@lyrixx lyrixx Oct 23, 2021

Something like this is more readable:

Suggested change
- $result = null !== $ack ? 0 : null;
- $this->jobs[] = [$this->schedule($message), $ack ?? $ack = new Acknowledger(get_debug_type($this))];
- if (null !== $result && !$this->shouldFlush()) {
-     return \count($this->jobs);
- }
- $this->flush(true);
- return $result ?? $ack->getResult();
+ if ($ack === null) {
+     $ack = new Acknowledger(get_debug_type($this));
+     $this->jobs[] = [$this->schedule($message), $ack];
+     $this->flush(true);
+     return $ack->getResult();
+ }
+ $this->jobs[] = [$this->schedule($message), $ack];
+ if (!$this->shouldFlush()) {
+     return \count($this->jobs);
+ }
+ $this->flush(true);
+ return 0;

Member

BTW, the code path for $ack === null seems broken: you create a new Acknowledger, so the $ack argument in it is null, so it's an empty callable, so it never really acks on the transport?

I think this part can be simplified

Member Author

Patch applied thanks.

This logic is the answer to your question:

as soon as I implement the interface, the handler is forced to be used in batch mode?

When $ack is null, HandleMessageMiddleware will not add the NoAutoAckStamp, and the worker will ack as usual. The batch handler is thus compatible with non-batch mode.
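
The two modes can be sketched in isolation as follows. This mirrors the shape of the suggested change above but uses hypothetical stand-ins (SketchResultAck, SketchDualModeHandler) rather than the real Acknowledger and trait; the batch size of 2 and the doubling computation are arbitrary placeholders.

```php
<?php

// Stand-in acknowledger (hypothetical): stores the result handed to ack().
final class SketchResultAck
{
    public $result = null;

    public function ack($result = null): void
    {
        $this->result = $result;
    }
}

// Hypothetical handler supporting both non-batch and batch invocation.
final class SketchDualModeHandler
{
    private array $jobs = [];

    public function __invoke(int $message, ?SketchResultAck $ack = null)
    {
        if (null === $ack) {
            // Non-batch mode: process right away and return the result itself.
            $ack = new SketchResultAck();
            $this->jobs[] = [$message, $ack];
            $this->flush();

            return $ack->result;
        }

        // Batch mode: buffer and report how many messages are pending.
        $this->jobs[] = [$message, $ack];
        if (\count($this->jobs) < 2) { // arbitrary batch size of 2
            return \count($this->jobs);
        }
        $this->flush();

        return 0;
    }

    private function flush(): void
    {
        $jobs = $this->jobs;
        $this->jobs = [];

        foreach ($jobs as [$message, $ack]) {
            $ack->ack($message * 2); // placeholder computation
        }
    }
}

$handler = new SketchDualModeHandler();

$syncResult = $handler(21); // no $ack: handled synchronously

$first = new SketchResultAck();
$second = new SketchResultAck();
$pendingAfterFirst = $handler(1, $first);   // buffered
$pendingAfterSecond = $handler(2, $second); // batch full: flushed
```

The caller decides the mode simply by passing or omitting $ack, so the same handler instance stays usable where no batching is set up.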

@lyrixx
Member

lyrixx commented Oct 23, 2021

One question: as soon as I implement the interface, the handler is forced to be used in batch mode?

Member

@lyrixx lyrixx left a comment

This is nice, but I left some comments on the public API.
Then we could merge it, and I'll have to play with it on a real project to see how it behaves.

Thanks for the hard work.

Comment on lines 44 to 46
$ack = new Acknowledger(get_debug_type($this));
$this->jobs[] = [$this->schedule($message), $ack];
$this->flush(true);
Member

@lyrixx lyrixx Oct 28, 2021

I still want to simplify this PR and I think something could be done here.

This trait is named BatchHandlerTrait, so its purpose is to handle batches of messages. I understand that you want to preserve the contract in the HandlerInterface. But this very method must not allow passing $ack = null.

So basically, if one wants to use the handler both in a batch way and in a non-batch way (which is not possible BTW), it does not make sense to call handle() when $ack === null (in $handler::__invoke()).

So this trait must throw an exception when $ack is null. And it's up to the developer to check if $ack is null or not, and if it's null they must not call this method.

Finally, all cases where $ack === null could be removed (here, and in Acknowledger).

  • the code will be simpler, and easier to read / learn
  • the contract of HandlerInterface::__invoke($message) (where $ack = null) is preserved
  • I'm happy

And by doing so, we can rename handle() (which is very generic and not really intuitive) to schedule() (see my comment on the current schedule method).


I really like the following code / API:

class Handler implements BatchHandlerInterface
{
  public function __invoke(Message $message, $ack)
  {
    if ($ack) {
      return $this->schedule(new Foobar($message), $ack);
    }
    // sync processing
  }
}

Don't you?

Member Author

I don't agree with throwing. The handler should be usable in non-batch mode per LSP and throwing would break that. The current design makes it a no brainer: implement process() and no need to care about this concern. Your proposal would require ppl to write extra logic if they want to follow LSP. But LSP is not an option to me.

Member

I don't agree with throwing. The handler should be usable in non-batch mode per LSP and throwing would break that.

Yes, I totally agree with you on that point

The current design makes it a no brainer

I disagree with you. You may think it's easy, but people read the code under the hood, and at the moment the code is not simple.

Your proposal would require ppl to write extra logic

Yes, one more line of code. Explicit is better than implicit.

But LSP is not an option to me.

I don't understand why LSP may be broken. Can you elaborate?

Member Author

The boilerplate to implement both modes is in the trait. That's how it should be. Not doing so means requiring every consumer of the trait to add the same boring logic. That'd defeat the purpose of the trait and, more critically, it would break LSP whenever ppl don't implement the "else" in your snippet. Since we know ppl are lazy, or make mistakes, this is a recipe for more bugs.

About LSP: the principle is not about PHP interfaces but about abstractions that should be swappable. If an implementation fails to implement non-batch mode, e.g. by throwing, then it's not swappable and LSP is broken.

@nicolas-grekas
Member Author

@lyrixx thanks for the review, I think I have addressed your comments.

@fabpot
Member

fabpot commented Oct 30, 2021

Thank you @nicolas-grekas.

7 participants