In [45]:
import random
import time

In [85]:
class MinMaxHeap:
    def __init__(self, k):
        self.heap = []
        self.k = k 
     
    def insert(self, value):
        self.heap.append(value)
        self._bubble_up(len(self.heap) - 1)
    
    def deletion(self, index):
        if len(self.heap) == 0 or index >= len(self.heap):
            return None

        self.heap[index], self.heap[-1] = self.heap[-1], self.heap[index]
        deleted_value = self.heap.pop()

        if index < len(self.heap):
            self._bubble_up(index)
            self._bubble_down(index)

        return deleted_value

    def _bubble_up(self, index):
        while index > 0:
            parent_index = (index - 1) // self.k
            if self.heap[index] > self.heap[parent_index]:
                self.heap[index], self.heap[parent_index] = self.heap[parent_index], self.heap[index]
                index = parent_index
            else:
                break

    def _bubble_down(self, index):
        while True:
            largest = index
            for i in range(1, self.k + 1):
                child_index = self.k * index + i
                if child_index < len(self.heap) and self.heap[child_index] > self.heap[largest]:
                    largest = child_index

            if index == largest:
                break
            else:
                self.heap[index], self.heap[largest] = self.heap[largest], self.heap[index]
                index = largest


In [86]:
def generate_random_number(sizeN,minValue,maxValue):
        return [random.randint(minValue, maxValue) for _ in range(sizeN)]

In [88]:
class LinearHeap:
    def __init__(self, numberArray, k=2):
        self.heap = numberArray
        self.k = k
        self.build_heap()
   
    def deletion(self, index):
        if len(self.heap) == 0 or index >= len(self.heap):
            return None

        self.heap[index], self.heap[-1] = self.heap[-1], self.heap[index]
        deleted_value = self.heap.pop()

        if index < len(self.heap):
            self._bubble_down(index)  # Only bubble down in LinearHeap after deletion
        return deleted_value
    
    def build_heap(self):
        first_parent_idx = (len(self.heap) - 1) // self.k
        for current_idx in reversed(range(first_parent_idx + 1)):
            self._bubble_down(current_idx)
    
    def _bubble_down(self, index):
        while True:
            largest = index
            for i in range(1, self.k + 1):
                child_index = self.k * index + i
                if child_index < len(self.heap) and self.heap[child_index] > self.heap[largest]:
                    largest = child_index

            if index == largest:
                break
            else:
                self.heap[index], self.heap[largest] = self.heap[largest], self.heap[index]
                index = largest

In [91]:
for k in range(2, 13):
    print(f"K={k} Heapify process started")

    # SIZE 1000 Insertion
    randomNumbers = generate_random_number(1000, 0, 1000000000)

    # MinMaxHeap Insertion
    small_heap = MinMaxHeap(k)
    beginingOfCounter = time.perf_counter()
    for num in randomNumbers:
        small_heap.insert(num)
    endCounter = time.perf_counter()
    print(f"Time for Insertion Method for K={k} is {endCounter - beginingOfCounter:0.4f} seconds")

    # Linear Heapify
    # Clone the random numbers list for LinearHeap to ensure fair comparison
    randomNumbersForLinearHeap = randomNumbers[:]
    beginingOfCounter = time.perf_counter()
    linear_small_heap = LinearHeap(randomNumbersForLinearHeap, k)
    endCounter = time.perf_counter()
    print(f"Time for Linear Heapify method for K={k} is {endCounter - beginingOfCounter:0.4f} seconds")

    # Random Deletion
    removeElements = generate_random_number(100, 0, len(small_heap.heap) - 1)

    # MinMaxHeap Deletion
    beginingOfCounter = time.perf_counter()
    for removeIndex in removeElements:
        small_heap.deletion(removeIndex)
    endCounter = time.perf_counter()
    print(f"Time taken for deletion of Insertion Method for K={k} is {endCounter - beginingOfCounter:0.4f} seconds")

    # LinearHeap Deletion
    beginingOfCounter = time.perf_counter()
    for removeIndex in removeElements:
        linear_small_heap.deletion(removeIndex)
    endCounter = time.perf_counter()
    print(f"Time taken for deletion of Linear Method for K={k} is {endCounter - beginingOfCounter:0.4f} seconds")

