13016213 Data Structures and Algorithms Laboratory

**NOTE** click here to select this cell, press Esc-Enter to enter cell edit mode, press Shift-Enter to put the cell back to display mode.

#### Name: Araya Siriadun

#### Student ID: 58090046

Laboratory 8: Priority Queues
===

## Overview

This laboratory introduces a new abstract data type known as a **priority queue**. The priority queue is a collection of prioritized elements that allows arbitrary element insertion, and allows the removal of the element that has the *highest priority*. When an element is inserted into a priority queue, the user designates its priority by providing an assoicated **key**. Although it is quite common for priorities to be expressed with numerical keys, any object may be used as a key, as long as the object type support a consistent meaning for the test $a < b$ for any instances $a$ and $b$ of that type. 

Applications of priority queues include simulation systems, job scheduling in computer systems, numerical compuations, a building block for a sorting algorithm and a file-compression algorithm to name a few.

## The Priority Queue ADT

### Definition of the Priority Queue ADT

A *priority queue* is a queue in which each item is assigned a priority and items with a higher priority are removed before those with a lower priority, irrespective of when they were added. Formally, we model an element and its priority as a key-value pair. The following methods are supported by a priority queue:

* **add(k, v):** Insert an item with key $k$ and value $v$ into priority queue.
* **max():** Return a tuple $(k, v)$, representing the key and value of an item in the priority queue with highest priority $k$; an error occurs if the priority queue is empty.
* **delmax():** Remove an item with highest prioirty $k$ from the priority queue, and return a tuple, $(k, v)$, representing the key and value of the removed item; an error occurs if the priority queue is empty.
* **is_empty():** Return $True$ if the priority queue does not contain any items.
* **len():** Return the number of items in the priority queue.

As an example, the following code snippet shows a series of operations and their effects on an initially empty priority queue $P$.

```python
P.add(ord('P'), 'P')    # P contains (80,'P')
P.add(ord('R'), 'R')    # P contains (80,'P'),(82,'R')
P.add(ord('I'), 'I')    # P contains (80,'P'),(82,'R'),(73,'I')
P.add(ord('O'), 'O')    # P contains (80,'P'),(82,'R'),(73,'I'),(79,'O')
P.max()                 # Return value: (82,'R')
P.delmax()              # P contains (80,'P'),(73,'I'),(79,'O'); Return value: (82,'R')
len(P)                  # Return value: 3
P.delmax()              # P contains (73,'I'),(79,'O'); Return value: (80,'P')
P.delmax()              # P contains (73,'I'); Return value: (79,'O')
P.delmax()              # P contains nothing; Return value: (73,'I')
P.delmax()              # "error"
P.is_empty()            # True
```


<hr />
### Question 1 [2 marks]. 
The priority queue is a proper generalization of the stack and the queue, because we can implement these data structures with priority queues.

a. Explain how to use a priority queue ADT to implement a **stack** ADT.

b. Explain how to use a priority queue ADT to implement a **queue** ADT.

In [1]:
#
### TODO.Q1
#
'''
(a) First, Initialized variable key. 
On a add function, call add(k, v) then increment variable key. 
On a delmax function, decrement variable key.

(b) First, Initialized variable key. 
On a add function, call add(k, v) then decrement variable key. 
On a delmax function, increment variable key.
'''

<hr />
### Implementations of a Priority Queue ADT

Numerous options of data structures are available for implementing priority queues:

* unsorted array based priority queues
* sorted array based priority queues
* unsorted doubly linked list based priority queues
* sorted doubly linked list based priority queues
* heap based priority queues

Before proceeding to the discussion of specific realizations of priority queues, let us define an abstract base class for a priority queue that includes a definition for a key-value pair item and a is_empty method.

In [1]:
class PriorityQueue:
    """Abstract base class for a priority queue."""
    
    class _Item:
        """Lightweight composite to store priority queue items."""
        __slots__ = '_key', '_value'
        
        def __init__(self, k, v):
            self._key = k
            self._value = v
            
        def __lt__(self, other):
            """Compare this item with other based on their keys."""
            return self._key < other._key
        
        def __eq__(self, other):
            """Compare this item with other based on their keys."""
            return self._key == other._key
        
        def __le__(self, other):
            """Compare this item with other based on their keys."""
            return self._key <= other._key

        
    def is_empty(self):
        """Return True if the priority queue is empty.
        
        The len method is an abstract method to be implemented by a concrete class.
        """
        return len(self) == 0

<hr />
### Question 2 [2 marks]. 
In the definition of the nested class $\_Item$, there is a class-member declaration $__slots__$. Explain the advantage of defining $__slots__$ in a class definition.

In [6]:
### TODO.Q2
'''
__slots__ reduces the purpose of dynamic dictionary save huge amount of space when there is million of object in memory 
because it does not require __dict__ for every instance
'''

<hr />
### An Unsorted Array based Priority Queue

Our first concrete implementation of a priority queue stores items within an unsorted array. We will use the built-in list class for this purpose. The $UnsortedArrayPriorityQueue$ inherits and implements the abstract base class $PriorityQueue$. A full implementation of the class is provided below.

In [2]:
class UnsortedArrayPriorityQueue(PriorityQueue):
    """A priority queue implemented with an unsorted list."""
    
    def __init__(self):
        """Create a new empty priority queue."""
        self._data = list()
    
    def __len__(self):
        """Return the number of items in the priority queue."""
        return len(self._data)
    
    def add(self, key, value):
        """Add a key-value pair."""
        self._data.append(self._Item(key, value))
            
    def _findmax(self):
        """Return the index of the item with the maximum key."""
        assert not self.is_empty(), "_findmax: priority queue is empty."
        maxidx = 0
        for j in range(1, len(self)):
            if self._data[maxidx] < self._data[j]:
                maxidx = j                    
        return maxidx

    def max(self):
        """Return but do not remove (k, v) tuple with maximum key (highest priority)."""
        maxidx = self._findmax()        
        maxitem = self._data[maxidx]
        
        return (maxitem._key, maxitem._value)
    
    def delmax(self):
        """Remove and return (k, v) tuple with maximum key (highest priority)."""
        maxidx = self._findmax()        
        maxitem = self._data.pop(maxidx)
        
        return (maxitem._key, maxitem._value)

Now, let us develop and run a unit test for the $UnsortedArrayPriorityQueue$ class. We will use the tools provided in the Python's **unittest** module to construct and run our test. For a detailed description of the unittest module, please refer to the Python standard library documentation: https://docs.python.org/3/library/unittest.html. A nice tutorial of unittest module is available at: http://www.diveintopython3.net/unit-testing.html.

In [3]:
import unittest

class TestUnsortedArrayPriorityQueue(unittest.TestCase):

    def test_case1(self):
        # create a priority queue
        P = UnsortedArrayPriorityQueue()
        
        # test add, len, max methods
        P.add(ord('P'), 'P')
        self.assertEqual(len(P), 1)
        self.assertEqual(P.max(), (ord('P'), 'P'))
                
        P.add(ord('R'), 'R')    
        self.assertEqual(len(P), 2)
        self.assertEqual(P.max(), (ord('R'), 'R'))
        
        P.add(ord('I'), 'I')    
        self.assertEqual(len(P), 3)
        self.assertEqual(P.max(), (ord('R'), 'R'))
        
        P.add(ord('O'), 'O')    
        self.assertEqual(len(P), 4)
        self.assertEqual(P.max(), (ord('R'), 'R'))

        # test is_empty
        self.assertEqual(P.is_empty(), False)
        
        # test delmax
        item = P.delmax()
        self.assertEqual(len(P), 3)
        self.assertEqual(item, (ord('R'), 'R'))

        item = P.delmax()
        self.assertEqual(len(P), 2)
        self.assertEqual(item, (ord('P'), 'P'))

        item = P.delmax()
        self.assertEqual(len(P), 1)
        self.assertEqual(item, (ord('O'), 'O'))

        item = P.delmax()
        self.assertEqual(len(P), 0)
        self.assertEqual(item, (ord('I'), 'I'))

        self.assertRaises(AssertionError, P.delmax)

        # test is_empty
        self.assertEqual(P.is_empty(), True)
    
    def test_case2(self):
        ### TODO.Q3
        
        # create a UnsortedArrayPriorityQueue instance named P
        P = UnsortedArrayPriorityQueue()
        # add (ord('a'), 'a') to P
        P.add(ord('a'), 'a')
        # add (ord('s'), 's') to P
        P.add(ord('s'), 's')
        # add (ord('o'), 'o') to P
        P.add(ord('o'), 'o')
        # add (ord('r'), 'r') to P
        P.add(ord('r'), 'r')
        # add (ord('t'), 't') to P
        P.add(ord('t'), 't')
        # add (ord('i'), 'i') to P
        P.add(ord('i'), 'i')
        # add (ord('n'), 'n') to P
        P.add(ord('n'), 'n')
        # add (ord('g'), 'g') to P
        P.add(ord('g'), 'g')
        # check P.delmax() == 't'
        self.assertEqual(P.delmax(), (ord('t'), 't'))
        # check P.delmax() == 's'
        self.assertEqual(P.delmax(), (ord('s'), 's'))
        # check P.delmax() == 'r'
        self.assertEqual(P.delmax(), (ord('r'), 'r'))
        # check P.delmax() == 'o'
        self.assertEqual(P.delmax(), (ord('o'), 'o'))
        # check P.delmax() == 'n'
        self.assertEqual(P.delmax(), (ord('n'), 'n'))
        # check P.delmax() == 'i'
        self.assertEqual(P.delmax(), (ord('i'), 'i'))
        # check P.delmax() == 'g'
        self.assertEqual(P.delmax(), (ord('g'), 'g'))
        # check P.delmax() == 'a'
        self.assertEqual(P.delmax(), (ord('a'), 'a'))
        
        
