Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CancelledError in _get_answer stops all the magic #37

Closed
tumb1er opened this issue Apr 16, 2014 · 8 comments
Closed

CancelledError in _get_answer stops all the magic #37

tumb1er opened this issue Apr 16, 2014 · 8 comments

Comments

@tumb1er
Copy link
Contributor

tumb1er commented Apr 16, 2014

I use aiohttp + asyncio_redis. When client aborts http request, aiohttp raises CancelledError, and mostly it affects RedisProtocol._get_answer. Next http request just blocks forever. In RedisProtocol instance in self._queue cancelled Futures are accumulated, in my case one Future<CANCELLED> per request.
Haven't written test for it yet, but is it always correct to pop one cancelled future from _queue on CancelledError?

def _get_answer(...):
    try:
         result = yield from answer_f
    except CancelledError:
         self._pop_cancelled_future()
         raise
    ...
@jonathanslenders
Copy link
Owner

I think this would indeed be the solution. I will do some testing to investigate this. Thanks for pointing out the problem!

Cancellation is hard to get correct, and even in asyncio itself they are now fixing a few CancelledErrors there were not handled before.

@tumb1er
Copy link
Contributor Author

tumb1er commented Apr 16, 2014

After some tests I figured more correct solution:

    def _push_answer(self, answer):
        """
        Answer future at the queue.
        """
        f = self._queue.popleft()

        if isinstance(answer, Exception):
            f.set_exception(answer)
        elif f.cancelled(): # Aha! Here InvalidStateError was not handled
            pass
        else:
            f.set_result(answer)

If future is cancelled, RedisProtocol._handle_bulk_reply fails with InvalidStateError in the middle of reading of binary answer, so the entire connection become broken. So, don't set_result on cancelled Future.

@jonathanslenders
Copy link
Owner

That's indeed better, because this way, the queue clearly indicates how much answers Redis is still going to send.

I made a pull request and a unit test:
#38

@jonathanslenders
Copy link
Owner

Of course, if someone calls a blocking pop without a timeout on the protocol, and then cancels the coroutine, the blocking pop will still be on the wire and could wait there forever if nothing gets ever pushed.

@tumb1er
Copy link
Contributor Author

tumb1er commented Apr 16, 2014

What about zrange reply? I mean stream-like reply processing:

result = yield from redis.zrange(...)
for f in result:
    key, value = yield from result
    # cancellation here

@jonathanslenders
Copy link
Owner

Hmm.... I'm not able to trigger any InvalidStateError exceptions for these kind of replies. I tried this with a SetReply and it just executes.

 @redis_test                                                                                      
 def test_cancellation2(self, transport, protocol):                                               
     """ Test CancelledError: when the stream gets cancelled. """                                 
     yield from protocol.delete(['our_set'])                                                      

     # Create a large set of large keys. (Which would require multiple IP packets.)               
     prefix = 'a-very-long-key-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' * 10                        
     yield from protocol.sadd('our_set', list([prefix + str(i) for i in range(1000 * 100) ]))     

     result = yield from protocol.smembers('our_set')                                             
     print('got result')                                                                          

     # Now get all the futures.                                                                   
     futures = []                                                                                 
     for coroutine in result:                                                                     
         futures.append(asyncio.async(coroutine, loop=self.loop))                                 
     print('len=', len(futures))                                                                  

     # Cancel all futures without waiting on them.
     for f in futures:                                                                            
         f.cancel()                                                                               
     print('cancelled')                                                                           

    # Try another query.  (this could block or raise InvalidStateError)
    result = yield from protocol.echo(u'my string')
    self.assertEqual(result, u'my string')         

@jonathanslenders
Copy link
Owner

It's actually this line where we continue parsing with multibulk_reply.queue.put_nowait as callback instead of protocol._push_answer.
https://github.com/jonathanslenders/asyncio-redis/blob/master/asyncio_redis/protocol.py#L925

I'm not sure that I understand asyncio.queues.Queue correctly, but somehow it looks like it's fine to put an item on a queue when the receiving end was cancelled.

@tumb1er
Copy link
Contributor Author

tumb1er commented Apr 16, 2014

Hope this will help :)

@tumb1er tumb1er closed this as completed Apr 16, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants