Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messenger] Add a redis stream transport #30917

Open
wants to merge 2 commits into
base: master
from

Conversation

@alexander-schranz
Copy link
Contributor

alexander-schranz commented Apr 6, 2019

Q A
Branch? master
Bug fix? no
New feature? yes
BC breaks? no
Deprecations? no
Tests pass? Yes
Fixed tickets #28681
License MIT
Doc PR symfony/symfony-docs#11341

As discussed in #28681 this will refractor @soyuka implementation of redis using the redis stream features so we don't need to handle parking the messages ourself and redis is doing it for us.

Some interesting links about streams:

+-----------R
|    GET    | -> XREADGROUP
+-----------+
      |
      | handleMessage
      V
+-----------+  No
|  failed?  |---------------------------+
+-----------+                           |
      |                                 |
      | Yes                             |
      V                                 |
+-----------+  No                       |
|   retry?  |---------------------------+
+-----------+                           |
      |                                 |
      | Yes                             |
      V                                 V
+-----------R                     +-----------R
|   REJECT  | -> XDEL             |    ACK    | -> XACK
+-----------+                     +-----------+

GET: Will use XREADGROUP to read the one message from the stream
REJECT: Reject will just remove the message with XDEL from the stream as adding it back to the stream is handled by symfony worker itself
ACK: Will use the XACK Method to ack the message for the specific group

The sender will still be simple by calling the XADD redis function.

#EU-FOSSA

@curry684

This comment has been minimized.

Copy link
Contributor

curry684 commented Apr 6, 2019

Needs [Messenger] prefix ;)

