Most code in this file comes from the rainbow_smoke folder in my Generative Art adventures.


In [1]:
%load_ext autoreload
%autoreload 3

import numpy as np

from numba import njit
from numba import int64, njit
from numba.typed import List
from numba.types import Tuple

### Testing array_heapq library


In [2]:
from array_heapq import *

##### Heapify test


In [6]:
# Heapify smoke test (interpreter-level): fill every row with a random key,
# heapify in place, then pop everything and check the pop order is sorted.
# `array_heapify` / `array_heappop` are presumably the ones star-imported
# from `array_heapq`; the element count travels in a one-element array.
capacity = 1_000_000
array = np.empty((capacity, 1), dtype=np.int64)
size = capacity
for i in range(size):
    array[i, 0] = np.random.randint(size)
count_holder = np.array([size])
array_heapify(array, count_holder)
output = [array_heappop(array, count_holder) for _ in range(size)]
assert output == sorted(output)

In [7]:
@njit
def test_array_heapify(capacity, array, size):
    """Fill, heapify and drain ``array``; return the items in pop order.

    The first ``size`` rows of ``array`` receive random keys drawn with
    ``np.random.randint(capacity)``.  The element count is handed to the
    heap routines through a one-element array (``count_holder``).
    """
    for row in range(size):
        array[row, 0] = np.random.randint(capacity)
    count_holder = np.array([size])
    array_heapify(array, count_holder)
    # One pop per stored element.
    return [array_heappop(array, count_holder) for _ in range(size)]

In [8]:
# Run the compiled heapify test on a full-capacity array and verify that the
# popped sequence is already sorted.
capacity = size = 1_000_000
array = np.empty(shape=(capacity, 1), dtype=np.int64)
output = test_array_heapify(capacity, array, size)
assert output == sorted(output)

##### Heappush test


In [9]:
# Heappush smoke test (interpreter-level): push `size` random keys one by
# one, then pop everything and check the pop order is sorted.
capacity = 1_000_000
array = np.empty((capacity, 1), dtype=np.int64)
size = capacity
count_holder = np.array([0])  # heap starts empty
for count in range(size):
    key = np.random.randint(capacity)
    array_heappush(array, count_holder, key)
output = [array_heappop(array, count_holder) for _ in range(size)]
assert output == sorted(output)

In [10]:
@njit
def test_array_heappush(capacity, array, size):
    """Push ``size`` random keys onto ``array`` and return them in pop order.

    Keys are drawn with ``np.random.randint(capacity)``.  The element count
    is kept in a one-element array that starts at zero.
    """
    count_holder = np.array([0])
    for _ in range(size):
        array_heappush(array, count_holder, np.random.randint(capacity))
    return [array_heappop(array, count_holder) for _ in range(size)]

In [11]:
# Run the compiled heappush test on a full-capacity array and verify that the
# popped sequence is already sorted.
capacity = size = 1_000_000
array = np.empty(shape=(capacity, 1), dtype=np.int64)
output = test_array_heappush(capacity, array, size)
assert output == sorted(output)

##### Interspersed push and pop tests


In [12]:
# Interspersed push/pop test (interpreter-level).  Start half full, then on
# each iteration flip a coin: pop on 0 (if non-empty) or when the heap is
# full, otherwise push.  `size` mirrors the count the heap keeps in
# `count_holder`, and the two are compared after every operation.
capacity = 20_000
iterations = capacity * 200
array = np.empty((capacity, 1), dtype=np.int64)
size = capacity // 2
count_holder = np.array([0])
for count in range(size):
    array_heappush(array, count_holder, np.random.randint(capacity, size=(1,)))
for i in range(iterations):
    coin = np.random.randint(2)
    if size == capacity or (coin == 0 and size != 0):
        elem = array_heappop(array, count_holder)
        size -= 1
        # Optional stricter check:
        # if size:
        #     assert (elem[0] <= array[:size, 0]).all()
    else:
        array_heappush(array, count_holder, np.random.randint(capacity))
        size += 1
    assert count_holder[0] == size
# Drain what is left and check it comes out sorted.
output = [array_heappop(array, count_holder) for _ in range(size)]
assert output == sorted(output)
print(len(output))

9776


In [13]:
@njit
def test_array_interspersed(capacity, array, size, iterations):
    """Randomly interleave pushes and pops, then drain and return the rest.

    ``size`` keys are pushed first.  Each of the ``iterations`` steps pops
    when the heap is full, or when a coin flip says so and the heap is not
    empty; otherwise it pushes a fresh random key.  After every step the
    count stored in ``count_holder`` must equal the locally tracked ``size``.
    """
    count_holder = np.array([0])
    for _ in range(size):
        array_heappush(array, count_holder, np.random.randint(0, capacity, size=(1,)))
    for _ in range(iterations):
        coin = np.random.randint(2)
        if size == capacity or (coin == 0 and size != 0):
            array_heappop(array, count_holder)
            size -= 1
        else:
            array_heappush(array, count_holder, np.random.randint(capacity))
            size += 1
        assert count_holder[0] == size
    # Whatever is still stored is popped in heap order.
    return [array_heappop(array, count_holder) for _ in range(size)]

In [14]:
# Run the compiled interspersed test starting from a half-full heap; the
# leftover items must come out sorted.  Prints how many were left.
capacity = 20_000
iterations = 200 * capacity
array = np.empty(shape=(capacity, 1), dtype=np.int64)
size = capacity // 2
output = test_array_interspersed(capacity, array, size, iterations)
assert output == sorted(output)
print(len(output))

11064


### Testing array_heapq with the size passed by value (no holder array)


In [2]:
@njit
def is_less(a, b):
    """Return True when row ``a`` sorts strictly before row ``b``.

    The comparison is lexicographic: the first position where the two rows
    differ decides the result, and equal rows are not "less".  Both rows must
    have the same length (checked with an assertion).
    """
    m, n = len(a), len(b)
    assert m == n
    for i in range(m):
        if a[i] < b[i]:
            return True
        # A larger leading component settles the order the other way; without
        # this early exit a later, smaller component would wrongly win.
        if a[i] > b[i]:
            return False
    return False


@njit
def heappush(heap, heap_size, item):
    """Push ``item`` onto the array-backed heap, maintaining the invariant.

    ``heap`` is the backing array and ``heap_size`` the number of rows
    currently in use; the new item is written at row ``heap_size`` and
    sifted into place.  Nothing is returned and the size is received by
    value, so the caller is responsible for incrementing its own counter.

    Raises:
        IndexError: if ``heap_size`` is negative or the backing array has no
            free row left.  The explicit check matters because compiled code
            may not bounds-check the store.
    """
    if heap_size < 0 or heap_size >= len(heap):
        raise IndexError("heap_size out of range for the backing array")
    heap[heap_size] = item
    _siftdown(heap, 0, heap_size)


@njit
def heappop(heap, heap_size):
    """Pop the smallest row off the array-backed heap and return a copy of it.

    The first ``heap_size`` rows of ``heap`` form the heap.  The size is
    received by value, so the caller must decrement its own counter after
    the call.

    Raises:
        IndexError: if ``heap_size`` is not positive.  Indexing alone cannot
            detect an empty heap here, because ``heap[-1]`` is a valid row
            of a non-empty backing array.
    """
    if heap_size <= 0:
        raise IndexError("pop from an empty heap")
    last = heap_size - 1
    # Copy: rows are views into the backing array and are overwritten below.
    lastelt = heap[last].copy()
    if last == 0:
        return lastelt
    returnitem = heap[0].copy()
    heap[0] = lastelt
    _siftup(heap, 0, last)
    return returnitem


@njit
def heapreplace(heap, heap_size, item):
    """Pop and return the current smallest row, and add the new item.

    The heap size does not change.  This is more efficient than heappop()
    followed by heappush(), and can be more appropriate when using a
    fixed-size heap.  Note that the value returned may be larger than item!
    That constrains reasonable uses of this routine unless written as part
    of a conditional replacement:

        if is_less(heap[0], item):
            item = heapreplace(heap, heap_size, item)

    Raises:
        IndexError: if ``heap_size`` is not positive.  Row 0 of the backing
            array exists even when the heap is logically empty, so the
            check has to be explicit.
    """
    if heap_size <= 0:
        raise IndexError("replace on an empty heap")
    # Copy: heap[0] is a view and is overwritten on the next line.
    returnitem = heap[0].copy()
    heap[0] = item
    _siftup(heap, 0, heap_size)
    return returnitem


@njit
def heappushpop(heap, heap_size, item):
    """Fast version of a heappush followed by a heappop.

    If the heap is non-empty and its root sorts before ``item``, the root is
    replaced by ``item`` and a copy of the old root is returned; otherwise
    ``item`` itself is returned and the heap is left untouched.  The heap
    size does not change either way.

    Emptiness is decided by ``heap_size`` (the backing array's own truth
    value says nothing about how many rows are in use), and rows are
    compared with ``is_less`` like everywhere else in this heap.
    """
    if heap_size > 0 and is_less(heap[0], item):
        # Copy before overwriting: heap[0] is a view into the backing array,
        # so a plain swap would hand back the new value instead of the old.
        smallest = heap[0].copy()
        heap[0] = item
        _siftup(heap, 0, heap_size)
        return smallest
    return item


@njit
def heapify(x, heap_size):
    """Rearrange the first ``heap_size`` rows of ``x`` into a heap, in place."""
    # Only indices that have at least one child (2*i + 1 < heap_size) need
    # sifting; the largest such index is heap_size // 2 - 1 for both even
    # and odd sizes.  Work from there back to the root.
    i = heap_size // 2 - 1
    while i >= 0:
        _siftup(x, i, heap_size)
        i -= 1


# 'heap' is a heap at all indices >= startpos, except possibly for pos.  pos
# is the index of a leaf with a possibly out-of-order value.  Restore the
# heap invariant.
@njit
def _siftdown(heap, startpos, pos):
    """Move the row at ``pos`` toward ``startpos`` until its parent is not larger."""
    # Keep a private copy; the slot at ``pos`` is overwritten while walking up.
    item = heap[pos].copy()
    while pos > startpos:
        up = (pos - 1) >> 1
        if not is_less(item, heap[up]):
            break
        # Parent is larger: pull it down one level and continue from its slot.
        heap[pos] = heap[up]
        pos = up
    heap[pos] = item


# The child indices of heap index pos are already heaps, and we want to make
# a heap at index pos too.  We do this by bubbling the smaller child of
# pos up (and so on with that child's children, etc) until hitting a leaf,
# then using _siftdown to move the oddball originally at index pos into place.
@njit
def _siftup(heap, pos, endpos):
    """Restore the heap property for the subtree rooted at ``pos``.

    Only rows below ``endpos`` are considered part of the heap.
    """
    root = pos
    item = heap[pos].copy()
    # Walk down to a leaf, promoting the smaller child at every level.
    child = 2 * pos + 1
    while child < endpos:
        right = child + 1
        if right < endpos and not is_less(heap[child], heap[right]):
            child = right
        heap[pos] = heap[child]
        pos = child
        child = 2 * pos + 1
    # The vacated leaf takes the original row, which is then bubbled back up
    # to where it belongs.
    heap[pos] = item
    _siftdown(heap, root, pos)


# Bind the array-backed implementations defined in this cell to the
# `array_*` names used by the tests that follow.
(
    array_is_less,
    array_heappush,
    array_heappop,
    array_heapreplace,
    array_heappushpop,
    array_heapify,
) = (is_less, heappush, heappop, heapreplace, heappushpop, heapify)

##### Heapify test


In [5]:
# Heapify smoke test (interpreter-level) for the size-by-value API: fill,
# heapify, then pop with the current size passed explicitly on every call.
capacity = 1_000_000
array = np.empty((capacity, 1), dtype=np.int64)
size = capacity
for i in range(size):
    array[i, 0] = np.random.randint(size)
array_heapify(array, size)
output = [array_heappop(array, count) for count in range(size, 0, -1)]
assert output == sorted(output)

In [6]:
@njit
def test_array_heapify(capacity, array, size):
    """Fill, heapify and drain ``array``; return the items in pop order.

    The first ``size`` rows receive random keys drawn with
    ``np.random.randint(capacity)``.  The current heap size is passed by
    value to every heap call.
    """
    for row in range(size):
        array[row, 0] = np.random.randint(capacity)
    array_heapify(array, size)
    # `remaining` counts down from size to 1 as elements are removed.
    return [array_heappop(array, remaining) for remaining in range(size, 0, -1)]

