diff --git a/pika/adapters/select_connection.py b/pika/adapters/select_connection.py index 59e650882..ad6be6761 100644 --- a/pika/adapters/select_connection.py +++ b/pika/adapters/select_connection.py @@ -203,7 +203,7 @@ def __init__(self, fileno, handler, events, state_manager): self.events = events self.open = True self._handler = handler - self._timeouts = dict() + self._timeouts = [] self._manage_event_state = state_manager def add_timeout(self, deadline, callback_method): @@ -220,7 +220,7 @@ def add_timeout(self, deadline, callback_method): value = {'deadline': time.time() + deadline, 'callback': callback_method} timeout_id = hash(frozenset(value.items())) - self._timeouts[timeout_id] = value + self._timeouts.append((timeout_id, value)) return timeout_id def flush_pending_timeouts(self): @@ -271,13 +271,17 @@ def poll(self, write_only=False): def process_timeouts(self): """Process the self._timeouts event stack""" start_time = time.time() - for timeout_id in self._timeouts.keys(): - if timeout_id not in self._timeouts: - continue - if self._timeouts[timeout_id]['deadline'] <= start_time: - callback = self._timeouts[timeout_id]['callback'] - del self._timeouts[timeout_id] + # while loop instead of a more straightforward for loop so we can + # delete items from the list while iterating + i = 0 + while i < len(self._timeouts): + t_id, timeout = self._timeouts[i] + if timeout['deadline'] <= start_time: + callback = timeout['callback'] + del self._timeouts[i] callback() + else: + i += 1 def remove_timeout(self, timeout_id): """Remove a timeout if it's still in the timeout stack @@ -285,8 +289,11 @@ def remove_timeout(self, timeout_id): :param str timeout_id: The timeout id to remove """ - if timeout_id in self._timeouts: - del self._timeouts[timeout_id] + for i in xrange(len(self._timeouts)): + t_id, timeout = self._timeouts[i] + if t_id == timeout_id: + del self._timeouts[i] + break def start(self): """Start the main poller loop. It will loop here until self.closed"""