Skip to content

Commit

Permalink
change timeouts structure to list to maintain scheduling order
Browse files Browse the repository at this point in the history
  • Loading branch information
abranches committed Mar 31, 2014
1 parent 43904ff commit 10490a6
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions pika/adapters/select_connection.py
Expand Up @@ -203,7 +203,7 @@ def __init__(self, fileno, handler, events, state_manager):
self.events = events
self.open = True
self._handler = handler
self._timeouts = dict()
self._timeouts = []
self._manage_event_state = state_manager

def add_timeout(self, deadline, callback_method):
Expand All @@ -220,7 +220,7 @@ def add_timeout(self, deadline, callback_method):
value = {'deadline': time.time() + deadline,
'callback': callback_method}
timeout_id = hash(frozenset(value.items()))
self._timeouts[timeout_id] = value
self._timeouts.append((timeout_id, value))
return timeout_id

def flush_pending_timeouts(self):
Expand Down Expand Up @@ -271,22 +271,29 @@ def poll(self, write_only=False):
def process_timeouts(self):
"""Process the self._timeouts event stack"""
start_time = time.time()
for timeout_id in self._timeouts.keys():
if timeout_id not in self._timeouts:
continue
if self._timeouts[timeout_id]['deadline'] <= start_time:
callback = self._timeouts[timeout_id]['callback']
del self._timeouts[timeout_id]
# while loop instead of a more straightforward for loop so we can
# delete items from the list while iterating
i = 0
while i < len(self._timeouts):
t_id, timeout = self._timeouts[i]
if timeout['deadline'] <= start_time:
callback = timeout['callback']
del self._timeouts[i]
callback()
else:
i += 1

def remove_timeout(self, timeout_id):
"""Remove a timeout if it's still in the timeout stack
:param str timeout_id: The timeout id to remove
"""
if timeout_id in self._timeouts:
del self._timeouts[timeout_id]
for i in xrange(len(self._timeouts)):
t_id, timeout = self._timeouts[i]
if t_id == timeout_id:
del self._timeouts[i]
break

def start(self):
"""Start the main poller loop. It will loop here until self.closed"""
Expand Down

0 comments on commit 10490a6

Please sign in to comment.