Skip to content

Updated version with safer heap handling #4

@Voyz

Description

@Voyz

Takes into account the safer heap handling described in the Python documentation: https://docs.python.org/3/library/heapq.html

Addresses the following issues:

A priority queue is common use for a heap, and it presents several implementation challenges:

  • Sort stability: how do you get two tasks with equal priorities to be returned in the order they were originally added?
  • Tuple comparison breaks for (priority, task) pairs if the priorities are equal and the tasks do not have a default comparison order.
  • If the priority of a task changes, how do you move it to a new position in the heap?
  • Or if a pending task needs to be deleted, how do you find it and remove it from the queue?
class PriorityQueue(queue.Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).

    Items are put as ``(priority, task)`` tuples (or ``None``, see ``_put``)
    and are handed back by ``get`` as ``[priority, count, task]`` lists.
    ``count`` is an insertion counter that breaks ties between equal
    priorities, so equal-priority tasks come out in insertion order and the
    tasks themselves are never compared.

    Putting a task that is already pending replaces its old entry: the old
    entry is only marked as removed and is discarded lazily when ``_get``
    reaches it.
    '''
    # Marker stored in the task slot of an entry that has been removed.
    REMOVED = '<removed-task>'
    # Priority given to the ``None`` sentinel item.
    DEFAULT_PRIORITY = 100

    def _init(self, maxsize):
        """Create the heap, the task -> entry index and the tie-break counter."""
        self.queue = []
        self.entry_finder = {}
        self.counter = itertools.count()

    def _qsize(self):
        """Return the number of live (not removed) entries.

        Removed entries still sit in the heap until they are popped, so the
        heap length over-counts.  Counting them would make ``get`` believe
        an item is available when only removed markers are left.
        """
        return len(self.entry_finder)

    def _put(self, item):
        """Add ``item`` to the heap, replacing any pending entry for its task.

        :param item: ``(priority, task)`` pair, or ``None`` which is stored
            as ``(DEFAULT_PRIORITY, None)``.
        :raises TypeError: if ``item`` is not subscriptable, the task is not
            hashable, or the priority cannot be ordered against the
            priorities already queued.
        """
        # Only ``None`` is mapped to the sentinel; any other malformed item
        # raises instead of being silently swapped for a sentinel.
        if item is None:
            item = (self.DEFAULT_PRIORITY, None)
        priority, task = item[0], item[1]
        if task in self.entry_finder:
            self.remove(task)
        entry = [priority, next(self.counter), task]
        self.entry_finder[task] = entry
        heappush(self.queue, entry)

    def remove(self, task):
        """
        Mark the pending entry of ``task`` as removed.

        This simply replaces the data with the REMOVED value,
        which will get cleared out once _get reaches it.

        :raises KeyError: if ``task`` has no pending entry.
        """
        entry = self.entry_finder.pop(task)
        entry[-1] = self.REMOVED

    def _get(self):
        """Pop and return the lowest-priority live entry.

        :return: ``[priority, count, task]``, or ``None`` when the heap
            holds no live entry.
        """
        while self.queue:
            entry = heappop(self.queue)
            task = entry[2]
            if task is not self.REMOVED:
                del self.entry_finder[task]
                return entry
        return None

def _worker(executor_reference, work_queue, initializer, initargs):
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            executor = executor_reference()
            if executor is not None:
                executor._initializer_failed()
            return
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item[2] is not None:
                work_item[2].run()
                # Delete references to object. See issue16284
                del work_item

                # attempt to increment idle count
                executor = executor_reference()
                if executor is not None:
                    executor._idle_semaphore.release()
                del executor
                continue

            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown = True
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)

class PriorityThreadPoolExecutor(ThreadPoolExecutor):
    """
    ``ThreadPoolExecutor`` whose pending work is held in a ``PriorityQueue``
    (entries with the lowest priority value are taken first).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Replace the work queue created by the base class with the
        # priority-aware one.
        self._work_queue: PriorityQueue = PriorityQueue()

    def submit(self, fn, *args, **kwargs):
        """
        Queue ``fn(*args, **kwargs)`` for execution and return its future.

        :param fn: function being executed
        :type fn: callable
        :param args: function's positional arguments
        :param kwargs: function's keyword arguments
        :return: future instance
        :rtype: _base.Future

        Extra keyword, consumed here and not forwarded to ``fn``:

        - priority: value the work item is queued under; when omitted a
          random integer in ``[0, sys.maxsize - 1]`` is used.
        """
        with self._shutdown_lock:
            if self._broken:
                raise BrokenThreadPool(self._broken)
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            # ``pop`` both reads the priority and strips it from the
            # arguments passed on to ``fn``.
            priority = kwargs.pop('priority', random.randint(0, sys.maxsize-1))

            future = _base.Future()
            work_item = _WorkItem(future, fn, args, kwargs)

            self._work_queue.put((priority, work_item))
            self._adjust_thread_count()
            return future

    def _adjust_thread_count(self):
        """Start one more worker thread unless an idle one can take the work."""
        # An available idle worker will pick the item up; nothing to spawn.
        if self._idle_semaphore.acquire(timeout=0):
            return

        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        thread_count = len(self._threads)
        if thread_count >= self._max_workers:
            return

        name = '%s_%d' % (self._thread_name_prefix or self, thread_count)
        executor_ref = weakref.ref(self, weakref_cb)
        worker_thread = threading.Thread(
            name=name,
            target=_worker,
            args=(executor_ref,
                  self._work_queue,
                  self._initializer,
                  self._initargs),
        )
        worker_thread.start()
        self._threads.add(worker_thread)
        _threads_queues[worker_thread] = self._work_queue

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions