Support for pubsub #13

Closed
thijsterlouw opened this issue Oct 17, 2011 · 17 comments

Comments

@thijsterlouw

I believe eredis does not support Redis' pubsub ( http://redis.io/topics/pubsub )
Is someone already working on this, or is there interest in adding this feature?

@knutin
Contributor

knutin commented Oct 17, 2011

Hi Thijs,

You are right, there is no support for pubsub at the moment. I think it is a very interesting feature, as it makes a whole class of applications possible. There are, however, some important questions about how it should be implemented.

I think the publish command is just a normal command that returns immediately, so this would already be supported.

Subscribing to a channel requires a dedicated connection where you only listen for messages (and subscribe/unsubscribe). You can also subscribe to a pattern. The messages will be pushed to the socket. This is a radical departure from the current architecture of eredis_client.erl, so I think it should be implemented in a separate module. It could reuse the parser and the general idea of an active once socket.

There are also some design questions that need to be answered:

  • Do you want a 1:1 mapping between an Erlang process and an eredis subscribed client, where the Erlang process receives all messages? The messages will be tagged with the channel or pattern; how should this be conveyed to the Erlang process?
  • In the above scenario, how do you handle flow control? Your Erlang process could easily have its message queue overloaded (if you do some sort of computation or external call, it could very easily happen even at a low number of messages). Should the Erlang process send a response back "acknowledging" that it is now ready to receive more messages (like the gen_tcp active once socket)?
  • How is the flow control implemented in eredis? Does it buffer inside the eredis subscribe client or does it simply not turn the socket back to active once before it has gotten a reply?
  • Should the Erlang process be implemented as a gen_event-like behaviour? gen_redis_sub? Should the behaviour be synchronous (to protect from overload) or asynchronous?
  • Or do you want to provide a callback to the eredis sub client that is executed inside the eredis client with flow control. Eredis uses active once and if you execute a callback before activating the socket you have flow control managed by the kernel.
  • Should there be only one callback or should you be allowed to register a callback to a certain channel/pattern? Should you be allowed to register multiple callbacks to the same pattern?
  • What happens when the callback crashes? What happens if it takes too long?
  • With the callback idea, could this form the base of implementing the other approach mentioned above?

Maybe providing both the "subscribe this Erlang process to messages on this channel/pattern" and "execute this callback for every message in this channel/pattern" makes sense. I think the answer to many of the above questions depends on how you want to use it within your own application, so not being too opinionated would maybe appeal to a broader audience.

Knut

@thijsterlouw
Author

Hi Knut, thanks for your detailed reply! You raised some good questions.

  • I would prefer an N:M relationship between user processes and eredis clients. This could be configured as N:1 for increased efficiency, at the risk of the one eredis client becoming a bottleneck (I don't really think most use cases will have such high message rates though, so N:1 should be fine for most of them). If several user processes subscribe to the same channel, the eredis client would just register the subscription internally. It would also link to the user process.
  • I was thinking of requiring the user processes to implement a behavior such as gen_server or, as you mentioned, perhaps a custom behavior, gen_redis_sub. That would make the callbacks very simple to implement, but requires that the client follow a certain style to integrate the event loop. At first I didn't consider callbacks, but they are a good solution. We would call the callback function with the originating Pid, the channel name and the result as parameters. On top of this, we can call back into gen_server (I guess the most often used).
  • eredis does not need to care too much about flow control: it just receives the subscription messages from Redis and then sends them to all the subscribed user processes. I don't see any problems here.
  • If there are no more subscribers left, the eredis client would unsubscribe from Redis. A bit like a ref-counted channel :) If a user process crashes, it would be unlinked from the eredis client and the ref-count would be decremented. The eredis client should be (as much as possible) async internally.
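The ref-counting idea in the last bullet can be sketched as follows. This is a minimal illustration in Python rather than Erlang, so that it does not pretend to be eredis code; `SubscriptionRegistry` and its method names are hypothetical, and the "ref-count" is simply the size of a per-channel set of subscribers.

```python
from collections import defaultdict


class SubscriptionRegistry:
    """Hypothetical helper: tracks which subscribers want each channel."""

    def __init__(self):
        self._subscribers = defaultdict(set)

    def subscribe(self, channel, subscriber):
        """Return True if this is the first subscriber, i.e. SUBSCRIBE must be sent."""
        first = not self._subscribers[channel]
        self._subscribers[channel].add(subscriber)
        return first

    def unsubscribe(self, channel, subscriber):
        """Return True if this was the last subscriber, i.e. UNSUBSCRIBE should be sent."""
        members = self._subscribers.get(channel)
        if not members or subscriber not in members:
            return False
        members.remove(subscriber)
        if not members:
            del self._subscribers[channel]
            return True
        return False

    def subscriber_down(self, subscriber):
        """Drop a crashed subscriber everywhere; return the channels left empty."""
        emptied = []
        for channel in list(self._subscribers):
            if self.unsubscribe(channel, subscriber):
                emptied.append(channel)
        return emptied
```

In an Erlang client the `subscriber_down` step would presumably be driven by a link or monitor notification for the crashed process.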

@knutin
Contributor

knutin commented Oct 17, 2011

Hi Thijs, thanks for good answers. Here are some further thoughts and questions:

  • I agree that one eredis client would very unlikely become the bottleneck. From benchmarking the eredis driver, I found that there was very little difference between having many clients talk to one eredis client and many clients talk to a dedicated eredis client.
  • I think it is very important to provide a "raw", "bare-metal" API that has no message-passing or waiting-for-acknowledgement behaviour. Some users want to keep things as simple as possible, so exposing an API where a callback is executed for every message received would satisfy this need. I think it makes sense to execute this callback within the eredis client itself, i.e. not spawning the callback and then waiting for a response. If you really need this close-to-the-metal behaviour, you should know what you are doing, and if you hold up or crash the driver, you are responsible yourself.
  • On top of this callback we could write a nice user-friendly "framework", similar to gen_server or gen_event.
  • If we allow the callback to manage a "callback state", i.e. every time the callback is invoked it receives its state and returns a new state that it will receive at the next invocation, it could be used to do statistics calculations, "reduce" jobs, etc.
  • If I understand you correctly, you would want to register the same process to multiple "subscriptions"? I wonder if this is a good idea, as it would mean that while the process is handling a message, another eredis instance might have to wait.
  • With your point about flow control, I have to disagree. If eredis sends messages from a subscription to a process in an asynchronous, fire-and-forget way, it might easily overload the receiving process. If the receiver handles messages more slowly than they are added to its mailbox, the mailbox will build up and you will eventually run out of memory. If the receiver has a selective receive, i.e. matching on a specific message, it will have to search an ever-increasing queue (this might have changed somewhat lately). Another case is if this process has some role other than just receiving messages from Redis that requires it to handle "normal" messages; these might be stuck in the queue until the receiver has time to process them. If this is a gen_server and you are calling it, the call might time out.
  • Having explicit flow control inside eredis would allow the application to offload the buffering to maybe the most suitable part of the system: the kernel.

I think the first step in implementing something like this is to do the "core" first, ie. a process (a gen_server similar to eredis_client) subscribing to a single topic, executing a callback for every message it receives. On top of this different behaviours could be implemented by providing a callback function.
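A minimal sketch of such a "core", written in Python rather than Erlang so that it stays independent of eredis internals. The names `CallbackClient`, `handle_message` and `count_per_channel` are hypothetical, and the `(channel, payload, state)` callback signature is an assumption made for illustration, not an eredis API.

```python
class CallbackClient:
    """Hypothetical client that runs one callback per received message."""

    def __init__(self, callback, initial_state):
        self._callback = callback
        self.state = initial_state

    def handle_message(self, channel, payload):
        # The callback runs in the client's own context: a slow callback delays
        # the next read, and an exception propagates out of the client.
        self.state = self._callback(channel, payload, self.state)


def count_per_channel(channel, payload, counts):
    """Example callback: a running message count per channel (a small 'reduce' job)."""
    updated = dict(counts)
    updated[channel] = updated.get(channel, 0) + 1
    return updated


client = CallbackClient(count_per_channel, {})
for channel, payload in [("a", "1"), ("b", "2"), ("a", "3")]:
    client.handle_message(channel, payload)
```

Because the callback receives the previous state and returns the next one, the client itself needs no knowledge of what the callback accumulates.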

@thijsterlouw
Author

  • I agree about the need for a bare-metal API and that the programmer is responsible for not blocking the driver. That would also be the reason why I thought it made sense to register the same process for multiple subscriptions. Especially if we assume the common case of 1 Eredis client. The programmer should just be warned about not blocking the callbacks (or starting multiple Eredis clients).
  • About the flow control: I believe you mean that it is better to prevent overload of the user processes by implementing flow control outside the Erlang mailboxes and moving it to the system (by letting the TCP buffers fill up)? Though this is indeed possible, I am not sure the overhead and complexity are worth it. Eredis would block while each user process is processing. If we use the async method, on the other hand, the Eredis client is just routing data as fast as possible.
  • Perhaps there is a flexible middle ground?
    • each user process can use either a dedicated Eredis client or use a shared (pool) Eredis client
    • the user can use callbacks (which are executed in the Eredis client process context); If they are blocking, this would automatically result in flow-control in the Eredis client. The user should probably use a dedicated Eredis client in this case.
    • the user can also decide to use non-blocking async notifications (implemented by having a default callback function call gen_server:cast() for example). Disadvantage is the risk of overloading, but advantages are that you can share the connection to Redis and it would be simple to use: simply a handle_cast()-like call.
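The two styles in this middle ground can both be expressed as callbacks. The sketch below is in Python rather than Erlang and is only illustrative: `make_cast_callback` and `make_blocking_callback` are hypothetical names, the `(channel, payload, state)` signature is an assumption, and a `queue.Queue` stands in for the subscriber's mailbox.

```python
import queue


def make_cast_callback(mailbox):
    """Non-blocking style: hand the message over and return immediately."""
    def callback(channel, payload, state):
        # The queue is unbounded, so this never blocks, but it can grow
        # without limit if the consumer is slower than the producer.
        mailbox.put_nowait((channel, payload))
        return state
    return callback


def make_blocking_callback(handler):
    """Blocking style: the client is held up until the handler returns."""
    def callback(channel, payload, state):
        handler(channel, payload)
        return state
    return callback
```

With the blocking variant the client cannot read the next message until `handler` finishes, which is what yields flow control; with the cast variant the connection can be shared, at the risk of an ever-growing mailbox.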

@knutin
Contributor

knutin commented Oct 18, 2011

I think we now agree on a very good solution. We let the eredis client process execute a callback for every message it receives. With this callback (and callback state), it is possible to implement both the blocking style and the non-blocking style of integration with Erlang processes. This allows users to choose the implementation that best matches their needs.

The challenge I see with this is that you need to provide a way for the callback to handle requests, so you can do stuff like "subscribe this erlang process to this channel". You also need a way for the callback to issue Redis commands, like "subscribe to this new topic requested by the process".

I wonder if this would best be implemented by using a behaviour. So for the "blocking flow-control-inside-eredis"-style, there would be one module implementing this behaviour. For the "cast a message to a process"-style, another module implementing the behaviour.

What do you think?

Another problem is how to send requests to redis from inside the subscribed clients. Redis might be constantly pushing you messages, so there is no way of doing a blocking call to redis from inside the client.

@thijsterlouw
Author

This week I will have some time to work on this; I will let you know when I have some progress

@jdavisp3
Contributor

jdavisp3 commented Nov 6, 2011

Whoops, I started on this as well, and without reading this thread first :/
My patch is here: #18

I don't think it addresses all the issues raised in this discussion.

It's using messages, so there is no flow control at the moment.
I would describe it as 'bare metal' as it doesn't do any subscription
management at all. It does send messages when the connection goes
down and up so clients can re-subscribe.

I just extended eredis_client for this. Having a separate client seemed
like it would end up with a lot of duplicate code.

If there is interest in my approach, I could convert it to use callbacks
pretty easily.

@jdavisp3
Contributor

jdavisp3 commented Nov 6, 2011

I do kind of like the 'active_once' flow control system, though,
which uses messages not callbacks. That would require a 1:1
relationship between subscribers and clients.

@jdavisp3
Contributor

jdavisp3 commented Dec 4, 2011

Ok, my pubsub-2 branch has my latest stab at this.
I went with something like the 'controlling process'
paradigm along with the 'active once' mode from the
gen_tcp module. Pubsub messages must be ack'ed
before the next one is sent, otherwise the redis client
will stop setting the tcp connection to active once.

This should handle the flow-control issues, I think.
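The ack-before-next-message rule can be modelled roughly as below. This is a Python sketch, not the code in the branch: `AckGatedClient`, `poll` and `ack` are hypothetical names, and a plain in-memory deque stands in for data waiting on the TCP socket.

```python
from collections import deque


class AckGatedClient:
    """Hypothetical model: at most one un-acked message is outstanding."""

    def __init__(self, incoming, deliver):
        self._incoming = deque(incoming)  # stands in for unread socket data
        self._deliver = deliver
        self._awaiting_ack = False

    def poll(self):
        """Deliver one message if the previous one has been acked."""
        if self._awaiting_ack or not self._incoming:
            return False
        # Set the flag first so a subscriber that acks from inside
        # deliver() is not overwritten afterwards.
        self._awaiting_ack = True
        self._deliver(self._incoming.popleft())
        return True

    def ack(self):
        """The subscriber is ready again; the socket may be re-armed."""
        self._awaiting_ack = False
```

While an ack is outstanding nothing more is read, so any backlog stays outside the subscriber's mailbox.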

Thoughts?

@knutin
Contributor

knutin commented Dec 7, 2011

Hi,

Thanks for the patch. Nice work!

I took your work on eredis_client.erl and moved it into eredis_sub_client.erl. I ripped out the normal client part of it, so we now have a client that only does subscribing. To make it even simpler, the only time you can specify channels is when starting the client.

The code is in the branch "pubsub2" on wooga's eredis repository.

My colleague Paolo noticed that with the "active once" approach, Redis will buffer the messages if the client is unable to keep up. I have changed this to queue the messages inside eredis and provide a configurable way of dealing with overflow. This should happen only in catastrophic cases, so the two options for now are to drop the message queue or to shut down the eredis client.
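A rough Python model of a bounded queue with the two overflow policies mentioned above. `BoundedMessageQueue`, `QueueOverflow` and the `"drop"`/`"exit"` policy names are hypothetical, and keeping the message that triggered the overflow after dropping the backlog is a choice made here, not something stated in the thread.

```python
from collections import deque


class QueueOverflow(Exception):
    """Raised under the 'exit' policy; stands in for shutting the client down."""


class BoundedMessageQueue:
    def __init__(self, max_size, on_overflow="drop"):
        if on_overflow not in ("drop", "exit"):
            raise ValueError("on_overflow must be 'drop' or 'exit'")
        self._max_size = max_size
        self._on_overflow = on_overflow
        self._items = deque()

    def push(self, message):
        if len(self._items) >= self._max_size:
            if self._on_overflow == "exit":
                raise QueueOverflow("queue limit of %d reached" % self._max_size)
            self._items.clear()  # 'drop': discard the whole backlog
        self._items.append(message)

    def pop(self):
        """Return the oldest queued message, or None if the queue is empty."""
        return self._items.popleft() if self._items else None
```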

What do you think of this?

@jdavisp3
Contributor

jdavisp3 commented Dec 8, 2011

I like this direction! I think this is looking good. In my particular use case,
I need to dynamically adjust the subscriptions over time. I think it would
be pretty easy to support that, do you agree? Maybe a set_channels API?

The client as it stands cannot support the pattern based subscription
commands PSUBSCRIBE/PUNSUBSCRIBE. I've never used them, I
don't know how badly you want to support them.

@knutin
Contributor

knutin commented Jan 17, 2012

Sorry for taking a month to reply.

The problem with sending commands to Redis while subscribing to a channel is that instead of the reply being the next thing coming down the socket, you might get any number of subscription messages which you need to handle before you get to the actual reply to the command. This is problematic for several reasons, one being that the calling process might also be the one to receive the messages, resulting in a deadlock.

Would it be feasible for your use case to start a new eredis driver for each channel?

I guess it would be fairly easy to add support for patterns by duplicating the functionality for the normal subscription.

@jdavisp3
Contributor

This is true. It's just that if you happen to have a large number of channels
that you might want to subscribe to, you end up with a large number of
connections, so it's not so scalable in that sense. What if the subscribe/unsubscribe
API were asynchronous and delivered erlang messages to the controlling process?
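To illustrate why asynchronous delivery fits here: in Redis pub/sub mode every frame on the socket is an array whose first element names its kind (`message`, `subscribe`, `unsubscribe`), so a confirmation can be forwarded as just another event instead of being awaited. The Python sketch below assumes frames have already been decoded into lists; `classify` and the event tuple shapes are hypothetical.

```python
def classify(frame):
    """Turn one decoded pub/sub frame into an event for the controlling process."""
    kind = frame[0]
    if kind == "message":
        _, channel, payload = frame
        return ("message", channel, payload)
    if kind in ("subscribe", "unsubscribe"):
        _, channel, _count = frame
        return (kind + "d", channel)  # "subscribed" / "unsubscribed"
    raise ValueError("unexpected frame: %r" % (frame,))


# SUBSCRIBE b was sent while messages on channel a were still arriving,
# so its confirmation shows up in the middle of the stream.
frames = [
    ["message", "a", "1"],
    ["message", "a", "2"],
    ["subscribe", "b", 2],
    ["message", "b", "3"],
]
events = [classify(frame) for frame in frames]
```

No caller is blocked waiting for the confirmation, so the deadlock described earlier does not arise in this model.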

@knutin
Contributor

knutin commented Jan 17, 2012

I think an asynchronous API would be much easier to implement in the driver. Good suggestion! I will look into it.

@knutin
Contributor

knutin commented Jan 18, 2012

In 20a72f9 there is an implementation of the asynchronous API. What do you think? Would this provide what you need for your use case?

@jdavisp3
Contributor

I like!! I think this would be perfect.

@jdavisp3
Contributor

Although I don't personally need it, I think it would be easy
to add support for PSUBSCRIBE and PUNSUBSCRIBE down
the road: just keep the patterns in a list separate from the individual
channels and send all messages to the controlling process without
worrying about pattern matching in Erlang.

@knutin knutin closed this as completed Aug 6, 2012
g-andrade added a commit to g-andrade/eredis that referenced this issue Dec 9, 2020
3 participants