Skip to content

Commit

Permalink
update kill cursors in relation to IntegrityError
Browse files Browse the repository at this point in the history
  • Loading branch information
jehiah committed Nov 9, 2011
1 parent 490343c commit fb14848
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 15 deletions.
2 changes: 1 addition & 1 deletion asyncmongo/connection.py
Expand Up @@ -177,7 +177,7 @@ def _parse_response(self, response):

if response and response['data'] and response['data'][0].get('err') and response['data'][0].get('code'):
# logging.error(response['data'][0]['err'])
callback(None, IntegrityError(response['data'][0]['err'], code=response['data'][0]['code']))
callback(response, IntegrityError(response['data'][0]['err'], code=response['data'][0]['code']))
return
# logging.info('response: %s' % response)
callback(response)
Expand Down
22 changes: 11 additions & 11 deletions asyncmongo/cursor.py
Expand Up @@ -388,6 +388,17 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
raise

def _handle_response(self, result, error=None, orig_callback=None):
if result and result.get('cursor_id'):
connection = self.__pool.connection()
try:
connection.send_message(
message.kill_cursors([result['cursor_id']]),
callback=None)
except Exception, e:
logging.error('Error killing cursor %s: %s' % (result['cursor_id'], e))
connection.close()
raise

if error:
logging.error('%s %s' % (self.full_collection_name , error))
orig_callback(None, error=error)
Expand All @@ -398,17 +409,6 @@ def _handle_response(self, result, error=None, orig_callback=None):
else:
orig_callback(result['data'], error=None)

if result.get('cursor_id'):
# logging.debug('killing cursor %s', result['cursor_id'])
connection = self.__pool.connection()
try:
connection.send_message(
message.kill_cursors([result['cursor_id']]),
callback=None)
except Exception, e:
logging.error('Error killing cursor %s: %s' % (result['cursor_id'], e))
connection.close()
raise

def __query_options(self):
"""Get the query options string to use for this query."""
Expand Down
15 changes: 12 additions & 3 deletions test/test_query.py
Expand Up @@ -28,13 +28,22 @@ def test_query(self):
db = asyncmongo.Client(pool_id='test_query', host='127.0.0.1', port=int(self.mongod_options[0][1]), dbname='test', mincached=3)

def noop_callback(response, error):
tornado.ioloop.IOLoop.instance().stop()
logging.info(response)
loop = tornado.ioloop.IOLoop.instance()
# delay the stop so kill cursor has time on the ioloop to get pushed through to mongo
loop.add_timeout(time.time() + .1, loop.stop)

before = self.get_open_cursors()
db.foo.find(limit=20, callback=noop_callback)

# run 2 queries
db.foo.find({}, callback=noop_callback)
tornado.ioloop.IOLoop.instance().start()
db.foo.find({}, callback=noop_callback)
tornado.ioloop.IOLoop.instance().start()

# check cursors
after = self.get_open_cursors()
self.assertEquals(before, after, "%d cursors left open (should be 0)" % (after - before))
assert before == after, "%d cursors left open (should be 0)" % (after - before)

if __name__ == '__main__':
import unittest
Expand Down

0 comments on commit fb14848

Please sign in to comment.