K=2 Heapify process started
Time for Insertion Method for K=2 is 0.0006 seconds
Time for Linear Heapify method for K=2 is 0.0155 seconds
Time taken for deletion of Insertion Method for K=2 is 0.0003 seconds
Time taken for deletion of Linear Method for K=2 is 0.0002 seconds
K=3 Heapify process started
Time for Insertion Method for K=3 is 0.0005 seconds
Time for Linear Heapify method for K=3 is 0.0011 seconds
Time taken for deletion of Insertion Method for K=3 is 0.0001 seconds
Time taken for deletion of Linear Method for K=3 is 0.0001 seconds
K=4 Heapify process started
Time for Insertion Method for K=4 is 0.0003 seconds
Time for Linear Heapify method for K=4 is 0.0003 seconds
Time taken for deletion of Insertion Method for K=4 is 0.0001 seconds
Time taken for deletion of Linear Method for K=4 is 0.0001 seconds
K=5 Heapify process started
Time for Insertion Method for K=5 is 0.0002 seconds
Time for Linear Heapify method for K=5 is 0.0003 seconds
Time taken for deletion of Insertion Meth

In [92]:
for k in range(2, 13):
    print(f"K={k} Heapify process started")

    # SIZE 1000 Insertion
    randomNumbers = generate_random_number(1000000, 0, 1000000000)

    # MinMaxHeap Insertion
    small_heap = MinMaxHeap(k)
    beginingOfCounter = time.perf_counter()
    for num in randomNumbers:
        small_heap.insert(num)
    endCounter = time.perf_counter()
    print(f"Time for Insertion Method for K={k} is {endCounter - beginingOfCounter:0.4f} seconds")

    # Linear Heapify
    # Clone the random numbers list for LinearHeap to ensure fair comparison
    randomNumbersForLinearHeap = randomNumbers[:]
    beginingOfCounter = time.perf_counter()
    linear_small_heap = LinearHeap(randomNumbersForLinearHeap, k)
    endCounter = time.perf_counter()
    print(f"Time for Linear Heapify method for K={k} is {endCounter - beginingOfCounter:0.4f} seconds")

    # Random Deletion
    removeElements = generate_random_number(100, 0, len(small_heap.heap) - 1)

    # MinMaxHeap Deletion
    beginingOfCounter = time.perf_counter()
    for removeIndex in removeElements:
        small_heap.deletion(removeIndex)
    endCounter = time.perf_counter()
    print(f"Time taken for deletion of Insertion Method for K={k} is {endCounter - beginingOfCounter:0.4f} seconds")

    # LinearHeap Deletion
    beginingOfCounter = time.perf_counter()
    for removeIndex in removeElements:
        linear_small_heap.deletion(removeIndex)
    endCounter = time.perf_counter()
    print(f"Time taken for deletion of Linear Method for K={k} is {endCounter - beginingOfCounter:0.4f} seconds")

K=2 Heapify process started
Time for Insertion Method for K=2 is 0.5898 seconds
Time for Linear Heapify method for K=2 is 0.9093 seconds
Time taken for deletion of Insertion Method for K=2 is 0.0002 seconds
Time taken for deletion of Linear Method for K=2 is 0.0002 seconds
K=3 Heapify process started
Time for Insertion Method for K=3 is 0.5523 seconds
Time for Linear Heapify method for K=3 is 0.7803 seconds
Time taken for deletion of Insertion Method for K=3 is 0.0004 seconds
Time taken for deletion of Linear Method for K=3 is 0.0004 seconds
K=4 Heapify process started
Time for Insertion Method for K=4 is 0.4552 seconds
Time for Linear Heapify method for K=4 is 0.7231 seconds
Time taken for deletion of Insertion Method for K=4 is 0.0004 seconds
Time taken for deletion of Linear Method for K=4 is 0.0004 seconds
K=5 Heapify process started
Time for Insertion Method for K=5 is 0.6250 seconds
Time for Linear Heapify method for K=5 is 0.5964 seconds
Time taken for deletion of Insertion Meth

In [4]:
import time
import random
import heapq

class MaxHeap:
    def __init__(self, k):
        self.heap = []
        self.k = k

    def add(self, value):
        if len(self.heap) < self.k:
            heapq.heappush(self.heap, -value)  # Max-heap by pushing negative values
        elif value < -self.heap[0]:
            heapq.heapreplace(self.heap, -value)

    def get_kth_smallest(self):
        return -self.heap[0]

# Generating a large random stream for testing
def generate_random_stream(size, max_value):
    for _ in range(size):
        yield random.randint(0, max_value)

# Test with large data
stream_size = 10000000  # 100 million numbers
k = 100000  # track the smallest 1 million numbers

heap = MaxHeap(k)
start_time = time.perf_counter()

for value in generate_random_stream(stream_size, 1000000000):
    heap.add(value)

end_time = time.perf_counter()

kth_smallest = heap.get_kth_smallest()
total_time = end_time - start_time

print(f"K'th smallest: {kth_smallest}, Time taken: {total_time:.4f} seconds")


K'th smallest: 10085626, Time taken: 3.9701 seconds