def runtest():        
    testmodule = TestUnsortedArrayPriorityQueue()
    suite = unittest.TestLoader().loadTestsFromModule(testmodule)
    unittest.TextTestRunner().run(suite)


runtest()

..
----------------------------------------------------------------------
Ran 2 tests in 0.000s

OK


<hr />
### Question 3 [2 marks]. 
Complete the method $test\_case2$ of the $TestUnsortedArrayPriorityQueue$ class. Run the test cases, make sure that all tests pass.

In [9]:
### TODO.Q3 
# complete the method test_case2 in TestUnsortedArrayPriorityQueue


<hr />
### Question 4 [2 marks]. 
What are the worst-case running times of the following operations of the UnsortedArrayPriorityQueue class ?

* **len**
* **is_empty**
* **add**
* **max**
* **delmax**

In [10]:
### TODO.Q4
'''
Operation   Running Time
   len          O(1)
is_empty        O(1)
   add          O(1)
   max          O(n)
 delmax         O(n)
'''

<hr />
### A Sorted Array based Priority Queue

An alternative implementation of a priority queue uses an array, yet maintaining items sorted by nonincreasing keys. 
As a result, the first element of the array is an item with the maximum key (i.e., highest priority). A partial implementation of a sorted array based priority queue is as follows.

In [4]:
class SortedArrayPriorityQueue(PriorityQueue):
    """A priority queue implemented with a sorted list."""
    
    def __init__(self):
        """Create a new empty priority queue."""
        self._data = list()
    
    def __len__(self):
        """Return the number of items in the priority queue."""
        return len(self._data)
    
    def add(self, key, value):
        """Add a key-value pair."""        
        ### TODO.Q5
        new = self._Item(key, value)
        if not self or new <= self._data[-1]:
            self._data += [new]
        else:
            for i in range(len(self)):
                if self._data[i] < new:
                    self._data.insert(i, new)
                    break
        
    def max(self):
        """Return but do not remove (k, v) tuple with maximum key (highest priority)."""
        assert not self.is_empty(), "max: priority queue is empty."
        ### TODO.Q5
        return (self._data[0]._key, self._data[0]._value)
    
    def delmax(self):
        """Remove and return (k, v) tuple with maximum key (highest priority)."""
        assert not self.is_empty(), "delmax: priority queue is empty."
        maxitem = self._data.pop(0)
        
        return (maxitem._key, maxitem._value)

<hr />
### Question 5 [2 marks]. 
Complete the $add$ and $max$ methods of the $SortedArrayPriorityQueue$ class.

In [12]:
### TODO.Q5
# complete the method add, max in SortedArrayPriorityQueue


<hr />
### Question 6 [2 marks]. 
What are the worst-case running times of the following operations of the SortedArrayPriorityQueue class ?

* **len**
* **is_empty**
* **add**
* **max**
* **delmax**

In [13]:
### TODO.Q6
'''
Operation   Running Time
   len          O(1)
is_empty        O(1)
   add          O(n^2)
   max          O(1)
 delmax         O(n)
'''

<hr />
### Question 7 [2 marks]. 
Construct and run a unit test for the SortedArrayQueue class.

In [16]:
### TODO.Q7