In [8]:
# Run the compiled heapify test (size-by-value API) and verify the popped
# sequence is already sorted.
capacity = size = 1_000_000
array = np.empty(shape=(capacity, 1), dtype=np.int64)
output = test_array_heapify(capacity, array, size)
assert output == sorted(output)

##### Heappush test


In [9]:
# Heappush smoke test (interpreter-level) for the size-by-value API: the
# loop index doubles as the current heap size for each push.
capacity = 1_000_000
array = np.empty((capacity, 1), dtype=np.int64)
size = capacity
for count in range(size):
    key = np.random.randint(capacity)
    array_heappush(array, count, key)
output = [array_heappop(array, count) for count in range(size, 0, -1)]
assert output == sorted(output)

In [10]:
@njit
def test_array_heappush(capacity, array, size):
    """Push ``size`` random keys onto ``array`` and return them in pop order.

    The number of stored elements is passed by value: the push loop index is
    the size before each push, and the pop loop counts down from ``size``.
    """
    for stored in range(size):
        array_heappush(array, stored, np.random.randint(capacity))
    return [array_heappop(array, remaining) for remaining in range(size, 0, -1)]

In [119]:
# Run the compiled heappush test (size-by-value API) and verify the popped
# sequence is already sorted.
capacity = size = 1_000_000
array = np.empty(shape=(capacity, 1), dtype=np.int64)
output = test_array_heappush(capacity, array, size)
assert output == sorted(output)

##### Interspersed push and pop tests


In [168]:
# Interspersed push/pop test (interpreter-level) for the size-by-value API.
# `size` is the only record of how many rows are in use, so it is updated
# right after every push or pop.
capacity = 20_000
iterations = capacity * 200
array = np.empty((capacity, 1), dtype=np.int64)
size = capacity // 2
for count in range(size):
    array_heappush(array, count, np.random.randint(capacity, size=(1,)))
for i in range(iterations):
    coin = np.random.randint(2)
    if size == capacity or (coin == 0 and size != 0):
        elem = array_heappop(array, size)
        size -= 1
        # Optional stricter check:
        # if size:
        #     assert (elem[0] <= array[:size, 0]).all()
    else:
        array_heappush(array, size, np.random.randint(capacity))
        size += 1
# Drain what is left and check it comes out sorted.
output = [array_heappop(array, count) for count in range(size, 0, -1)]
assert output == sorted(output)
print(len(output))

12196


In [157]:
@njit
def test_array_interspersed(capacity, array, size, iterations):
    """Randomly interleave pushes and pops, then drain and return the rest.

    ``size`` keys are pushed first.  Each of the ``iterations`` steps pops
    when the heap is full, or when a coin flip says so and the heap is not
    empty; otherwise it pushes a fresh random key.  ``size`` is the only
    record of the element count and is passed by value to every heap call.
    """
    for stored in range(size):
        array_heappush(array, stored, np.random.randint(0, capacity, size=(1,)))
    for _ in range(iterations):
        coin = np.random.randint(2)
        if size == capacity or (coin == 0 and size != 0):
            array_heappop(array, size)
            size -= 1
        else:
            array_heappush(array, size, np.random.randint(capacity))
            size += 1
    return [array_heappop(array, remaining) for remaining in range(size, 0, -1)]

In [165]:
# Run the compiled interspersed test (size-by-value API) from a half-full
# heap; the leftover items must come out sorted.  Prints how many were left.
capacity = 20_000
iterations = 200 * capacity
array = np.empty(shape=(capacity, 1), dtype=np.int64)
size = capacity // 2
output = test_array_interspersed(capacity, array, size, iterations)
assert output == sorted(output)
print(len(output))

13300


### Testing njit list heapq


In [67]:
@njit
def heappush(heap, item):
    """Append ``item`` to ``heap`` and restore the heap invariant."""
    heap.append(item)
    leaf = len(heap) - 1  # index of the element just appended
    _siftdown(heap, 0, leaf)


