Socket timeout on PubSub.listen() #225

Closed

za-creature opened this issue Feb 20, 2012 · 12 comments

@za-creature

The following code times out and raises a ConnectionError if no message is published to any of the subscribed channels within 5 seconds. This doesn't really make sense, because the connection is dropped and invalidated when it happens.

import redis

client = redis.StrictRedis(socket_timeout=5)
channel = client.pubsub()
channel.subscribe("test")
for message in channel.listen():
    print message

Catching the ConnectionError in the loop and calling listen() again throws the following error (my guess is that _fp is deleted by the internal ConnectionError exception handler):

Traceback (most recent call last):
  File "./sub.py", line 11, in <module>
    for message in channel.listen():
  File "/usr/lib/python2.7/site-packages/redis-2.4.11-py2.7.egg/redis/client.py", line 1306, in listen
    r = self.parse_response()
  File "/usr/lib/python2.7/site-packages/redis-2.4.11-py2.7.egg/redis/client.py", line 1250, in parse_response
    response = self.connection.read_response()
  File "/usr/lib/python2.7/site-packages/redis-2.4.11-py2.7.egg/redis/connection.py", line 263, in read_response
    response = self._parser.read_response()
  File "/usr/lib/python2.7/site-packages/redis-2.4.11-py2.7.egg/redis/connection.py", line 77, in read_response
    response = self.read()
  File "/usr/lib/python2.7/site-packages/redis-2.4.11-py2.7.egg/redis/connection.py", line 71, in read
    return self._fp.readline()[:-2]
AttributeError: 'NoneType' object has no attribute 'readline'

If it were me, I'd implement the listen() command with an optional timeout parameter and throw a dedicated TimeoutException if no message was received within that timeout, but leave the connection as-is. Imho, the socket_timeout parameter should only be used for detecting network issues. Or, at the very least, send a ping() command and only throw the ConnectionError if the ping times out. The only workaround I have found so far involves creating a new pubsub object after each timeout:

import logging
import redis

client = redis.StrictRedis(socket_timeout=5)
while True:
    try:
        channel = client.pubsub()
        channel.subscribe("test")
        for message in channel.listen():
            print repr(message)
    except KeyboardInterrupt:
        raise  # want to shut down on ctrl+c
    except:
        logging.exception("")

The problem with this is that between the timeout exception and calling listen() on the new pubsub object (which I guess isn't all that long, but it still counts), new messages will not be delivered. Right now, this issue limits redis-py's usefulness in an enterprise environment (I'm designing a distributed system that uses a redis database and I wanted to use redis for messaging as well, but I guess I'm forced to revert to AMQP).
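
For what it's worth, here's a rough sketch of the behaviour I have in mind, built on top of the existing pubsub object instead of recreating it. The connection._sock attribute and the raw replies from parse_response() are assumptions about redis-py 2.4 internals and may differ between versions:

import select
import redis

client = redis.StrictRedis()  # no socket_timeout; we time out via select() instead
channel = client.pubsub()
channel.subscribe("test")

def listen_with_timeout(pubsub, timeout):
    # Yield raw pubsub replies; return once `timeout` seconds pass with no traffic.
    sock = pubsub.connection._sock  # assumed internal attribute
    while True:
        ready, _, _ = select.select([sock], [], [], timeout)
        if not ready:
            return  # nothing arrived in time; the connection is still valid
        yield pubsub.parse_response()  # raw replies, not the dicts listen() builds

for message in listen_with_timeout(channel, 5):
    print repr(message)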

Also, I believe that the documentation for the PubSub class should be updated. It is not immediately clear from the following description that PubSub.listen() is a generator: After subscribing to one or more channels, the listen() method will block until a message arrives on one of the subscribed channels. That message will be returned and it's safe to start listening again.

@andymccurdy
Contributor

The completely simplistic answer is to specify socket_timeout as None, and you'll never get a timeout. This has other side effects, but might solve your immediate need.
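
In code, that's just the one keyword argument (shown here as a minimal sketch; nothing else changes):

import redis

# With socket_timeout=None, reads block indefinitely, so an idle subscription
# never raises ConnectionError.
client = redis.StrictRedis(socket_timeout=None)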

Sending and receiving PING commands could potentially work, but considerations would have to be made for out-of-order replies (i.e., sending a ping, then receiving a message that was published before the pong came back).

While listen() could probably be implemented with a timeout, a fair bit of the Connection object would need to be refactored. As soon as you start working with sockets that either have timeouts or aren't blocking, you can't use socket.makefile anymore.
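
For illustration, here's a minimal sketch of the kind of read path that refactor implies, using plain recv() calls guarded by select() instead of socket.makefile(); this is not redis-py's actual implementation:

import select
import socket

def read_line(sock, timeout):
    # Read one CRLF-terminated reply line, or return None if `timeout` elapses.
    buf = b""
    while not buf.endswith(b"\r\n"):
        ready, _, _ = select.select([sock], [], [], timeout)
        if not ready:
            return None  # timed out, but the connection itself is still usable
        chunk = sock.recv(1)  # byte-at-a-time for simplicity; a real parser would buffer
        if not chunk:
            raise socket.error("connection closed by server")
        buf += chunk
    return buf[:-2]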

I'll put this on my to-do list, but a pull request will probably get pushed faster.

@kiddouk

kiddouk commented Mar 6, 2012

I can also see a great use for a timeout in listen() (my case: a blocking threaded listen() prevents me from shutting down threads properly when no message is sent).

@za-creature have you been looking into creating a patch for that, or should I start from scratch?

@za-creature
Author

@kiddouk

I can't really find the time to do it; I'm afraid you're on your own.
As for your problem, you might be able to get away with running just a single consumer in the main thread (redis' psubscribe may help you listen on multiple queues at the same time), and shutting it down from any thread by sending the process signal 2. This raises a KeyboardInterrupt in the main thread, which leaks out of redis-py because it does not derive from the Exception class.
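
A rough sketch of that setup; the "queue.*" pattern and the helper below are placeholders for illustration only:

import os
import signal
import redis

client = redis.StrictRedis()
channel = client.pubsub()
channel.psubscribe("queue.*")  # placeholder pattern covering every queue

def request_shutdown():
    # Call this from any worker thread: signal 2 (SIGINT) is delivered to the
    # process and raises KeyboardInterrupt in the main thread's listen() loop.
    os.kill(os.getpid(), signal.SIGINT)

try:
    for message in channel.listen():
        print repr(message)
except KeyboardInterrupt:
    pass  # clean shutdown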

@kiddouk

kiddouk commented Mar 7, 2012

@za-creature Yes and no. Since my listener is in a separate thread, sending signal 2 would kill my entire process, and I don't want that; I want my main process to manage my threads correctly.

But that discussion is out of scope for redis-py issues.

@za-creature
Author

In the end, I did write a patch to fix this (I was going to use QPID, but it had a very complicated acknowledgement procedure, and I really didn't want the overhead). However, it does involve a fairly nasty workaround for redis not accepting ping() commands in the subscribed state (see redis/redis#420), and for redis-py's design of closing down connections whenever a redis syntax error is encountered.

I do not have a complete understanding of either redis or redis-py, but from what I could determine, the issue of out-of-band messages already existed:

consumer subscribes to "queue1"
producer publishes a message to "queue1"
consumer subscribes to "queue2", but instead of the subscribe-ok response, receives the new message on "queue1"

So I added an out-of-band message queue in PubSub that stores unwanted messages until they are ready to be consumed. I designed this from the assumption that PubSub will never return to the caller until listen() can be safely called, so no redis responses can ever happen inside a listen() call. I base this assumption on the fact that execute_command seems to block until an answer is received, but that answer MAY be a message.
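
The actual patches are linked below; purely as an illustration of the buffering idea, it amounts to something along these lines (the class is made up, and send_command/parse_response are the redis-py 2.4 internals mentioned above):

from collections import deque

class BufferedPubSub(object):
    # Illustrative wrapper, not the actual patch.

    def __init__(self, pubsub):
        self._pubsub = pubsub
        self._pending = deque()  # out-of-band messages that arrived too early

    def subscribe(self, channel):
        self._pubsub.connection.send_command("SUBSCRIBE", channel)
        while True:
            reply = self._pubsub.parse_response()
            if reply[0] == "subscribe":   # the acknowledgement we were waiting for
                return reply
            self._pending.append(reply)   # a real message; keep it for listen()

    def listen(self):
        while True:
            if self._pending:
                yield self._pending.popleft()
            else:
                yield self._pubsub.parse_response()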

I've never used git before, so terms like push, pull and whatnot are unknown to me. Here instead are two patch files, for connection.py and client.py, that seem to solve this issue.
I say "seem" because it's a fairly difficult situation to reproduce, given the fast nature of redis. I have only successfully tested that messages are received after the socket timeout.

And, here are the patches for 2.4.11:

connection.py: https://gist.github.com/2253482
client.py: https://gist.github.com/2253495

@za-creature
Author

bump

Any chance of getting the patches applied?

@node

node commented Jul 10, 2012

@za-creature Could I ask whether you have opened a pull request for this issue? Or has @andymccurdy fixed this issue in a new version?

I use redis-py in my distributed project for pulling messages from the server to many clients, and I've found that the listener goes deaf after several days. I checked the server and the clients and found that the TCP port had changed, so there really needs to be a ping-pong or timeout mechanism here.

@andymccurdy
Contributor

Personally I like the idea mentioned in redis/redis#420 about re-subscribing to an already subscribed channel instead of doing crafty things with PING/PONG. We already know which channels the user is subscribed to, so this should be possible and a much easier patch than what @za-creature proposed above.
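
Roughly, the check would look something like this (a sketch only; the helper name is made up, and send_command/parse_response are the 2.4 internals referenced earlier in the thread):

def check_connection(pubsub, channel_name):
    # Re-issue SUBSCRIBE for a channel we already hold: any reply within the
    # socket timeout proves the connection is alive, and the subscription
    # itself is unaffected.
    pubsub.connection.send_command("SUBSCRIBE", channel_name)
    # The next reply may be the subscribe acknowledgement *or* a message that
    # was already in flight -- the same out-of-band concern raised below.
    return pubsub.parse_response()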

@za-creature do you agree?

@node

node commented Jul 10, 2012

@andymccurdy Got it. Re-subscribing (or subscribing without a channel name) is the better way for all of us.

@za-creature
Author

@node No, I did not open a pull request, as I am git-illiterate; I made two patches and posted links in my previous reply. You can apply them locally if you want, but I only did some preliminary testing on them; your mileage may vary!

@andymccurdy Yes, that seems to be the only way to go currently, since the other solutions involve a bit of guesswork as to which direction redis' development is going. However, I'd like to point out that the issue of OOB messages may still exist; please contradict me if I'm wrong:

consumer subscribes to "queue1"
producer publishes a message to "queue1"
consumer subscribes to "queue2", but instead of the subscribe-ok response, receives the new message on "queue1"

This can also happen when re-subscribing to determine service availability, since the mechanism is similar (in fact, depending on the number of channels subscribed to, the probability of it happening may be greater).

@andymccurdy
Contributor

Hi. Many apologies for taking so long to clean up Publish/Subscribe in redis-py. I've recently had some time to make some big improvements and I'd love to get your feedback.

There is now a pubsub.get_message() method that is a non-blocking read. If no message is available to be read, pubsub.get_message() returns None.
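
A usage sketch based on that description (the channel name is just an example):

import time
import redis

client = redis.StrictRedis()
p = client.pubsub()
p.subscribe("test")

while True:
    message = p.get_message()  # returns None right away if nothing is waiting
    if message:
        print repr(message)
    time.sleep(0.001)  # avoid a busy loop when the channel is idle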

You can review the changes in the pubsub branch here: https://github.com/andymccurdy/redis-py/tree/pubsub

I've written a section in the README docs about PubSub that covers most (hopefully all!) use cases. You can find it here: https://github.com/andymccurdy/redis-py/blob/pubsub/README.rst#publish--subscribe

My current plan is to merge the pubsub branch into master and roll a new release of redis-py by the end of the week. Again, I'd love any feedback.

@andymccurdy
Contributor

redis-py 2.10 is now out with the new PubSub system.