class TestSortedArrayPriorityQueue(unittest.TestCase):

    def test_case1(self):
        # create a priority queue
        P = SortedArrayPriorityQueue()
        
        # test add, len, max methods
        P.add(ord('P'), 'P')
        self.assertEqual(len(P), 1)
        self.assertEqual(P.max(), (ord('P'), 'P'))
                
        P.add(ord('R'), 'R')    
        self.assertEqual(len(P), 2)
        self.assertEqual(P.max(), (ord('R'), 'R'))
        
        P.add(ord('I'), 'I')    
        self.assertEqual(len(P), 3)
        self.assertEqual(P.max(), (ord('R'), 'R'))
        
        P.add(ord('O'), 'O')    
        self.assertEqual(len(P), 4)
        self.assertEqual(P.max(), (ord('R'), 'R'))

        # test is_empty
        self.assertEqual(P.is_empty(), False)
        
        # test delmax
        item = P.delmax()
        self.assertEqual(len(P), 3)
        self.assertEqual(item, (ord('R'), 'R'))

        item = P.delmax()
        self.assertEqual(len(P), 2)
        self.assertEqual(item, (ord('P'), 'P'))

        item = P.delmax()
        self.assertEqual(len(P), 1)
        self.assertEqual(item, (ord('O'), 'O'))

        item = P.delmax()
        self.assertEqual(len(P), 0)
        self.assertEqual(item, (ord('I'), 'I'))

        self.assertRaises(AssertionError, P.delmax)

        # test is_empty
        self.assertEqual(P.is_empty(), True)
    
    def test_case2(self):
        ### TODO.Q3
        
        # create a SortedArrayPriorityQueue instance named P
        P = SortedArrayPriorityQueue()
        # add (ord('a'), 'a') to P
        P.add(ord('a'), 'a')
        # add (ord('s'), 's') to P
        P.add(ord('s'), 's')
        # add (ord('o'), 'o') to P
        P.add(ord('o'), 'o')
        # add (ord('r'), 'r') to P
        P.add(ord('r'), 'r')
        # add (ord('t'), 't') to P
        P.add(ord('t'), 't')
        # add (ord('i'), 'i') to P
        P.add(ord('i'), 'i')
        # add (ord('n'), 'n') to P
        P.add(ord('n'), 'n')
        # add (ord('g'), 'g') to P
        P.add(ord('g'), 'g')
        # check P.delmax() == 't'
        self.assertEqual(P.delmax(), (ord('t'), 't'))
        # check P.delmax() == 's'
        self.assertEqual(P.delmax(), (ord('s'), 's'))
        # check P.delmax() == 'r'
        self.assertEqual(P.delmax(), (ord('r'), 'r'))
        # check P.delmax() == 'o'
        self.assertEqual(P.delmax(), (ord('o'), 'o'))
        # check P.delmax() == 'n'
        self.assertEqual(P.delmax(), (ord('n'), 'n'))
        # check P.delmax() == 'i'
        self.assertEqual(P.delmax(), (ord('i'), 'i'))
        # check P.delmax() == 'g'
        self.assertEqual(P.delmax(), (ord('g'), 'g'))
        # check P.delmax() == 'a'
        self.assertEqual(P.delmax(), (ord('a'), 'a'))
        
        
def runtest():        
    testmodule = TestSortedArrayPriorityQueue()
    suite = unittest.TestLoader().loadTestsFromModule(testmodule)
    unittest.TextTestRunner().run(suite)


runtest()

..
----------------------------------------------------------------------
Ran 2 tests in 0.001s

OK


<hr />
## Programming Quiz 8 [6 marks]

Implement a priority queue ADT using an **unsorted doubly linked list**. Construct and run a unit test for your priority queue implementation.

In [17]:
### TODO.P8
class UnsortedDoublyLinkedList:
    
    class _Node:
        __slots__ = '_element', '_prev', '_next'    

        def __init__(self, element, previous, next):
            self._element = element
            self._prev = previous
            self._next = next

    def __init__(self):
        self.head = self.tail = self._Node(None, None, None)
        self.head._next = self.tail
        self.tail._prev = self.head
        self.size = 0

    def __len__(self):
        return self.size

    def is_empty(self):
        return self.size == 0

    def append(self, e):
        if self.size:
            new = self._Node(e, self.tail, None)
            self.tail._next = new
            self.tail = new
        else:
             self.head = self.tail = self._Node(e, None, None)
        self.size += 1
        
    def remove(self, e):
        curNode = self.head
        while curNode:
            if curNode._element == e:
                delNode = curNode._element
                if len(self) == 1:   # contain only one element
                    self.head = self.tail = None
                elif curNode._prev is None:   # remove head
                    self.head = curNode._next
                    curNode._next._prev = None
                elif curNode._next is None:   # remove tail
                    self.tail = curNode._prev
                    curNode._prev._next = None
                else:
                    curNode._prev._next = curNode._next
                    curNode._next._prev = curNode._prev
                break
            curNode = curNode._next
        if 'delNode' not in locals():
            raise Exception("remove(e): e not in list")
        self.size -= 1
        return delNode
 
    def show(self):
        print("Show list data:")
        curNode = self.head
        while curNode is not None:
            print(curNode._element, end=' ')
            curNode = curNode._next
    