@njit
def heappop(heap):
    """Remove and return the smallest item, keeping ``heap`` a valid heap."""
    # ``pop`` on an empty list is what signals an empty heap.
    tail = heap.pop()
    if len(heap) == 0:
        # The removed tail was the only element.
        return tail
    smallest = heap[0]
    heap[0] = tail
    _siftup(heap, 0)
    return smallest


@njit
def heapreplace(heap, item):
    """Return the current smallest item and put ``item`` in the heap instead.

    The heap keeps its length, which makes this cheaper than a heappop()
    followed by a heappush() and a natural fit for fixed-size heaps.  The
    returned value can be larger than ``item``, so typical use guards the
    call:

        if item > heap[0]:
            item = heapreplace(heap, item)
    """
    # Indexing an empty list is what signals an empty heap.
    smallest = heap[0]
    heap[0] = item
    _siftup(heap, 0)
    return smallest


@njit
def heappushpop(heap, item):
    """Push ``item`` then pop the smallest, without growing the heap."""
    if heap and heap[0] < item:
        # The root is smaller than the newcomer: hand the root back and let
        # the newcomer take its slot.
        smallest = heap[0]
        heap[0] = item
        _siftup(heap, 0)
        return smallest
    # Empty heap, or ``item`` would be popped straight back anyway.
    return item


@njit
def heapify(x):
    """Turn list ``x`` into a heap, in place."""
    # Only indices that have at least one child (2*i + 1 < len(x)) need
    # sifting; the largest such index is len(x) // 2 - 1 for both even and
    # odd lengths.  Work from there back to the root.
    i = len(x) // 2 - 1
    while i >= 0:
        _siftup(x, i)
        i -= 1


# 'heap' is a heap at all indices >= startpos, except possibly for pos.  pos
# is the index of a leaf with a possibly out-of-order value.  Restore the
# heap invariant.
@njit
def _siftdown(heap, startpos, pos):
    """Move the item at ``pos`` toward ``startpos`` until its parent is not larger."""
    item = heap[pos]
    while pos > startpos:
        up = (pos - 1) >> 1
        above = heap[up]
        if not item < above:
            break
        # Parent is larger: pull it down one level and continue from its slot.
        heap[pos] = above
        pos = up
    heap[pos] = item


# The child indices of heap index pos are already heaps, and we want to make
# a heap at index pos too.  We do this by bubbling the smaller child of
# pos up (and so on with that child's children, etc) until hitting a leaf,
# then using _siftdown to move the oddball originally at index pos into place.
@njit
def _siftup(heap, pos):
    """Restore the heap property for the subtree rooted at ``pos``."""
    limit = len(heap)
    root = pos
    item = heap[pos]
    # Walk down to a leaf, promoting the smaller child at every level.
    child = 2 * pos + 1
    while child < limit:
        right = child + 1
        if right < limit and not heap[child] < heap[right]:
            child = right
        heap[pos] = heap[child]
        pos = child
        child = 2 * pos + 1
    # The vacated leaf takes the original item, which is then bubbled back
    # up to where it belongs.
    heap[pos] = item
    _siftdown(heap, root, pos)


# Bind the list-based implementations defined in this cell to the `njit_*`
# names used by the tests that follow.
(
    njit_heappush,
    njit_heappop,
    njit_heapreplace,
    njit_heappushpop,
    njit_heapify,
) = (heappush, heappop, heapreplace, heappushpop, heapify)

##### Heapify test


In [112]:
# Heapify smoke test (interpreter-level) on a numba typed list of 1-tuples.
# NOTE: `capacity` is not set in this cell; it is read from whatever an
# earlier cell left in the notebook namespace.
size = 1_000_000
array = List.empty_list(Tuple((int64,)))
for count in range(size):
    key = np.random.randint(capacity)
    array.append((key,))
njit_heapify(array)
output = [njit_heappop(array) for _ in range(size)]
assert output == sorted(output)

In [250]:
@njit
def test_njit_heapify(array, size):
    """Append ``size`` random 1-tuples to ``array``, heapify, drain, return pops.

    ``capacity`` is not a parameter: it is read from the enclosing module
    (notebook) scope.
    """
    for _ in range(size):
        array.append((np.random.randint(capacity),))
    njit_heapify(array)
    return [njit_heappop(array) for _ in range(size)]

In [252]:
# Run the compiled list-heapify test on a fresh typed list and verify the
# popped sequence is already sorted.
array = List.empty_list(Tuple((int64,)))
size = 1_000_000
output = test_njit_heapify(array, size)
assert output == sorted(output)

##### Heappush test


In [113]:
# Heappush smoke test (interpreter-level) on a numba typed list of 1-tuples.
# NOTE: `capacity` comes from an earlier cell, not from this one.
size = 1_000_000
array = List.empty_list(Tuple((int64,)))
for count in range(size):
    key = np.random.randint(capacity)
    njit_heappush(array, (key,))
output = [njit_heappop(array) for _ in range(size)]
assert output == sorted(output)

In [115]:
@njit
def test_njit_heappush(array, size):
    """Push ``size`` random 1-tuples onto ``array`` and return them in pop order.

    ``capacity`` is not a parameter: it is read from the enclosing module
    (notebook) scope.
    """
    for _ in range(size):
        njit_heappush(array, (np.random.randint(capacity),))
    return [njit_heappop(array) for _ in range(size)]

In [117]:
# Run the compiled list-heappush test on a fresh typed list and verify the
# popped sequence is already sorted.
array = List.empty_list(Tuple((int64,)))
size = 1_000_000
output = test_njit_heappush(array, size)
assert output == sorted(output)

##### Interspersed push and pop tests


In [169]:
# Interspersed push/pop test (interpreter-level) on a numba typed list.
# `size` mirrors len(array) and is checked against it after every operation.
capacity = 20_000
iterations = capacity * 200
array = List.empty_list(Tuple((int64,)))
size = capacity // 2
for count in range(size):
    njit_heappush(array, (np.random.randint(capacity),))
for i in range(iterations):
    coin = np.random.randint(2)
    if size == capacity or (coin == 0 and size != 0):
        elem = njit_heappop(array)
        size -= 1
        # Optional stricter check:
        # if size:
        #     assert all(elem <= array[i] for i in range(size))
    else:
        njit_heappush(array, (np.random.randint(capacity),))
        size += 1
    assert len(array) == size
# Drain what is left and check it comes out sorted.
output = [njit_heappop(array) for _ in range(size)]
assert output == sorted(output)
print(len(output))

11026


In [153]:
@njit
def test_njit_interspersed(capacity, array, size, iterations):
    """Randomly interleave pushes and pops on ``array``, then drain the rest.

    ``size`` random 1-tuples are pushed first.  Each of the ``iterations``
    steps pops when ``size`` has reached ``capacity``, or when a coin flip
    says so and the heap is not empty; otherwise it pushes a new random
    1-tuple.  Returns the remaining items in pop order.
    """
    for _ in range(size):
        njit_heappush(array, (np.random.randint(capacity),))
    for _ in range(iterations):
        coin = np.random.randint(2)
        if size == capacity or (coin == 0 and size != 0):
            njit_heappop(array)
            size -= 1
        else:
            njit_heappush(array, (np.random.randint(capacity),))
            size += 1
    return [njit_heappop(array) for _ in range(size)]

In [166]:
# Run the compiled interspersed test on a fresh typed list; the leftover
# items must come out sorted.  Prints how many were left.
capacity = 20_000
iterations = 200 * capacity
array = List.empty_list(Tuple((int64,)))
size = capacity // 2
output = test_njit_interspersed(capacity, array, size, iterations)
assert output == sorted(output)
print(len(output))

8968


### Testing heapq, original Python


In [100]:
# Pull in the standard-library heap functions (this rebinds `heappush`,
# `heappop`, ... in the notebook namespace) and expose them under
# `python_*` names for the tests that follow.
from heapq import *

(
    python_heappush,
    python_heappop,
    python_heapreplace,
    python_heappushpop,
    python_heapify,
) = (heappush, heappop, heapreplace, heappushpop, heapify)

##### Heapify test


In [120]:
# Heapify smoke test with the standard-library heapq on a plain list of
# 1-tuples.  NOTE: `capacity` comes from an earlier cell, not from this one.
size = 1_000_000
array = []
for count in range(size):
    key = np.random.randint(capacity)
    array.append((key,))
