From 10490a646129ccd3440e3c8dcb7be2847984a480 Mon Sep 17 00:00:00 2001 From: Pedro Abranches Date: Mon, 31 Mar 2014 17:59:20 +0100 Subject: [PATCH] change timeouts structure to list to maintain scheduling order --- pika/adapters/select_connection.py | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/pika/adapters/select_connection.py b/pika/adapters/select_connection.py index 59e650882..ad6be6761 100644 --- a/pika/adapters/select_connection.py +++ b/pika/adapters/select_connection.py @@ -203,7 +203,7 @@ def __init__(self, fileno, handler, events, state_manager): self.events = events self.open = True self._handler = handler - self._timeouts = dict() + self._timeouts = [] self._manage_event_state = state_manager def add_timeout(self, deadline, callback_method): @@ -220,7 +220,7 @@ def add_timeout(self, deadline, callback_method): value = {'deadline': time.time() + deadline, 'callback': callback_method} timeout_id = hash(frozenset(value.items())) - self._timeouts[timeout_id] = value + self._timeouts.append((timeout_id, value)) return timeout_id def flush_pending_timeouts(self): @@ -271,13 +271,17 @@ def poll(self, write_only=False): def process_timeouts(self): """Process the self._timeouts event stack""" start_time = time.time() - for timeout_id in self._timeouts.keys(): - if timeout_id not in self._timeouts: - continue - if self._timeouts[timeout_id]['deadline'] <= start_time: - callback = self._timeouts[timeout_id]['callback'] - del self._timeouts[timeout_id] + # while loop instead of a more straightforward for loop so we can + # delete items from the list while iterating + i = 0 + while i < len(self._timeouts): + t_id, timeout = self._timeouts[i] + if timeout['deadline'] <= start_time: + callback = timeout['callback'] + del self._timeouts[i] callback() + else: + i += 1 def remove_timeout(self, timeout_id): """Remove a timeout if it's still in the timeout stack @@ -285,8 +289,11 @@ def remove_timeout(self, timeout_id): :param str timeout_id: The timeout id to remove """ - if timeout_id in self._timeouts: - del self._timeouts[timeout_id] + for i in xrange(len(self._timeouts)): + t_id, timeout = self._timeouts[i] + if t_id == timeout_id: + del self._timeouts[i] + break def start(self): """Start the main poller loop. It will loop here until self.closed"""