Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messenger] Add a redis transport #28681

Closed
wants to merge 1 commit into from

Conversation

@soyuka
Copy link
Contributor

soyuka commented Oct 2, 2018

Q A
Branch? master
Bug fix? no
New feature? yes
BC breaks? no
Deprecations? no
Tests pass? not yet added!
Fixed tickets n/a
License MIT
Doc PR TODO symfony/symfony-docs#...

Hi!

This patch adds a redis transport to the Messenger component by using the official php redis-ext (https://github.com/phpredis/phpredis). It's a port of https://github.com/soyuka/symfony-messenger-redis to be included directly in symfony.

How it works

Relevant discussion: https://twitter.com/jderusse/status/980768426116485122

The sender uses a List with RPUSH (add value to the tail of the list).
The receiver uses BRPOPLPUSH which reads the last element of the list and adds in to the head of another list (queue_processing). If no elements are present it'll block the connection until a new element shows up or the timeout is reached. When timeout is reached it works like a "ping" of some sort (calls $handler(null)).

On every iteration, we will check the queue_processing list. For every items in this queue we have a corresponding key in redis with a given ttl.
If the key has expired, the item is LREM (removed) from queue_processing and put back in the origin queue to be processed again. This workaround helps to avoid loosing messages.

Difference with AmqpExt

I'm proposing this feature as a Work In Progress because I'm not sure how I should handle the queue.
Indeed, in AmqpExt the queue is tight to the Connection whereas here, a queue should be linked to a message.
It can work by using the same queue for different messages but I'd not advise to do this because it's messy if you need to maintain your queues afterwards (for example if you want to remove only the queue for message X you should just remove queue X).

What do you think? I can make the queue resilient inside the Connection class so that the code is closer to AMQPExt\Connection (means opening 1 connection per messages) or keep the queue inside the Receiver/Sender (one connection only).

When we find an agreement about this I'll add some tests and remove my WIP flag.

Thanks!

@nicolas-grekas nicolas-grekas added this to the next milestone Oct 2, 2018
@soyuka soyuka force-pushed the soyuka:redis-messenger branch from 8441cb2 to 47f7ca4 Oct 2, 2018
@ragboyjr

This comment has been minimized.

Copy link
Contributor

ragboyjr commented Oct 2, 2018

@soyuka This isn't necessarily directed at you, but I think just re-queing failed items or items that have been left in processing too long isn't a very safe way to go about this.

In my experience, when a message/task fails, it almost always fails again when re-run, and you end up just having 6 failed attempts instead of the one. And in some cases, if your system is connecting to external systems, it could end up causing unwanted duplicate data.

Not saying that retrying isn't a useful feature, but I think it should be something that should at least have the option to be turned off or maybe put into a failed jobs log or table.

* Takes last element (tail) of the list and add it to the processing queue (head - blocking)
* Also sets a key with TTL that will be checked by the `doCheck` method.
*/
public function waitAndGet(string $queue, int $processingTtl = 10000, int $blockingTimeout = 1000): ?array

This comment has been minimized.

Copy link
@ragboyjr

ragboyjr Oct 2, 2018

Contributor

I actually made my own redis receiver for my company and found out that you can't use a blocking timeout over 60 seconds. It looks like redis ext has a bug with blocking timeouts around 60s and will eventually throw an exception with bad data sent over the network. I've stress tested this type of redis queue processor with a blocking timeout and never had any issues with 30s.

This comment has been minimized.

Copy link
@soyuka

soyuka Oct 2, 2018

Author Contributor

Cool, are you suggesting that we should add some kind of test to ensure that the ttl is < 60 seconds? Do you set the \Redis::OPT_READ_TIMEOUT to -1?

Oh I see it's configurable in your bundle, I propose that we keep this value configurable but with a guard so that it doesn't exceed 60s.

This comment has been minimized.

Copy link
@ragboyjr

ragboyjr Oct 2, 2018

Contributor

Y, I'm not entirely sure where that threshold lies. Is it 55? 56? it's like right around 60 seconds when it starts to happen, YMMV. So not sure what's the best way to warn the user via an exception other than give them a verbal warning when reading the docs.

@ragboyjr

This comment has been minimized.

Copy link
Contributor

ragboyjr commented Oct 2, 2018

https://github.com/krakphp/symfony-messenger-redis Here is the redis bundle I made and I admittedly copied several aspects of the redis part between the https://github.com/krakphp/job and your bundle.

@soyuka

This comment has been minimized.

Copy link
Contributor Author

soyuka commented Oct 2, 2018

but I think it should be something that should at least have the option to be turned off or maybe put into a failed jobs log or table.

Definitely!

I see that in your bundle you're using one queue per connection, therefore you are storing different kinds of messages on the same queue. Isn't this an issue while debugging or maintaining these lists? Do you have an opinion about this?
Thanks for the early comments!

@ragboyjr

This comment has been minimized.

Copy link
Contributor

ragboyjr commented Oct 2, 2018

@soyuka Well no because it's more like one queue per transport.

framework:
  messenger:
    transports:
      main:
        dsn: '%env(TRANSPORT_DSN)%'
        options:
          queue: queue_main
      secondary:
        dsn: '%env(TRANSPORT_DSN)%'
        options:
          queue: queue_secondary
    routing:
      App\MainMessage: main
      App\SecondaryMessage: secondary

That would be an example configuration. This definitely allows multiple types of messages for a single queue/transport, but that just depends on how you want to scale out your transports. Some not so frequent messages can go on the main queue, but maybe a heavy traffic message might go on it's own so it can have a dedicated consumer and not get blocked.

@soyuka soyuka force-pushed the soyuka:redis-messenger branch 2 times, most recently from 9d1eed3 to 7bf5a42 Oct 3, 2018
@soyuka soyuka changed the title [Messenger] WIP: Add a redis transport [Messenger] Add a redis transport Oct 3, 2018
@soyuka soyuka force-pushed the soyuka:redis-messenger branch from 7bf5a42 to a7bfa4b Oct 3, 2018
@soyuka

This comment has been minimized.

Copy link
Contributor Author

soyuka commented Oct 3, 2018

@soyuka This isn't necessarily directed at you, but I think just re-queing failed items or items that have been left in processing too long isn't a very safe way to go about this.

I've decided to follow @sroze implementation for Amqp which states that:

If something goes wrong while consuming and handling a message from the Redis broker, there are two choices: rejecting or re-queuing the message.
If the exception that is thrown by the bus while dispatching the message implements this interface (RejectMessageExceptionInterface), the message will be rejected. Otherwise, it will be re-queued.

$connectionCredentials = array(
'host' => $parsedUrl['host'] ?? '127.0.0.1',
'port' => $parsedUrl['port'] ?? 6379,
);

This comment has been minimized.

Copy link
@ragboyjr

ragboyjr Oct 3, 2018

Contributor

Maybe we could also support redis db here as well?

This comment has been minimized.

Copy link
@soyuka

soyuka Oct 8, 2018

Author Contributor

Connect doesn't seem to support the db though: https://github.com/phpredis/phpredis#connect-open

This comment has been minimized.

Copy link
@CvekCoding

This comment has been minimized.

Copy link
@CvekCoding
*/
interface RejectMessageExceptionInterface extends \Throwable
{
}

This comment has been minimized.

Copy link
@ragboyjr

ragboyjr Oct 3, 2018

Contributor

Hmm, @sroze @soyuka Do you think we could make these a common exception instead of per transport?

This comment has been minimized.

Copy link
@soyuka

soyuka Oct 8, 2018

Author Contributor

Would make sense, especially if we want to switch from one transport to another without changing the code.

}
}
}
}

This comment has been minimized.

Copy link
@ragboyjr

ragboyjr Oct 3, 2018

Contributor

@sroze In #28547 when I mention that the logic is shared across different types of receivers, this is what I meant.

This comment has been minimized.

Copy link
@soyuka

soyuka Oct 8, 2018

Author Contributor

Taking this path, it'd actually make sense to me to add a ConnectionInterface but things may be really different from one transport to another.

throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
}
$queue = isset($parsedUrl['path']) ? trim($parsedUrl['path'], '/') : 'messages';

This comment has been minimized.

Copy link
@ragboyjr

ragboyjr Oct 3, 2018

Contributor

Curious if it'd be better to put the queue apart of the DSN vs the options? Personally I prefer to use one DSN configured in the .env, and then re use it across multiple transports while changing the queue parameter in options.

Maybe we could support both? allow queue in the query param, but allow it to be overridden via the options?

@ragboyjr

This comment has been minimized.

Copy link
Contributor

ragboyjr commented Oct 3, 2018

@soyuka Extension looks great! Thanks for the hard work!

@ragboyjr

This comment has been minimized.

Copy link
Contributor

ragboyjr commented Oct 3, 2018

@soyuka in regards to Requeing the message by default and Rejecting if it implements a RejectMessageExceptionInterface

I think it would make more sense to default to Reject, and requeue if they implement a RequeueMessageExceptionInterface; however, that's my opinion for reasons as stated previously in this thread. If we end up sticking with defaulting to Requeue, do you think it would make sense to add a middleware/option in the messenger config to basically wrap all exceptions into a RejectMessageExceptionInterface?

@soyuka

This comment has been minimized.

Copy link
Contributor Author

soyuka commented Oct 8, 2018

do you think it would make sense to add a middleware/option in the messenger config to basically wrap all exceptions into a RejectMessageExceptionInterface?

Looks a bit too magic to me. I definitely get your concerns about requeuing though, and I'd be in favor of having a RequeueMessageExceptionInterface, with the default behavior set to rejecting messages that fail.

@soyuka soyuka force-pushed the soyuka:redis-messenger branch from a7bfa4b to c4180fb Oct 8, 2018
@soyuka soyuka referenced this pull request Nov 6, 2018
@ragboyjr

This comment has been minimized.

Copy link
Contributor

ragboyjr commented Dec 9, 2018

@soyuka with the release of redis streams in redis 5.0, i'm wondering if we should target that data structure for a more robust implementation. We also could possibly implement two different redis connections one for streams and one with lists.

@weaverryan weaverryan referenced this pull request Mar 12, 2019
30 of 36 tasks complete
@weaverryan

This comment has been minimized.

Copy link
Member

weaverryan commented Mar 23, 2019

Ping @soyuka! If you're still willing to work in this, we just merged #30557, which clarifies what a transport needs to do / behave (should simplify the receiver a bit). I would love to see this PR updated!

@soyuka

This comment has been minimized.

Copy link
Contributor Author

soyuka commented Mar 25, 2019

Will definitely work on this thanks for the heads up, any idea if the related documentation is going to be updated?