python_heapify(array)
output = [python_heappop(array) for _ in range(size)]
assert output == sorted(output)

In [109]:
@njit
def test_python_heapify(array, size):
    """Append ``size`` random 1-tuples, heapify via ``python_heapify``, drain.

    ``capacity`` is not a parameter: it is read from the enclosing module
    (notebook) scope.  Returns the items in pop order.
    """
    for _ in range(size):
        array.append((np.random.randint(capacity),))
    python_heapify(array)
    return [python_heappop(array) for _ in range(size)]

In [111]:
# Run the compiled heapq-heapify test on a fresh typed list and verify the
# popped sequence is already sorted.
array = List.empty_list(Tuple((int64,)))
size = 1_000_000
output = test_python_heapify(array, size)
assert output == sorted(output)

##### Heappush test


In [122]:
# Heappush smoke test with the standard-library heapq on a plain list of
# 1-tuples.  NOTE: `capacity` comes from an earlier cell, not from this one.
size = 1_000_000
array = []
for count in range(size):
    key = np.random.randint(capacity)
    python_heappush(array, (key,))
output = [python_heappop(array) for _ in range(size)]
assert output == sorted(output)

In [126]:
@njit
def test_python_heappush(array, size):
    """Push ``size`` random 1-tuples with heapq and return them in pop order.

    Both the pushes and the pops go through the ``python_*`` aliases, so
    only the heapq implementation is exercised by this test.  ``capacity``
    is not a parameter: it is read from the enclosing module (notebook)
    scope.
    """
    for count in range(size):
        python_heappush(array, (np.random.randint(capacity),))
    output = []
    for count in range(size, 0, -1):
        # Pop with the same implementation that did the pushing.
        output.append(python_heappop(array))
    return output

In [128]:
# Run the compiled heapq-heappush test on a fresh typed list and verify the
# popped sequence is already sorted.
array = List.empty_list(Tuple((int64,)))
size = 1_000_000
output = test_python_heappush(array, size)
assert output == sorted(output)

##### Interspersed push and pop tests


In [170]:
# Interspersed push/pop test with the standard-library heapq on a plain
# list.  `size` mirrors len(array) and is checked after every operation.
capacity = 20_000
iterations = capacity * 200
array = []
size = capacity // 2
for count in range(size):
    python_heappush(array, (np.random.randint(capacity),))
for i in range(iterations):
    coin = np.random.randint(2)
    if size == capacity or (coin == 0 and size != 0):
        elem = python_heappop(array)
        size -= 1
        # Optional stricter check:
        # if size:
        #     assert all(elem <= array[i] for i in range(size))
    else:
        python_heappush(array, (np.random.randint(capacity),))
        size += 1
    assert len(array) == size
# Drain what is left and check it comes out sorted.
output = [python_heappop(array) for _ in range(size)]
assert output == sorted(output)
print(len(output))

5280


In [140]:
@njit
def test_python_interspersed(capacity, array, size, iterations):
    """Randomly interleave heapq pushes and pops on ``array``, then drain.

    ``size`` random 1-tuples are pushed first.  Each of the ``iterations``
    steps pops when ``size`` has reached ``capacity``, or when a coin flip
    says so and the heap is not empty; otherwise it pushes a new random
    1-tuple.  Returns the remaining items in pop order.
    """
    for _ in range(size):
        python_heappush(array, (np.random.randint(capacity),))
    for _ in range(iterations):
        coin = np.random.randint(2)
        if size == capacity or (coin == 0 and size != 0):
            python_heappop(array)
            size -= 1
        else:
            python_heappush(array, (np.random.randint(capacity),))
            size += 1
    return [python_heappop(array) for _ in range(size)]

In [167]:
# Run the compiled heapq interspersed test on a fresh typed list; the
# leftover items must come out sorted.  Prints how many were left.
capacity = 20_000
iterations = 200 * capacity
array = List.empty_list(Tuple((int64,)))
size = capacity // 2
output = test_python_interspersed(capacity, array, size, iterations)
assert output == sorted(output)
print(len(output))

11492
