# Week 11: Priority Queue

Develop a class `MyFirstPriorityQueue` that implements a heap-based priority queue using just a list.


In [None]:
class MyFirstPriorityQueue:
    """Max-heap priority queue backed by a plain Python list.

    The largest value has the highest priority and is kept at index 0 of
    the backing list. For an element at index ``i`` its children live at
    ``2*i + 1`` and ``2*i + 2`` and its parent at ``(i - 1) // 2``.

    The backing list may be longer than the number of live elements:
    ``extract`` leaves freed slots (set to ``None``) at the tail, and
    ``add`` reuses them. ``_count`` is therefore the single source of
    truth for how many elements are present.
    """

    # Shrink the backing list when less than this fraction of it is in use.
    _RESIZE_BELOW: float = 0.50
    # Error messages raised with IndexError.
    _TOO_SMALL = "peek_next from priority queue with less than 2 elements"
    _PEEK_EMPTY = "peek from empty priority queue"
    _EXTRACT_EMPTY = "extract from empty priority queue"
    # Nouns used by __str__.
    _EL_SINGULAR = "element"
    _EL_PLURAL = f"{_EL_SINGULAR}s"

    def __init__(self):
        """Create an empty priority queue."""
        # Underlying storage for the priority queue
        self._underlying = []
        # Count of elements present in the priority queue. We need this
        # because the underlying storage may be larger than the number
        # of elements present.
        self._count = 0

    def __bool__(self) -> bool:
        """Return True if the priority queue has at least one element;
        otherwise, return False."""
        return self._count > 0

    def is_empty(self) -> bool:
        """Return True if the priority queue has no elements;
        otherwise, return False."""
        return not self.__bool__()

    def __len__(self) -> int:
        """Return the number of elements in the priority queue."""
        return self._count

    def size(self) -> int:
        """Return the number of elements in the priority queue."""
        return self.__len__()

    def _left_child(self, parent: int) -> int:
        """Return the index of the left child of the given parent index."""
        return 2 * parent + 1

    def _right_child(self, parent: int) -> int:
        """Return the index of the right child of the given parent index."""
        return 1 + self._left_child(parent)

    def _parent(self, child: int) -> int:
        """Return the index of the parent of the given child index.

        For ``child == 0`` (the root) this evaluates to ``-1``.
        """
        return (child - 1) // 2

    def _within_bounds(self, index: int) -> bool:
        """Return True if the given index is within the bounds of the priority queue.
        Note that the right bound is determined by the number of elements in the
        priority queue, not the size of the underlying storage."""
        return 0 <= index < self._count

    def _violation_occurs(self, parent: int) -> bool:
        """Return True if the heap property is violated at the given parent
        index, i.e. if either in-bounds child is greater than the parent;
        otherwise, return False.

        NOTE: if the parent index itself is out of bounds this method
        returns False rather than raising. ``extract`` relies on that when
        the queue has just become empty (index 0 is then out of bounds).
        """
        # Nothing to compare when the parent is not a live element.
        if not self._within_bounds(parent):
            return False

        parent_val = self._underlying[parent]
        left_idx = self._left_child(parent)
        right_idx = self._right_child(parent)

        # The bounds check must come first: slots past _count may hold
        # None, which cannot be compared with a value.
        left_greater = (
            self._within_bounds(left_idx)
            and self._underlying[left_idx] > parent_val
        )
        right_greater = (
            self._within_bounds(right_idx)
            and self._underlying[right_idx] > parent_val
        )
        return left_greater or right_greater

    def _restore_heap_property(self, parent: int) -> int:
        """Restore the heap property at the given parent index by swapping
        the parent with its largest child. Return the index of the child
        that was swapped with the parent, or ``parent`` itself when no
        swap was needed."""

        # Assume the largest is the parent
        largest_idx = parent
        left_idx = self._left_child(parent)
        right_idx = self._right_child(parent)

        # Determine the largest among parent and children
        if (
            self._within_bounds(left_idx)
            and self._underlying[left_idx] > self._underlying[largest_idx]
        ):
            largest_idx = left_idx

        if (
            self._within_bounds(right_idx)
            and self._underlying[right_idx] > self._underlying[largest_idx]
        ):
            largest_idx = right_idx

        # Swap parent with largest child if needed
        if largest_idx != parent:
            self._underlying[parent], self._underlying[largest_idx] = (
                self._underlying[largest_idx],
                self._underlying[parent],
            )

        # Return the index of the largest child so that we can continue
        # restoring the heap property down the tree
        return largest_idx

    def add(self, value: int) -> None:
        """Add a value to the priority queue, maintaining the heap property."""

        # Place the new value in the first free slot.
        if self._count == len(self._underlying):
            # Need to expand underlying storage
            self._underlying.append(value)
        else:
            # There is space in underlying storage; no need to expand
            # as space may have become available due to prior extractions
            self._underlying[self._count] = value

        # Either way, we have added a new value
        self._count += 1

        # Sift the new value UP: keep swapping it with its parent while it
        # is greater than that parent. The walk must follow the new value
        # towards the root; stepping to the swapped child instead would
        # stop after a single level and leave the heap invalid.
        child = self._count - 1
        while child > 0:
            parent = self._parent(child)
            if not self._underlying[child] > self._underlying[parent]:
                break
            self._underlying[child], self._underlying[parent] = (
                self._underlying[parent],
                self._underlying[child],
            )
            child = parent

    def _resize_accordingly(self) -> None:
        """Trim the underlying storage down to the live elements if the
        fraction of storage in use falls below the resize threshold."""
        capacity = len(self._underlying)
        # capacity > 0 also guards the division below.
        if capacity > 0 and self._count / capacity < self._RESIZE_BELOW:
            self._underlying = self._underlying[: self._count]

    def extract(self) -> int:
        """Remove and return the highest-priority value from the priority queue,
        maintaining the heap property.

        Raises:
            IndexError: if the priority queue is empty.
        """

        # Check for empty priority queue
        if self.is_empty():
            raise IndexError(self._EXTRACT_EMPTY)

        # Save the top value to return later
        top_value = self._underlying[0]
        # Replace top with the last live value, then clear the vacated slot
        last_position = self._count - 1
        self._underlying[0] = self._underlying[last_position]
        self._underlying[last_position] = None
        self._count -= 1

        # Restore the heap property moving down the tree. When the queue
        # has just become empty, _violation_occurs(0) is False and the
        # loop is skipped.
        parent = 0
        while self._violation_occurs(parent):
            parent = self._restore_heap_property(parent)

        self._resize_accordingly()
        return top_value

    def peek(self) -> int:
        """Return the highest-priority value without removing it.

        Raises:
            IndexError: if the priority queue is empty.
        """
        if self.is_empty():
            raise IndexError(self._PEEK_EMPTY)
        return self._underlying[0]

    def peek_next(self) -> int:
        """Return the next-highest-priority value without removing it.

        Raises:
            IndexError: if the priority queue has fewer than two elements.
        """
        # We need at least two elements to peek at the next highest-priority
        # value; the underlying storage may be empty or hold only the root.
        if self.size() < 2:
            raise IndexError(self._TOO_SMALL)

        # The runner-up is the larger of the root's children.
        left_idx: int = self._left_child(0)
        right_idx = left_idx + 1
        # Assume left child is next
        runner_up = self._underlying[left_idx]
        # Check right child, if it is a live element
        if right_idx < self.size() and self._underlying[right_idx] > runner_up:
            runner_up = self._underlying[right_idx]
        return runner_up

    def __str__(self) -> str:
        """Return a string representation of the priority queue: a count
        line followed by the live elements in heap (storage) order."""
        elements: str = self._EL_SINGULAR if self.size() == 1 else self._EL_PLURAL
        return f"Priority queue has {self._count} {elements}\n{self._underlying[:self._count]}"


# Demo: build a queue, insert two values in increasing order, and print
# its string form (a count line followed by the elements in storage order).
q = MyFirstPriorityQueue()
for _value in (10, 20):
    q.add(_value)
print(str(q))

Priority queue has 2 elements
[20, 10]