/edit: I've looked at the PR, messenger is improving that's nice! I've an issue with the new interface though, because my Connection doesn't uses a Enveloppe for now (in receive I'm working on a message, not an Envelope).

@weaverryan

This comment has been minimized.

Copy link
Member

weaverryan commented Mar 25, 2019

@soyuka Awesome!

I've looked at the PR, messenger is improving that's nice! I've an issue with the new interface though, because my Connection doesn't uses a Enveloppe for now (in receive I'm working on a message, not an Envelope).

Can you tell me more about this? Or you can ping me on Slack. Do you mean the methods like ReceiverInterface::ack(Envelope $envelope)? If that's true, it shouldn't be a problem because your receiver is not responsible for calling this anymore - the Worker will call this and it will pass you the Envelope. If you need some sort of "identifier" so you know which message to ack (you will), you should add a stamp to the Envelope before passing it to $handler. Check out the AmqpReceiver. We wrap in a custom stamp (https://github.com/symfony/symfony/blob/master/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php#L73) then use that in ack() and reject() to know which message was received, so we can ack/reject it.

@sroze

This comment has been minimized.

Copy link
Member

sroze commented Mar 27, 2019

groups are awesome if you have multiple systems receiving the same messages

I have no idea what these streams are all about, what is the awesomeness of them in this use-case? When you say that "most things is handled by redis itself" that sounds appealing... but what do you mean exactly? What are the use-cases/logic we wouldn't have to implement anymore?

the only con is that it only works with redis 5 and ext-redis 4.2.

ext-redis 4.2 has been released on the 2018-11-20 so I don't see this as an issue. The 4.3 has also been released and is considered as stable. Though, do you know what's the Redis 5 availability in most of the cloud providers?

@soyuka

This comment has been minimized.

Copy link
Contributor Author

soyuka commented Mar 27, 2019

I don't think that streams are complex in case of the php implementation

I just looked at the redis introduction and it seems to have more features compared to using a normal list. This was the complexity I was talking about, not the one from the php implementation.

But I'm nobody so I can't do a decision here, just wanted to mention it here, thought the implemtation could maybe help.

Everyone's thought is appreciated! I wasn't even aware that redis had implemented this feature, so thanks for the discovery and I'll definitely take a look at your implementation!

@ragboyjr

This comment has been minimized.

Copy link
Contributor

ragboyjr commented Mar 27, 2019

My only concern with using streams (other than the additional complexity), is that forcing the use of the ext-redis extension can be an issue under heavier workloads.

We were getting intermittent errors and failures on redis consumer with sf messenger and after some investigation, it looks like they are just related to the redis-ext and those errors don't occur on predis.

phpredis/phpredis#831
nrk/predis#524 (comment)

After we switched, those intermittent errors all went away..., However, predis doesn't seem actively maintained, It's a good library and seems to be more reliable than phpredis, but nrk/predis hasn't touched that repo in forever. Makes me think that a reliable fork or a new maintainer might be necessary for that library.

@soyuka

This comment has been minimized.

Copy link
Contributor Author

soyuka commented Mar 27, 2019

@ragboyjr if we want this to get merged into symfony we have to use the official ext-redis, we can't introduce a dependency to a third party because of maintenance issues.

@alexander-schranz

This comment has been minimized.

Copy link
Contributor

alexander-schranz commented Mar 27, 2019

I have no idea what these streams are all about, what is the awesomeness of them in this use-case? When you say that "most things is handled by redis itself" that sounds appealing... but what do you mean exactly? What are the use-cases/logic we wouldn't have to implement anymore?

Lets say you have 3 Systems. System A is sending messages and System B and C want to receive this messages. When you want that B and C receive both all messages without redis streams you need to implement that logic yourself or sending them into 2 lists or something (correct me if I'm wrong). With redis streams you can just create 2 groups. one for system b and one for system c and redis will give you the messages which you did not receive yet.

I personally found it really simple adding a new messages to a stream can be done with:

$this->redis->xAdd('STREAMNAME', '*', ['content' => json_encode($encodedMessage)]);

Reading the message can be done with:

$messages = $this->redis->xReadGroup('GROUPB', 'CONSUMER1', [$this->stream => 0], 1, 45);

Redis automatically sets the read messages into a pending state for this group so if you example have for the same group multiple consumer they will not get the same message again.

And after you did successfully process the message you mark it in your group as acked:

$this->redis->xAck('STREAMNAME', 'GROUPB', [$messageId]);

So if you later need a System D also receiving all messages you just create in redis a GroupD and so system D is also starting to receive all messages.

So streams will shine if you have multiple systems receiving messages. If you just have 2 Systems a Sender and a Receiver there will be no differents. But if you have multiple receivers Redis will handle you which group/system did receive a message and which one doesn't.

The following is also a interesting blog post about streams: https://brandur.org/redis-streams.

it seems to have more features compared to using a normal list. This was the complexity I was talking about, not the one from the php implementation.

I think as the streams feature is more build for this usecase less complex because of its features and automatic mechanisms.

But maybe I have to less knowledge about the current implementation and maybe it makes sense to have 2 redis transports one using the redis list's and one using redis streams.

@sroze

This comment has been minimized.

Copy link
Member

sroze commented Mar 28, 2019

Indeed, if there is this logic of parking the messages until they are acknowledged, this sounds very interesting because the less we have to deal with these distributed system issues, the better 🙃

@soyuka

This comment has been minimized.

Copy link
Contributor Author

soyuka commented Mar 28, 2019

@alexander-schranz many thanks about this detailed explanation, it helped me to understand better why redis streams may be good for this use case! I'll definitely look into that and use them in this transport implementation!

@ragboyjr

This comment has been minimized.

Copy link
Contributor

ragboyjr commented Mar 28, 2019

@soyuka I think the SF Lock and SF Cache component are able to utilize both redis libraries, i'd imagine if we did a redis transport using the list strategy, we should be able to provide some sort of compat layer to using either of the redis libraries.

I'd be willing to help if needed, but as someone who has used a redis transport with messenger at a high volume on both ext-redis and predis, I wouldn't feel comfortable only allowing ext-redis.

@weaverryan

This comment has been minimized.

Copy link
Member

weaverryan commented Apr 3, 2019

Ping! Do we have some direction/motivation/time to finish this? We're after feature freeze, though there is some wiggle room probably because this component is experimental.

@soyuka

This comment has been minimized.

Copy link
Contributor Author

soyuka commented Apr 3, 2019

Ping! Do we have some direction/motivation/time to finish this? We're after feature freeze, though there is some wiggle room probably because this component is experimental.

To me, time is an issue, maybe this week-end but definitely not sooner. Also, the streams must be tested they may improve the transport a lot, which means more work.

@weaverryan

This comment has been minimized.

Copy link
Member

weaverryan commented Apr 4, 2019

To me, time is an issue

I hear that :). Good candidate maybe for the hackathon this weekend, if not by you - by someone else? We could check to see if anyone has the expertise & is interested.

Cheers!

@soyuka

This comment has been minimized.

Copy link
Contributor Author

soyuka commented Apr 4, 2019

I'd love to work on this this week end but as I'm the Api Platform referent I'm not sure that I'll be able too :p.

@alexander-schranz

This comment has been minimized.

Copy link
Contributor

alexander-schranz commented Apr 4, 2019

@weaverryan @soyuka I will have a look at the current state at the EU-FOSSA Hackathon and will then refractor it to use the redis stream functions.

@weaverryan

This comment has been minimized.

Copy link
Member

weaverryan commented Apr 4, 2019

Added the hackathon star - thank you @alexander-schranz!

@alexander-schranz

This comment has been minimized.

Copy link
Contributor

alexander-schranz commented Apr 6, 2019

Analysed the current state and after also having a look at the new ReceiverInterface and playing around about it my concept will be the following:

+-----------R
|    GET    | -> XREADGROUP
+-----------+
      |
      | handleMessage
      V
+-----------+  No
|  failed?  |---------------------------+
+-----------+                           |
      |                                 |
      | Yes                             |
      V                                 |
+-----------+  No                       |
|   retry?  |---------------------------+
+-----------+                           |
      |                                 |
      | Yes                             |
      V                                 V
+-----------R                    +-----------R
|   REJECT  | -> XACK            |    ACK    | -> XACK
+-----------+                    +-----------+

GET: Will use XREADGROUP to read the one message from the stream
REJECT: Reject will just remove the message with XACK from the stream as adding it back to the stream is handled by symfony worker itself
ACK: Will use the XACK Method to ack the message for the specific group

The sender will still be simple by calling the XADD redis function.

#EU-FOSSA

@weaverryan

This comment has been minimized.

Copy link
Member

weaverryan commented Apr 6, 2019

I don't know about the Redis streams part, but I can confirm the rest of the diagram is perfect: you're acking/nacking with the correct logic.

@sroze

This comment has been minimized.

Copy link
Member

sroze commented Apr 6, 2019

@alexander-schranz great. Fancy updating this pull-request (or creating another one)?

@alexander-schranz

This comment has been minimized.

Copy link
Contributor

alexander-schranz commented Apr 6, 2019

@sroze If I can I will update this PR so we have all discussion about it here.

Copy link
Member

Nyholm left a comment

Cool. I like this. I will test this later.

return null;
}
$key = md5($value['body']);

This comment has been minimized.

Copy link
@Nyholm

Nyholm Apr 6, 2019

Member

sha1 is quicker. I think we should use that one instead.

*/
public function ack($message)
{
$key = md5($message['body']);

This comment has been minimized.

Copy link
@Nyholm

Nyholm Apr 6, 2019

Member

sha1

/**
* Reject the message: we acknowledge it, means we remove it form the queues.
*
* @TODO: log something?

This comment has been minimized.

Copy link
@Nyholm

Nyholm Apr 6, 2019

Member

This should be removed or addressed.

*/
public function requeue($message)
{
$key = md5($message['body']);

This comment has been minimized.

Copy link
@Nyholm

Nyholm Apr 6, 2019

Member

sha1

}
/**
* Add item at the tail of list.

This comment has been minimized.

Copy link
@Nyholm

Nyholm Apr 6, 2019

Member

This comment is wrong. We are doing a queue and not a stack. =)

We are adding items to the head of the list.

Using lpush is correct.

$pending = $this->connection->lRange($processingQueue, 0, -1);
foreach ($pending as $temp) {
$key = md5($temp['body']);

This comment has been minimized.

Copy link
@Nyholm

Nyholm Apr 6, 2019

Member

sha1

@soyuka

This comment has been minimized.

Copy link
Contributor Author

soyuka commented Apr 6, 2019

Please feel free to update it!

@alexander-schranz

This comment has been minimized.

Copy link
Contributor

alexander-schranz commented Apr 6, 2019

I'm not able to push into @soyuka's fork so I created a new PR #30917
/cc @Nyholm @sroze

@soyuka

This comment has been minimized.

Copy link
Contributor Author

soyuka commented Apr 6, 2019

Closing this one then!

@soyuka soyuka closed this Apr 6, 2019
fabpot added a commit that referenced this pull request Apr 27, 2019
…ander-schranz)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Add a redis stream transport

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | Yes
| Fixed tickets | #28681
| License       | MIT
| Doc PR        | symfony/symfony-docs#11341

As discussed in #28681 this will refractor @soyuka implementation of redis using the redis stream features so we don't need to handle parking the messages ourself and redis is doing it for us.

Some interesting links about streams:

 - https://redis.io/topics/streams-intro
 - https://brandur.org/redis-streams

```
+-----------R
|    GET    | -> XREADGROUP
+-----------+
      |
      | handleMessage
      V
+-----------+  No
|  failed?  |---------------------------+
+-----------+                           |
      |                                 |
      | Yes                             |
      V                                 |
+-----------+  No                       |
|   retry?  |---------------------------+
+-----------+                           |
      |                                 |
      | Yes                             |
      V                                 V
+-----------R                     +-----------R
|   REJECT  | -> XDEL             |    ACK    | -> XACK
+-----------+                     +-----------+
```

**GET**: Will use `XREADGROUP` to read the one  message from the stream
**REJECT**: Reject will just remove the message with `XDEL` from the stream as adding it back to the stream is handled by symfony worker itself
**ACK**: Will use the `XACK` Method to ack the message for the specific group

The sender will still be simple by calling the `XADD` redis function.

#EU-FOSSA

Commits
-------

ff0b855 Refractor redis transport using redis streams
7162d2e Implement redis transport
@nicolas-grekas nicolas-grekas modified the milestones: next, 4.3 Apr 30, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
9 participants
You can’t perform that action at this time.