Skip to content

Commit

Permalink
Merge pull request #17 from python-hyper/perf
Browse files Browse the repository at this point in the history
Use heapq, not queue.PriorityQueue
  • Loading branch information
Lukasa committed May 29, 2016
2 parents 79d421f + 3f0692e commit 09acfe5
Showing 1 changed file with 12 additions and 16 deletions.
28 changes: 12 additions & 16 deletions src/priority/priority.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,9 @@
"""
from __future__ import division

import heapq
import sys

try:
import Queue as queue
except ImportError: # Python 3:
import queue


PY3 = sys.version_info[0] == 3

Expand Down Expand Up @@ -59,7 +55,7 @@ def __init__(self, stream_id, weight=16):
self.weight = weight
self.children = []
self.parent = None
self.child_queue = queue.PriorityQueue()
self.child_queue = []
self.active = True
self.last_weight = 0
self._deficit = 0
Expand All @@ -72,7 +68,7 @@ def add_child(self, child):
"""
child.parent = self
self.children.append(child)
self.child_queue.put((self.last_weight, child))
heapq.heappush(self.child_queue, (self.last_weight, child))

def add_child_exclusive(self, child):
"""
Expand All @@ -82,7 +78,7 @@ def add_child_exclusive(self, child):
"""
old_children = self.children
self.children = []
self.child_queue = queue.PriorityQueue()
self.child_queue = []
self.last_weight = 0
self.add_child(child)

Expand All @@ -105,14 +101,14 @@ def remove_child(self, child, strip_children=True):
# it in the old one
self.children.remove(child)

new_queue = queue.PriorityQueue()
new_queue = []

while not self.child_queue.empty():
level, stream = self.child_queue.get()
while self.child_queue:
level, stream = heapq.heappop(self.child_queue)
if stream == child:
continue

new_queue.put((level, stream))
heapq.heappush(new_queue, (level, stream))

self.child_queue = new_queue

Expand All @@ -137,7 +133,7 @@ def schedule(self):
try:
while next_stream is None:
# If the queue is empty, immediately fail.
val = self.child_queue.get(block=False)
val = heapq.heappop(self.child_queue)
popped_streams.append(val)
level, child = val

Expand All @@ -148,14 +144,14 @@ def schedule(self):
# suitable children.
try:
next_stream = child.schedule()
except queue.Empty:
except IndexError:
continue
finally:
for level, child in popped_streams:
self.last_weight = level
level += (256 + child._deficit) // child.weight
child._deficit = (256 + child._deficit) % child.weight
self.child_queue.put((level, child))
heapq.heappush(self.child_queue, (level, child))

return next_stream

Expand Down Expand Up @@ -369,7 +365,7 @@ def __iter__(self): # pragma: no cover
def __next__(self): # pragma: no cover
try:
return self._root_stream.schedule()
except queue.Empty:
except IndexError:
raise DeadlockError("No unblocked streams to schedule.")

def next(self): # pragma: no cover
Expand Down

0 comments on commit 09acfe5

Please sign in to comment.