
[RFC] Rate limiting message handling in Symfony Messenger #50465

Open
dejagersh opened this issue May 29, 2023 · 9 comments
Labels: Messenger, RFC (Request For Comments: proposals about features that you want to be discussed), Status: Waiting feedback

Comments

@dejagersh

dejagersh commented May 29, 2023

Description

This proposal introduces a feature into Symfony Messenger which allows for messages to be rate limited, similar to Laravel's RateLimited middleware: https://laravel.com/docs/10.x/queues#rate-limiting

In Laravel you define your limiter as such:

RateLimiter::for('backups', function (object $job) {
    return Limit::perHour(1);
});
class CreateBackup implements ShouldQueue
{
    //

    public function middleware(): array
    {
        return [new RateLimited('backups')];
    }
}

When you dispatch e.g. 10 CreateBackup jobs, it will immediately run the first job. The next 9 jobs will be re-dispatched and made available at T + 1 hour, limiting job executions to 1 per hour.

Our specific use case

In our app our jobs interact a lot with external APIs, specifically to pull data for our clients. For one client there might be thousands of such jobs in the system. The external API has rate limits in place which we must adhere to. For each client we have an API key for that external API, and rate limits are defined per API key. I would like to be able to elegantly define these rate limits for our messages and redispatch messages if rate limits are (about to be) exceeded.

Proposal

I've tried to implement this myself using a Messenger middleware. It's a rough draft, but this works alright:

class RateLimitMiddleware implements MiddlewareInterface
{
    public function __construct(private RateLimiterFactory $anonymousApiLimiter, private MessageBusInterface $messageBus, private LoggerInterface $logger)
    {
    }

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        if (false === $envelope->getMessage() instanceof ShouldBeRateLimited || !$envelope->last(ReceivedStamp::class)) {
            return $stack->next()->handle($envelope, $stack);
        }

        /** @var ShouldBeRateLimited $message */
        $message = $envelope->getMessage();

        $limiter = $this->anonymousApiLimiter->create($message->rateLimitKey());

        $limit = $limiter->consume(1);

        if (false === $limit->isAccepted()) {
            $this->logger->info('Rate limit exceeded', [
                'retry-after' => $limit->getRetryAfter()->getTimestamp(),
                'limit' => $limit->getLimit(),
            ]);

            $this->messageBus->dispatch(
                (new Envelope($message))->with(DelayStamp::delayUntil($limit->getRetryAfter()))
            );

            return $envelope;
        }

        return $stack->next()->handle($envelope, $stack);
    }
}

ShouldBeRateLimited looks like this:

interface ShouldBeRateLimited
{
    public function rateLimitKey(): string;
}

So a message might look like this:

class ProcessPodcast implements ShouldBeRateLimited
{
    private string $podcastId;

    public function __construct(string $podcastId)
    {
        $this->podcastId = $podcastId;
    }

    public function getPodcastId(): string
    {
        return $this->podcastId;
    }

    public function rateLimitKey(): string
    {
        return 'process_podcast:' . $this->getPodcastId();
    }
}

A known issue with this implementation: the rate limiter factory can't be customised per message.
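One way that limitation could be lifted (just a sketch, not part of the proposal; the `rateLimiterName()` method and the service-locator wiring are assumptions of mine) is to let the message name a configured limiter and have the middleware resolve the matching factory from a locator:

```php
interface ShouldBeRateLimited
{
    public function rateLimitKey(): string;

    // Name of a configured rate limiter (framework.rate_limiter.<name>).
    public function rateLimiterName(): string;
}

class RateLimitMiddleware implements MiddlewareInterface
{
    // $limiterLocator maps limiter names to RateLimiterFactory services,
    // e.g. a service locator built from tagged rate limiter factories.
    public function __construct(private ContainerInterface $limiterLocator)
    {
    }

    // then inside handle():
    //   $factory = $this->limiterLocator->get($message->rateLimiterName());
    //   $limiter = $factory->create($message->rateLimitKey());
}
```

That would keep the per-key granularity while letting each message type pick its own policy (fixed window, token bucket, ...).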

Alternative solutions to the same problem

Putting that logic in the handler

public function __invoke(ProcessPodcast $message)
{
    $limit = $this->anonymousApiLimiter->create('process-podcast:' . $message->getPodcastId())->consume(1);

    if (!$limit->isAccepted()) {
        $this->logger->info('Rate limit exceeded', [
            'retry-after' => $limit->getRetryAfter()->getTimestamp(),
            'limit' => $limit->getLimit(),
        ]);

        $this->messageBus->dispatch((new Envelope($message))->with(DelayStamp::delayUntil($limit->getRetryAfter())));

        return;
    }

    // do process podcast logic
}

This works alright, except it gets really tedious when you have to add this to many message handlers, especially when you do more than rate limiting: preventing overlapping jobs, checking circuit breakers, etc.

@carsonbot added the RFC (Request For Comments) label May 29, 2023
@bobvandevijver
Contributor

I see my documentation MR (symfony/symfony-docs#17203) has never been merged, but there is already a rate limiter option for a messenger worker. Just configure the rate_limiter option on your transport and you're done. Wouldn't this be enough for the use case you describe?
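For reference, the transport-level option mentioned above looks roughly like this (the limiter name and policy values are placeholders; a limiter is defined under `framework.rate_limiter` and referenced from the transport):

```yaml
framework:
    rate_limiter:
        async_worker:
            policy: 'fixed_window'
            limit: 60
            interval: '1 minute'

    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                rate_limiter: 'async_worker'
```

With this configuration the worker throttles how fast it pulls messages off the `async` transport as a whole.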

@akapacinskas

akapacinskas commented Jun 6, 2023

I see my documentation MR (symfony/symfony-docs#17203) has never been merged, but there is already a rate limiter option for a messenger worker. Just configure the rate_limiter option on your transport and you're done. Wouldn't this be enough for the use case you describe?

What I'm missing in your example is the rate limit key. In @dejagersh's example you can limit per customer ID, or any ID you want. And if I'm not mistaken, your example also blocks the whole transport?

@bobvandevijver
Contributor

That is indeed true: the existing implementation rate limits based on the number of messages taken from the transport, and it blocks the complete worker.

@pounard
Contributor

pounard commented Jun 7, 2023

It's weird to rate limit at the bus level; wouldn't it be better to plug rate limiting in at the user-facing endpoint instead?

From my point of view, the bus is not supposed to know anything about the user who sent the message, whereas in your user-facing endpoint you can identify your client. For example, if the endpoint is an HTTP REST endpoint that relays messages to the bus, then access control in general (rate limiting being a kind of access control) should probably be done in the controller?

The bus can have some form of authentication or similar security features, but those would be more like the login/password for a database: they authenticate the connection, hence the application being plugged in, not the final user, who belongs to your domain and whom the bus should probably remain agnostic of.

@CRC-Mismatch
Contributor

CRC-Mismatch commented Jun 9, 2023

I think the whole problem is not at the consumer, much less at the sender level... The point of rate limiting is to not "overwork" consumers of one queue when you have a specific reason to avoid it. IMHO the only plausible scenarios for this would be a) when the consumer is working multiple different queues, or b) when one queue may carry "light" and "heavy" loads at the same time and you need to prioritize one of them... If your only reason to "rate-limit" is to schedule some processing, you're better off with a combination of "cronjobs" and messaging.

Other than that, and thinking about these plausible scenarios, I don't see either blocking the consumer process or blocking the sender as a solution. This goes against the whole idea of message queues and asynchronous processing. I think this somewhat relates to the reason why Symfony has no "cronjob" concept: it remains fully platform-agnostic, and if you really need to schedule something, you can always set up your platform's scheduler to call a Symfony command. Infrastructure code should never be intertwined with application code if you don't want to become dependent on a specific platform/stack.

There's also another plausible scenario where one wants to avoid huge processing-time-based cloud-provider bills, and for that one a simple "this consumer will only consume X messages every Y {time-units}" should suffice. But still, if we're talking FaaS, as long as the consumer is running it will incur costs, even if it's just idling.

I think the correct approach would be to implement something using stamps that first tries to work with the specific queue backend in order to avoid blocking at all (scheduled messages or something along those lines), and if that's not possible, as a last resort would skip those messages until their time is due. With the latter, I can foresee a somewhat considerable problem when dealing with messaging services that wait for a success acknowledgement until some timeout and then throw the original message back into the queue - the messages would keep coming back until the backend's set deadline, when they would be discarded, which creates a dependency on the backend's configuration.

I see how this could work if we had something akin to the Cache component's "TagAwareCacheInterface", where even some backends that don't have optimal support for it still get code-side support. Something like "unacknowledging the message if it's not due yet" (which could mean resending it to the queue with a refreshed timestamp or simply skipping it altogether if the queue doesn't support deadlines) could work.

Otherwise, if someone needs to avoid hitting some external consumer's limits, taking all the variables created by the aforementioned scenarios into account, I'd think it'd be really upon the application developer to modify and implement on a project-basis to fit those requirements, unless we could find a really agnostic way to do it without blocking on any side.

@dejagersh
Author

dejagersh commented Jul 25, 2023

It's weird to rate limit at the bus level; wouldn't it be better to plug rate limiting in at the user-facing endpoint instead?

From my point of view, the bus is not supposed to know anything about the user who sent the message, whereas in your user-facing endpoint you can identify your client. For example, if the endpoint is an HTTP REST endpoint that relays messages to the bus, then access control in general (rate limiting being a kind of access control) should probably be done in the controller?

The bus can have some form of authentication or similar security features, but those would be more like the login/password for a database: they authenticate the connection, hence the application being plugged in, not the final user, who belongs to your domain and whom the bus should probably remain agnostic of.

I am not completely sure what you mean, but I am not looking to rate limit customer behaviour or something, if that's what you mean. Our scheduler dispatches messages periodically and the handlers speak to external API endpoints. None of this is triggered by a user.

I'll give an example closer to our real use case. It's obviously not actual code copied from the codebase but I hope it gets the point across.

scheduler:

$this->scheduler->everyMorning(function () {
    foreach ($accounts as $account) {
        $this->messageBus->dispatch(new ImportDataMessage($account));
    }
});

handler:

class ImportDataMessageHandler
{
    public function __invoke(ImportDataMessage $message)
    {
        $response = $this->httpClient->request('GET', 'https://rate-limited-api.com/'); // <-- Has rate limits and we're expected to respect them.

        $this->importData($response, $message->account);
    }
}

@dkarlovi
Contributor

dkarlovi commented Sep 13, 2023

In our app our jobs interact a lot with external APIs, specifically to pull data for our clients. For one client there might be thousands of such jobs in the system. The external API has rate limits in place which we must adhere to. For each client we have an API key for that external API, and rate limits are defined per API key. I would like to be able to elegantly define these rate limits for our messages and redispatch messages if rate limits are (about to be) exceeded.

This is exactly our use case, and (IMO) the major thing is not knowing when you'll be rate limited (since it's per key). What we did is this:

use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;

final class RateLimitException extends \RuntimeException implements RecoverableExceptionInterface
{
    public function __construct(public \DateTimeInterface $retryAfter)
    {
        parent::__construct(sprintf('API rate limit exceeded, retry in %1$d seconds', $this->retryAfter->getTimestamp() - (new \DateTimeImmutable())->getTimestamp()));
    }

    public static function fromTimestamp(int $timestamp): self
    {
        // note: despite the name, $timestamp is treated as a relative delay in seconds
        return new self((new \DateTimeImmutable())->setTimestamp(time() + $timestamp));
    }
}

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;

final readonly class TimestampDelayRetryStrategy implements RetryStrategyInterface
{
    public function __construct(private RetryStrategyInterface $retryStrategy)
    {
    }

    public function isRetryable(Envelope $message, ?\Throwable $throwable = null): bool
    {
        if ($throwable !== null && $this->unwrap($throwable) instanceof RateLimitException) {
            return true;
        }

        return $this->retryStrategy->isRetryable($message, $throwable);
    }

    public function getWaitingTime(Envelope $message, ?\Throwable $throwable = null): int
    {
        if ($throwable !== null && ($delayed = $this->unwrap($throwable)) instanceof RateLimitException) {
            $delay = $delayed->retryAfter->getTimestamp() - (new \DateTimeImmutable())->getTimestamp();

            // convert seconds to milliseconds
            return $delay * 1000;
        }

        return $this->retryStrategy->getWaitingTime($message, $throwable);
    }

    private function unwrap(\Throwable $throwable): \Throwable
    {
        return $throwable->getPrevious() ?? $throwable;
    }
}

and then

framework:
    messenger:
        transports:
            async:
                retry_strategy:
                    service: TimestampDelayRetryStrategy

Handlers will throw RateLimitException and include the datetime when the rate limit is expected to expire; the message is then postponed until that time. This is nice because figuring out when the rate limit will expire can be super-specific per handler: it's contextual rather than one-size-fits-all.
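For illustration, a handler might surface the API's Retry-After header like this (the header handling and status-code check are assumptions of mine, not code from our project; note `fromTimestamp()` expects a relative delay in seconds, which it adds to `time()`):

```php
public function __invoke(ImportDataMessage $message): void
{
    $response = $this->httpClient->request('GET', 'https://rate-limited-api.com/');

    if (429 === $response->getStatusCode()) {
        // Retry-After is a delay in seconds; fall back to 60s if the header is missing.
        $delay = (int) ($response->getHeaders(false)['retry-after'][0] ?? 60);

        throw RateLimitException::fromTimestamp($delay);
    }

    $this->importData($response, $message->account);
}
```

The retry strategy above then turns that exception into a delayed redelivery instead of a failure.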

What's missing is a good way to reschedule messages which are not yet rate limited, but would be if they were to go through.

What I mean is

  1. you're processing 100 GH related messages
  2. GH rate limits you after 50, says wait 10 minutes
  3. you still have 50 messages that will immediately get rate limited; a nice optimization would be to delay them right away, without hitting the GH API and having it rate limit each one

This seems like it could also be solved by your middleware approach where I say "this message will be triggering GH-API-key-hash" and the middleware immediately delays them without triggering the GH API rate limit response at all.

Combining these approaches into a single solution doesn't seem too complex and would solve actual problems.
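A minimal sketch of that combination (assuming the `ShouldBeRateLimited` interface from the proposal and a shared PSR-16 cache; the `rate-limit:` key naming is made up here): once one handler hits the limit, it records the reset time in the cache, and the middleware delays all further messages for the same key without touching the API at all.

```php
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
    $message = $envelope->getMessage();

    if ($message instanceof ShouldBeRateLimited && $envelope->last(ReceivedStamp::class)) {
        // A handler stores the known reset time under this key when it gets a 429.
        $retryAfter = $this->cache->get('rate-limit:' . $message->rateLimitKey());

        if ($retryAfter instanceof \DateTimeInterface && $retryAfter > new \DateTimeImmutable()) {
            // Redispatch without calling the external API at all.
            $this->messageBus->dispatch(
                (new Envelope($message))->with(DelayStamp::delayUntil($retryAfter))
            );

            return $envelope;
        }
    }

    return $stack->next()->handle($envelope, $stack);
}
```

This way only the first message per window pays the cost of an actual rate-limited API response.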

@ryckblick

I'm not 100% sure, but I think the intended goal can be achieved by using a rate limiter (https://symfony.com/doc/current/rate_limiter.html#configuration) for your transport as described here: https://symfony.com/doc/current/messenger.html#rate-limited-transport

@nesl247

nesl247 commented Dec 24, 2023

I'm not 100% sure, but I think the intended goal can be achieved by using a rate limiter (https://symfony.com/doc/current/rate_limiter.html#configuration) for your transport as described here: https://symfony.com/doc/current/messenger.html#rate-limited-transport

That is at the transport level and not configurable using message-specific context. The request is actually about being able to limit messages for a specific customer within a specific transport, for example.