@lyrixx lyrixx changed the title Add a redis stream transport [Messenger] Add a redis stream transport Apr 6, 2019

);
}
foreach ($messages[$this->stream] as $key => $message) {

This comment has been minimized.

Copy link
@jderusse

jderusse Apr 6, 2019

Contributor

What if you crash (segafault) before ack/nack the message? what happens to the message flagged as "pending" by redis,

This comment has been minimized.

Copy link
@alexander-schranz

alexander-schranz Apr 6, 2019

Author Contributor

Thx for the review added to the checklist. I think we need first receive all pending messages first when a consumer crash or is restarted. Added to my checklist

This comment has been minimized.

Copy link
@alexander-schranz

alexander-schranz Apr 6, 2019

Author Contributor

As I could not found that Redis is adding pending messages later back to the stream after a timeout. I would think we need read first the pending messages again which where not finished and then later the new messages?

This comment has been minimized.

Copy link
@jderusse

jderusse Apr 7, 2019

Contributor

From the blog post: https://redis.io/topics/streams-intro

As you can see the idea here is to start consuming the history, that is, our list of pending messages. This is useful because the consumer may have crashed before, so in the event of a restart we want to read again messages that were delivered to us without getting acknowledged. This way we can process a message multiple times or one time (at least in the case of consumers failures, but there are also the limits of Redis persistence and replication involved, see the specific section about this topic).

You also have XPENDING command for that.

This comment has been minimized.

Copy link
@alexander-schranz

alexander-schranz Apr 7, 2019

Author Contributor

@jderusso thank you. xpending give just a list of pending messages with ids but you can give 0 instead of > to xreadgroup get only the current pending messages which I now use.

Any other ID, that is, 0 or any other valid ID or incomplete ID (just the millisecond time part), will have the effect of returning entries that are pending for the consumer sending the command. So basically if the ID is not >, then the command will just let the client access its pending entries: delivered to it, but not yet acknowledged.

https://redis.io/commands/xreadgroup

@alexander-schranz alexander-schranz force-pushed the alexander-schranz:redis-messenger branch from 62b159c to e0d5932 Apr 6, 2019

@alexander-schranz alexander-schranz force-pushed the alexander-schranz:redis-messenger branch 11 times, most recently from 781ae29 to 580fdb9 Apr 6, 2019

@weaverryan weaverryan referenced this pull request Apr 7, 2019

Open

[Messenger] Making it Shine #30540

26 of 36 tasks complete

@alexander-schranz alexander-schranz force-pushed the alexander-schranz:redis-messenger branch 4 times, most recently from cba457c to 52d3d68 Apr 7, 2019

@alexander-schranz alexander-schranz force-pushed the alexander-schranz:redis-messenger branch 2 times, most recently from f71fb8b to 9ed5f76 Apr 7, 2019

@alexander-schranz

This comment has been minimized.

Copy link
Contributor Author

alexander-schranz commented Apr 7, 2019

The implementation can now be reviewed again. Pending messages are now handled.

I don't know if there is anything possible about a DelayStamp in redis so this is currently not implemented.

/cc @Nyholm, @sroze

@alexander-schranz alexander-schranz force-pushed the alexander-schranz:redis-messenger branch from 9ed5f76 to 6a77cb8 Apr 7, 2019

@alexander-schranz alexander-schranz force-pushed the alexander-schranz:redis-messenger branch from 6a77cb8 to 6eac29b Apr 7, 2019

@nicolas-grekas nicolas-grekas added this to the next milestone Apr 7, 2019

@alexander-schranz alexander-schranz force-pushed the alexander-schranz:redis-messenger branch 2 times, most recently from 4d0016c to 26d0e24 Apr 10, 2019

@onEXHovia

This comment has been minimized.

Copy link

onEXHovia commented Apr 10, 2019

@alexander-schranz If this is possible in Redis, it is probably worth implementing the interface Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface, as is done for Doctrine/Amqp.

@alexander-schranz

This comment has been minimized.

Copy link
Contributor Author

alexander-schranz commented Apr 10, 2019

@onEXHovia there is no easy count for the messages for a specific receiver but will have a look at it but would see it at current state like the delayedstamp out of scope for this PR.

But what should be the result for the count function?

There are 3 states a message could have before being acked or reject:

  • Messages InDelay ( messages which are delayed )
  • Messages InQueue ( messages which were not yet getted by the receiver)
  • Messages InProcess ( messages which were getted but not yet acked or rejected )

Should the count return all messages which where not yet acked or rejected also the delayed and inqueue messages?

@alexander-schranz

This comment has been minimized.

Copy link
Contributor Author

alexander-schranz commented Apr 10, 2019

I'm not sure why PHP 7.1 and PHP 7.3 (on lowest) is throwing an segmentation fault on travis. Does somebody maybe have an idea?

@alexander-schranz alexander-schranz force-pushed the alexander-schranz:redis-messenger branch from 123b4dd to 249c77d Apr 11, 2019

@weaverryan

This comment has been minimized.

Copy link
Member

weaverryan commented Apr 11, 2019

Good question - the count would be a sum of the first two: delayed + InQueue. This method is meant to be a possible “approximation”, and is intended for giving the user a general “status” of the size.

@soyuka

This comment has been minimized.

Copy link
Contributor

soyuka commented Apr 11, 2019

I'm not sure why PHP 7.1 and PHP 7.3 (on lowest) is throwing an segmentation fault on travis. Does somebody maybe have an idea?

I'd try to reproduce locally, this would give you the ability to create a core dump and would give more information. I've not enough time currently to work on this :|.

@alexander-schranz alexander-schranz force-pushed the alexander-schranz:redis-messenger branch 3 times, most recently from d0224f3 to 97af470 Apr 11, 2019

@alexander-schranz

This comment has been minimized.

Copy link
Contributor Author

alexander-schranz commented Apr 12, 2019

Finally green :)

@Simperfit

This comment has been minimized.

Copy link
Contributor

Simperfit commented Apr 15, 2019

Status: Needs Review

@chalasr
Copy link
Member

chalasr left a comment

Thanks for this, I want it for 4.3 :)
The framework integration is missing, we need to register the redis transport factory in FrameworkBundle's messenger.xml (and add a test for it)

@alexander-schranz alexander-schranz force-pushed the alexander-schranz:redis-messenger branch from 97af470 to a4cceee Apr 22, 2019

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.