Lab Assessment (30 Jan 2024): Heap-Based Priority Queue
---
Done By Praveen Perumal (CH.EN.U4CSE22047)

In [127]:
import operator
import random
from collections import UserList

In [128]:
class CustomList(UserList):
    """List wrapper whose integer indexing returns ``key(item)``.

    The raw items are kept in ``self.data``. When ``key`` is callable,
    ``obj[i]`` returns ``key(self.data[i])``; otherwise it returns the raw
    item. Slicing returns a new list of the same class holding the raw items
    of the slice and sharing the same ``key``.
    """

    def __init__(self, initList=None, key=None) -> None:
        """Store ``key`` and initialise the underlying list from ``initList``."""
        self.key = key
        super().__init__(initList)

    def __getitem__(self, i): # type: ignore
        """Return ``key(item)`` for an index, or a keyed sub-list for a slice."""
        if isinstance(i, slice):
            # Applying key to the slice container itself would be meaningless,
            # so build the sub-list from raw items and carry the key along.
            return self.__class__(self.data[i], key=self.key)
        item = self.data[i]
        return self.key(item) if callable(self.key) else item

    def swap(self, i, j):
        """Exchange the raw items stored at positions ``i`` and ``j``."""
        self.data[i], self.data[j] = self.data[j], self.data[i]

In [129]:
class PriorityQueue:
    """Binary-heap priority queue backed by a ``CustomList``.

    With the default ``max=False`` the smallest item is popped first; with
    ``max=True`` the largest is. When ``key`` is supplied, items are ordered
    by ``key(item)`` while the original items are what ``pop`` and ``peek``
    return.
    """

    def __init__(self, *, max=False, key=None) -> None:
        """Create an empty queue.

        Args:
            max: If true, behave as a max-heap instead of a min-heap.
            key: Optional one-argument callable giving each item's sort value.
        """
        # Indexing self.que yields key(item), so every comparison below is
        # made on sort values while the raw items stay in self.que.data.
        self.que = CustomList(key=key)
        # cmp(parent, child) is True when the pair violates heap order.
        self.cmp = operator.lt if max else operator.gt

    def _child(self, i):
        """Return ``(left, right)`` child indices of ``i``; ``None`` if absent."""
        size = len(self.que)
        left, right = 2*i + 1, 2*i + 2
        return (left if left < size else None,
                right if right < size else None)

    def _next_child(self, i):
        """Return the child of ``i`` that should move up first, or ``None``."""
        l, r = self._child(i)
        # Compare against None explicitly: an index is never a "missing" marker.
        if l is not None and r is not None:
            return r if self.cmp(self.que[l], self.que[r]) else l
        return l

    def _parent(self, i):
        """Return the parent index of ``i``, or ``None`` for the root."""
        return (i-1) // 2 if i > 0 else None

    def _bubble_up(self, i):
        """Move the item at ``i`` towards the root until heap order holds."""
        j = self._parent(i)
        while j is not None and self.cmp(self.que[j], self.que[i]):
            self.que.swap(i, j)
            i = j
            j = self._parent(i)

    def _bubble_down(self, i):
        """Move the item at ``i`` towards the leaves until heap order holds."""
        j = self._next_child(i)
        while j is not None and self.cmp(self.que[i], self.que[j]):
            self.que.swap(i, j)
            i = j
            j = self._next_child(i)

    def push(self, value):
        """Insert ``value`` into the queue."""
        self.que.append(value)
        self._bubble_up(len(self.que) - 1)

    def pop(self, value=None):
        """Remove and return an item.

        Args:
            value: Item to remove. ``None`` (the default) removes the front
                item of the queue.

        Returns:
            The removed item, or ``None`` if the queue is empty.

        Raises:
            ValueError: If ``value`` is given but is not in the queue.
        """
        if not self.que:
            return None

        index = self.que.index(value) if value is not None else 0
        # Move the victim into the last slot so it can be removed cheaply.
        self.que.swap(index, -1)
        top = self.que.pop()
        # If the victim already occupied the last slot, nothing was displaced
        # and `index` is now past the end, so there is nothing to re-heapify.
        if index < len(self.que):
            parent = self._parent(index)
            if parent is not None and self.cmp(self.que[parent], self.que[index]):
                self._bubble_up(index)
            else:
                self._bubble_down(index)
        return top

    def peek(self):
        """Return the front item without removing it, or ``None`` if empty."""
        return self.que.data[0] if self.que else None

    def __len__(self):
        """Return the number of queued items."""
        return len(self.que)

    def __str__(self):
        """Return ``str()`` of the underlying storage."""
        return str(self.que)

In [130]:
q = PriorityQueue(max=False)  # min-heap (max=False is the default)

In [131]:
# Fixed seed so that Restart & Run All pushes the same five values each time.
random.seed(0)
for _ in range(5):
    q.push(random.randint(1, 10))

In [132]:
# Empty the queue, printing items in the order the heap releases them.
while len(q) > 0:
    released = q.pop()
    print(released)

1
2
4
5
6


In [133]:
q = PriorityQueue(max=True)  # max-heap: the largest value is popped first

In [134]:
# Fixed seed so that Restart & Run All pushes the same five values each time.
random.seed(1)
for _ in range(5):
    q.push(random.randint(1, 10))

In [135]:
# Empty the queue, printing items in the order the heap releases them.
while len(q) > 0:
    released = q.pop()
    print(released)

8
7
3
3
1


In [136]:
# Order queued tuples by their first field.
q = PriorityQueue(key=operator.itemgetter(0))

In [137]:
# (priority, label) pairs, pushed in this order.
for task in ((2, 'Task 1'), (1, 'Task 2'), (3, 'Task 3'), (2, 'Task 4')):
    q.push(task)

In [138]:
# Empty the queue, printing only the second field of each popped tuple.
while len(q) > 0:
    task = q.pop()
    print(task[1])

Task 2
Task 4
Task 1
Task 3


In [139]:
class Process:
    """A process record holding an ``id`` and a ``priority`` (default 0)."""

    def __init__(self, id, priority=0) -> None:
        """Store the given ``id`` and ``priority`` on the instance."""
        self.id, self.priority = id, priority

    def __repr__(self) -> str:
        """Return the same text as ``str(self)``."""
        return str(self)

    def __str__(self) -> str:
        """Return ``Process(<id>, <priority>)``."""
        return f"Process({self.id}, {self.priority})"

In [140]:
# Max-heap ordered by each item's `priority` attribute.
by_priority = operator.attrgetter('priority')
q = PriorityQueue(max=True, key=by_priority)

In [141]:
# Processes 1 and 5 use the default priority of 0.
for proc in (Process(1), Process(3, 5), Process(2, 2), Process(4, 3), Process(5)):
    q.push(proc)

In [142]:
# Show the front item without removing it from the queue.
q.peek()

Process(3, 5)

In [143]:
# Empty the queue, printing items in the order the heap releases them.
while len(q) > 0:
    released = q.pop()
    print(released)

Process(3, 5)
Process(4, 3)
Process(2, 2)
Process(1, 0)
Process(5, 0)