class PriorityQueueADT(PriorityQueue):
    
    def __init__(self):
        self._data = UnsortedDoublyLinkedList()
    
    def __len__(self):
        return len(self._data)
    
    def add(self, key, value):
        self._data.append(self._Item(key, value))
            
    def max(self):
        assert not self.is_empty(), "max: priority queue is empty."
        curNode = self._data.head
        maxitem = curNode._element
        curNode = curNode._next
        while curNode:
            if maxitem < curNode._element:
                maxitem = curNode._element
            curNode = curNode._next
        return (maxitem._key, maxitem._value)
    
    def delmax(self):
        max = self.max()
        maxitem = self._data.remove(self._Item(max[0],max[1]))
        return (maxitem._key, maxitem._value)

In [37]:
class TestPriorityQueueADT(unittest.TestCase):

    def test_case1(self):
        # create a priority queue
        P = PriorityQueueADT()
        
        # test add, len, max methods
        P.add(ord('P'), 'P')
        self.assertEqual(len(P), 1)
        self.assertEqual(P.max(), (ord('P'), 'P'))
                
        P.add(ord('R'), 'R')    
        self.assertEqual(len(P), 2)
        self.assertEqual(P.max(), (ord('R'), 'R'))
        
        P.add(ord('I'), 'I')    
        self.assertEqual(len(P), 3)
        self.assertEqual(P.max(), (ord('R'), 'R'))
        
        P.add(ord('O'), 'O')    
        self.assertEqual(len(P), 4)
        self.assertEqual(P.max(), (ord('R'), 'R'))

        # test is_empty
        self.assertEqual(P.is_empty(), False)
        
        # test delmax
        item = P.delmax()
        self.assertEqual(len(P), 3)
        self.assertEqual(item, (ord('R'), 'R'))

        item = P.delmax()
        self.assertEqual(len(P), 2)
        self.assertEqual(item, (ord('P'), 'P'))

        item = P.delmax()
        self.assertEqual(len(P), 1)
        self.assertEqual(item, (ord('O'), 'O'))

        item = P.delmax()
        self.assertEqual(len(P), 0)
        self.assertEqual(item, (ord('I'), 'I'))

        self.assertRaises(AssertionError, P.delmax)

        # test is_empty
        self.assertEqual(P.is_empty(), True)
    
    def test_case2(self):
        ### TODO.Q3
        
        # create a SortedArrayPriorityQueue instance named P
        P = PriorityQueueADT()
        # add (ord('a'), 'a') to P
        P.add(ord('a'), 'a')
        # add (ord('s'), 's') to P
        P.add(ord('s'), 's')
        # add (ord('o'), 'o') to P
        P.add(ord('o'), 'o')
        # add (ord('r'), 'r') to P
        P.add(ord('r'), 'r')
        # add (ord('t'), 't') to P
        P.add(ord('t'), 't')
        # add (ord('i'), 'i') to P
        P.add(ord('i'), 'i')
        # add (ord('n'), 'n') to P
        P.add(ord('n'), 'n')
        # add (ord('g'), 'g') to P
        P.add(ord('g'), 'g')
        # check P.delmax() == 't'
        self.assertEqual(P.delmax(), (ord('t'), 't'))
        # check P.delmax() == 's'
        self.assertEqual(P.delmax(), (ord('s'), 's'))
        # check P.delmax() == 'r'
        self.assertEqual(P.delmax(), (ord('r'), 'r'))
        # check P.delmax() == 'o'
        self.assertEqual(P.delmax(), (ord('o'), 'o'))
        # check P.delmax() == 'n'
        self.assertEqual(P.delmax(), (ord('n'), 'n'))
        # check P.delmax() == 'i'
        self.assertEqual(P.delmax(), (ord('i'), 'i'))
        # check P.delmax() == 'g'
        self.assertEqual(P.delmax(), (ord('g'), 'g'))
        # check P.delmax() == 'a'
        self.assertEqual(P.delmax(), (ord('a'), 'a'))
        
        
def runtest():        
    testmodule = TestPriorityQueueADT()
    suite = unittest.TestLoader().loadTestsFromModule(testmodule)
    unittest.TextTestRunner().run(suite)


runtest()

..
----------------------------------------------------------------------
Ran 2 tests in 0.001s

OK
