# ‚≠êÔ∏è Exploration of techniques for maxpooling

In [1]:
import numpy as np

np.random.seed(42)  # keep diffs minimal

A = np.arange(36)
np.random.shuffle(A)
A = A.reshape(6,6)

In [2]:
from numpy.lib.stride_tricks import as_strided

s, k = 2, 2

out_height = (A.shape[0] - k)//s + 1
out_width = (A.shape[1] - k)//s + 1

strided = as_strided(A, shape=(out_height, out_width, k, k), strides=(s*A.strides[0], s*A.strides[1], A.strides[0], A.strides[1]))

In [3]:
strided

array([[[[35, 13],
         [21, 12]],

        [[26, 30],
         [ 8, 17]],

        [[16, 31],
         [ 9, 34]]],


       [[[ 0,  4],
         [11,  1]],

        [[29, 15],
         [24,  2]],

        [[19,  5],
         [33,  3]]],


       [[[32, 23],
         [25,  6]],

        [[27, 10],
         [20,  7]],

        [[22, 18],
         [14, 28]]]])

In [4]:
np.__version__

'1.26.1'

In [5]:
merged_shape = (*strided.shape[:-2], -1)  # Merge the last two dimensions
reshaped_strided = strided.reshape(merged_shape)
max_indices = np.argmax(reshaped_strided, axis=-1)

max_indices

array([[0, 1, 3],
       [2, 0, 2],
       [0, 0, 3]])

In [6]:
# Optionally, unravel the indices
unraveled_indices = np.unravel_index(max_indices, (k, k))

print("Max indices:\n", max_indices)
print("Unraveled indices:\n", unraveled_indices)

Max indices:
 [[0 1 3]
 [2 0 2]
 [0 0 3]]
Unraveled indices:
 (array([[0, 0, 1],
       [1, 0, 1],
       [0, 0, 1]]), array([[0, 1, 1],
       [0, 0, 0],
       [0, 0, 1]]))


In [7]:
# Initialize the shape of the original array and the stride
original_shape = (6, 6)
stride = 2

# Calculate the original indices
original_row_indices = np.arange(0, max_indices.shape[0] * stride, stride)[:, np.newaxis] + unraveled_indices[0]
original_col_indices = np.arange(0, max_indices.shape[1] * stride, stride)[np.newaxis, :] + unraveled_indices[1]

# Pair them together
original_indices = np.stack((original_row_indices, original_col_indices), axis=-1)

In [8]:
original_indices

array([[[0, 0],
        [0, 3],
        [1, 5]],

       [[3, 0],
        [2, 2],
        [3, 4]],

       [[4, 0],
        [4, 2],
        [5, 5]]])

In [9]:
A

array([[35, 13, 26, 30, 16, 31],
       [21, 12,  8, 17,  9, 34],
       [ 0,  4, 29, 15, 19,  5],
       [11,  1, 24,  2, 33,  3],
       [32, 23, 27, 10, 22, 18],
       [25,  6, 20,  7, 14, 28]])

In [10]:
A = np.array([[1,  2,  3,  4],
              [5,  6,  7,  8],
              [9,  10, 11, 12],
              [13, 14, 15, 16]])

A = A.reshape(2, 2, 2, 2)
A

array([[[[ 1,  2],
         [ 3,  4]],

        [[ 5,  6],
         [ 7,  8]]],


       [[[ 9, 10],
         [11, 12]],

        [[13, 14],
         [15, 16]]]])

In [11]:
A = A.swapaxes(-2, -3) ;  A

array([[[[ 1,  2],
         [ 5,  6]],

        [[ 3,  4],
         [ 7,  8]]],


       [[[ 9, 10],
         [13, 14]],

        [[11, 12],
         [15, 16]]]])

In [12]:
A.shape

(2, 2, 2, 2)

In [13]:
np.argmax(A, axis=-1)

array([[[1, 1],
        [1, 1]],

       [[1, 1],
        [1, 1]]])

# üî∏ This works!

In [14]:
# Further revised transform function to output a 2D array where each row is a flattened kxk block
def final_transform(matrix, k):
    """
    Transforms a matrix so that each kxk block is flattened and each such flattened block becomes a row in a 2D array.
    
    Parameters:
        matrix (np.array): The matrix to transform.
        k (int): The size of the square blocks.
    
    Returns:
        np.array: The transformed 2D array where each row is a flattened kxk block.
    """
    # Reshape into blocks of size kxk
    reshaped_matrix = matrix.reshape(matrix.shape[0] // k, k, matrix.shape[1] // k, k)
    
    # Swap the last two axes
    swapped_matrix = reshaped_matrix.swapaxes(1, 2)
    
    # Flatten each kxk block and make each such flattened block a row in the output 2D array
    return swapped_matrix.reshape(-1, k * k)

# Testing the final transform function with arange(16)
test_matrix = np.arange(16).reshape(4, 4)
final_transformed_test_matrix = final_transform(test_matrix, 2)
final_transformed_test_matrix




array([[ 0,  1,  4,  5],
       [ 2,  3,  6,  7],
       [ 8,  9, 12, 13],
       [10, 11, 14, 15]])

In [15]:
# Using the final_transform function in the main code

# Step 1: Define the original 4x4 matrix A
A = np.array([[12, 15, 5, 0],
              [3, 11, 3, 7],
              [9, 3, 5, 2],
              [4, 7, 6, 8]])

# Step 2: Transform A using the final_transform function
transformed_A = final_transform(A, 2)

# Step 3: Find max_indices using np.argmax
max_indices = np.argmax(transformed_A, axis=1)

# Step 4: Create a lookup matrix and transform it
lookup = np.arange(16).reshape(4, 4)
transformed_lookup = final_transform(lookup, 2)

# Step 5: Retrieve original indices using max_indices
final_original_indices = transformed_lookup[np.arange(max_indices.size), max_indices]

final_original_indices


array([ 1,  7,  8, 15])

In [16]:
# Implementing the MaxPool class
class MaxPool:
    def __init__(self, k):
        self.k = k
        self.lookup = None

    def forward(self, A):
        # Determine padding and pad A
        pad_height = (self.k - A.shape[0] % self.k) % self.k
        pad_width = (self.k - A.shape[1] % self.k) % self.k
        padded_A = np.pad(A, ((0, pad_height), (0, pad_width)), mode='constant')
        
        # Create lookup matrix if it's None or the shape doesn't match
        if self.lookup is None or self.lookup.shape != padded_A.shape:
            rows, cols = padded_A.shape
            self.lookup = np.arange(rows * cols).reshape(rows, cols)
        
        # Transform A and lookup matrix using the final_transform function
        transformed_A = final_transform(padded_A, self.k)
        transformed_lookup = final_transform(self.lookup, self.k)
        
        # Find max_indices using np.argmax
        max_indices = np.argmax(transformed_A, axis=1)
        
        # Retrieve original indices using max_indices
        final_original_indices = transformed_lookup[np.arange(max_indices.size), max_indices]
        
        # Retrieve the max values and reshape to a 2D array
        max_values = padded_A.flatten()[final_original_indices]
        out_height = padded_A.shape[0] // self.k
        out_width = padded_A.shape[1] // self.k
        return max_values.reshape(out_height, out_width)

# Testing the MaxPool class
maxpool = MaxPool(2)
A = np.array([[12, 15, 5, 0],
              [3, 11, 3, 7],
              [9, 3, 5, 2],
              [4, 7, 6, 8]])
maxpool_output = maxpool.forward(A)
maxpool_output


array([[15,  7],
       [ 9,  8]])

In [17]:
# Modifying the MaxPool class to use cleaner keys for lookups
class MaxPool:
    def __init__(self, k):
        self.k = k
        self.lookups = {}  # Dictionary to store lookups for different sizes

    def forward(self, A):
        # Determine padding and pad A
        pad_height = (self.k - A.shape[0] % self.k) % self.k
        pad_width = (self.k - A.shape[1] % self.k) % self.k
        padded_A = np.pad(A, ((0, pad_height), (0, pad_width)), mode='constant')
        
        # Create or retrieve lookup matrix using a cleaner key
        key = f"{padded_A.shape[0]}x{padded_A.shape[1]}"
        if key not in self.lookups:
            rows, cols = padded_A.shape
            self.lookups[key] = np.arange(rows * cols).reshape(rows, cols)
        
        lookup = self.lookups[key]
        
        # Transform A and lookup matrix using the final_transform function
        transformed_A = final_transform(padded_A, self.k)
        transformed_lookup = final_transform(lookup, self.k)
        
        # Find max_indices using np.argmax
        max_indices = np.argmax(transformed_A, axis=1)
        
        # Retrieve original indices using max_indices
        final_original_indices = transformed_lookup[np.arange(max_indices.size), max_indices]
        
        # Retrieve the max values and reshape to a 2D array
        max_values = padded_A.flatten()[final_original_indices]
        out_height = padded_A.shape[0] // self.k
        out_width = padded_A.shape[1] // self.k
        return max_values.reshape(out_height, out_width)

# Quick test to ensure functionality remains the same
maxpool = MaxPool(2)
A1 = np.array([[12, 15, 5, 0],
               [3, 11, 3, 7],
               [9, 3, 5, 2],
               [4, 7, 6, 8]])

output1 = maxpool.forward(A1)
output1


array([[15,  7],
       [ 9,  8]])

In [18]:
def test_maxpool_basic():
    # Initialize MaxPool with k=2
    maxpool = MaxPool(2)
    
    # Define a 4x4 matrix
    A = np.array([[12, 15, 5, 0],
                  [3, 11, 3, 7],
                  [9, 3, 5, 2],
                  [4, 7, 6, 8]])
    
    # Expected output based on manual calculation
    expected_output = np.array([[15, 7],
                                [9, 8]])
    
    # Run the forward method
    output = maxpool.forward(A)
    
    # Check if the output matches the expected output
    assert np.array_equal(output, expected_output), f"Expected {expected_output}, got {output}"

# Run the first test
test_maxpool_basic()

In [19]:
from time import time

# Flushing the cache by reinitializing the MaxPool instance
maxpool = MaxPool(2)

# Timing the first run of the test after flushing the cache
start_time = time()
test_maxpool_basic()
first_run_time_flushed = time() - start_time

# Timing the second run of the test after flushing the cache
start_time = time()
test_maxpool_basic()
second_run_time_flushed = time() - start_time

first_run_time_flushed, second_run_time_flushed


(0.00012421607971191406, 0.001071929931640625)

In [20]:
# Implementing the second test: Padding Test

def test_maxpool_padding():
    # Initialize MaxPool with k=2
    maxpool = MaxPool(2)
    
    # Define a 5x5 matrix
    A = np.array([[12, 15, 5, 0, 1],
                  [3, 11, 3, 7, 2],
                  [9, 3, 5, 2, 3],
                  [4, 7, 6, 8, 4],
                  [1, 2, 3, 4, 5]])
    
    # Expected output based on manual calculation
    expected_output = np.array([[15, 7, 2],
                                [9, 8, 4],
                                [2, 4, 5]])
    
    # Run the forward method
    output = maxpool.forward(A)
    
    # Check if the output matches the expected output
    assert np.array_equal(output, expected_output), f"Expected {expected_output}, got {output}"

# Run the second test
test_maxpool_padding()


In [21]:
# Implementing the third test: Different k Test

def test_maxpool_diff_k():
    # Initialize MaxPool with k=3
    maxpool = MaxPool(3)
    
    # Define a 6x6 matrix
    A = np.array([[12, 15, 5, 0, 1, 2],
                  [3, 11, 3, 7, 2, 4],
                  [9, 3, 5, 2, 3, 1],
                  [4, 7, 6, 8, 4, 6],
                  [1, 2, 3, 4, 5, 2],
                  [2, 3, 5, 1, 0, 3]])
    
    # Expected output based on manual calculation
    expected_output = np.array([[15, 7],
                                [7, 8]])
    
    # Run the forward method
    output = maxpool.forward(A)
    
    # Check if the output matches the expected output
    assert np.array_equal(output, expected_output), f"Expected {expected_output}, got {output}"

# Run the third test
test_maxpool_diff_k()


In [22]:
# Adjusting the fourth test: Reuse Lookup Test

def test_maxpool_reuse_lookup():
    # Initialize MaxPool with k=2
    maxpool = MaxPool(2)
    
    # Define a 4x4 matrix for the first run
    A1 = np.array([
        [12, 15, 5, 0],
        [3, 11, 3, 7],
        [9, 3, 5, 2],
        [4, 7, 6, 8]
    ])
    
    # Define a 5x5 matrix for the second run
    A2 = np.array([
        [12, 15, 5, 0, 1],
        [3, 11, 3, 7, 2],
        [9, 3, 5, 2, 3],
        [4, 7, 6, 8, 4],
        [1, 2, 3, 4, 5]
    ])
    
    # Corrected expected output for A1 based on manual calculation
    expected_output1 = np.array([
        [15, 7],
        [9, 8]
    ])
    
    # Corrected expected output for A2 based on manual calculation
    expected_output2 = np.array([
        [15, 7, 2],
        [9, 8, 4],
        [2, 4, 5]
    ])
    
    # Run the forward method for A1 and check output
    output1 = maxpool.forward(A1)
    assert np.array_equal(output1, expected_output1), f"Expected {expected_output1}, got {output1}"
    
    # Run the forward method for A2 and check output
    output2 = maxpool.forward(A2)
    assert np.array_equal(output2, expected_output2), f"Expected {expected_output2}, got {output2}"
    
    # Run the forward method for A1 again and check output
    output1_again = maxpool.forward(A1)
    assert np.array_equal(output1_again, expected_output1), f"Expected {expected_output1}, got {output1_again}"

# Run the fourth test
test_maxpool_reuse_lookup()


# üî∏ Numba (also works)

In [23]:
! pip install -q numba

In [24]:
from numba import jit

# Using Numba to speed up the for-loop implementation
@jit(nopython=True)
def maxpool_numba(A, k):
    # Determine the output size
    noutY, noutX = A.shape[0] // k, A.shape[1] // k

    # Initialize output arrays
    outIndices = np.zeros((noutY, noutX), dtype=np.int64)
    outValues = np.zeros((noutY, noutX), dtype=A.dtype)
    
    for outY in range(noutY):
        for outX in range(noutX):
            # Extract the relevant slice from A
            square = A[outY * k: (outY + 1) * k, outX * k: (outX + 1) * k]
            
            # Find the index of the maximum value in the square
            indexWinner = np.argmax(square)
            
            # Compute the original index in A for the maximum value
            originalIndex = (outY * k + indexWinner // k) * A.shape[1] + (outX * k + indexWinner % k)
            
            # Store the index and value
            outIndices[outY, outX] = originalIndex
            outValues[outY, outX] = square.flatten()[indexWinner]
    
    return outIndices, outValues

# Test the Numba-based maxpool
A = np.array([
    [12, 15, 5, 0],
    [3, 11, 3, 7],
    [9, 3, 5, 2],
    [4, 7, 6, 8]
])

# Using k=2
k = 2

outIndices, outValues = maxpool_numba(A, k)
outIndices, outValues


(array([[ 1,  7],
        [ 8, 15]]),
 array([[15,  7],
        [ 9,  8]]))

In [25]:
%timeit test_maxpool_basic()

18.8 ¬µs ¬± 176 ns per loop (mean ¬± std. dev. of 7 runs, 100,000 loops each)


In [26]:
%timeit maxpool_numba(A, k)

920 ns ¬± 7.65 ns per loop (mean ¬± std. dev. of 7 runs, 1,000,000 loops each)


In [27]:
from numba import jit
import numpy as np

@jit(forceobj=True)
def matmul_numba(a, b):
    return np.matmul(a, b)

# Define two large NumPy arrays
a = np.random.rand(10000, 10000)
b = np.random.rand(10000, 10000)

# Call the JIT-compiled matmul function
result = matmul_numba(a, b)

# Optionally, print a small part of the result to verify
print(result[:2, :2])

[[2496.71773003 2488.86011419]
 [2485.16816979 2468.95908035]]


In [28]:
# Define two large NumPy arrays
a = np.random.rand(10000, 10000)
b = np.random.rand(10000, 10000)

# Call the JIT-compiled matmul function
result = matmul_numba(a, b)

# Optionally, print a small part of the result to verify
print(result[:2, :2])

[[2527.00446705 2497.52093011]
 [2517.76276695 2489.89448889]]


# üî∏ new approach

In [29]:
N, k = 6, 3  # input size, kernel size
I = np.arange(N*N, dtype=np.int32).reshape(N, N)
I

array([[ 0,  1,  2,  3,  4,  5],
       [ 6,  7,  8,  9, 10, 11],
       [12, 13, 14, 15, 16, 17],
       [18, 19, 20, 21, 22, 23],
       [24, 25, 26, 27, 28, 29],
       [30, 31, 32, 33, 34, 35]], dtype=int32)

In [30]:
# for each output position, grab indices of input-square that created it
s = N-k+1  # output size: 4 postions kernel can slide in x and y direction
lookup = np.zeros((s, s, k*k), dtype=np.int32)
for r in range(s):
    for c in range(s):
        # each position in output needs to be a list of the indices of the input for this kernel-position
        lookup[r,c] = I[r:r+k, c:c+k].flatten()
lookup.shape, lookup[0,1]

((4, 4, 9), array([ 1,  2,  3,  7,  8,  9, 13, 14, 15], dtype=int32))

In [31]:
# from lovely_numpy import Lo
A = np.random.randint(-16, 16, size=(6, 6))  # inputs
# W = np.random.randn(*A.shape)  # weights
# A.flatten()[O]#.reshape(A.shape)
# Lo(A.flatten()[O])
np.set_printoptions(precision=3)

A

array([[-15,   3,  -5,  -4, -12,   0],
       [ 13,   3,  -5,  -1, -12,   7],
       [ 14,  14, -13,   8, -16,   8],
       [ -9, -13,  -7,  -3, -16,   5],
       [-16,   0,  -6,  10,   0,  -4],
       [ 14,   6, -12,  -5,  -8,  15]])

In [32]:
rel_indices = A.flatten()[lookup].argmax(axis=-1)  # winning index within each kernel-location
rel_indices.shape, rel_indices

((4, 4),
 array([[6, 6, 7, 6],
        [3, 3, 4, 3],
        [0, 0, 7, 6],
        [6, 5, 4, 8]]))

In [33]:
i, j = np.ogrid[:s,:s]
winner_indices = lookup[i, j, rel_indices]  # get these indices w.r.t. original shape
i.shape, j.shape, winner_indices

((4, 1),
 (1, 4),
 array([[12, 13, 15, 15],
        [12, 13, 15, 15],
        [12, 13, 27, 27],
        [30, 27, 27, 35]], dtype=int32))

In [34]:
# alternative formulation
# u = np.arange(s, dtype=np.int32)[None] ;  lookup[u.T, u, rel_indices]

In [35]:
multiplier = np.bincount(
    winner_indices.flatten(),
    minlength=A.size
).reshape(A.shape)
multiplier

array([[0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0],
       [3, 3, 0, 4, 0, 0],
       [0, 0, 0, 0, 0, 0],
       [0, 0, 0, 4, 0, 0],
       [1, 0, 0, 0, 0, 1]])

# üîπ Test the index-matrix to freq-matrix conversion

test numpy vs numba impls

In [36]:
import numpy as np

# Input 3x3 matrix with values between 0 and 8
input_matrix = np.array([[5, 4, 4],
                         [4, 5, 0],
                         [1, 5, 6]])

# Calculate the frequency of each number in the input_matrix
frequency_counts = np.bincount(input_matrix.flatten(), minlength=9)

# Reshape the frequency counts to a 3x3 output matrix
frequency_output_matrix = frequency_counts.reshape((3, 3))
frequency_output_matrix

array([[1, 1, 0],
       [0, 3, 3],
       [1, 0, 0]])

In [1]:
if 0:  # crashes kernel sometimes, so disabled
    import numpy as np
    from numba import njit
    import timeit

    # Numba-accelerated function
    @njit
    def count_with_numba(input_matrix):
        B = np.zeros_like(input_matrix)
        for elt in input_matrix.flatten():
            B[elt] += 1
        return B

    # NumPy function
    def count_with_numpy(input_matrix):
        return np.bincount(input_matrix.flatten(), minlength=input_matrix.size).reshape(input_matrix.shape)

    # Input matrix generation
    k = 100  # for a 100x100 matrix
    input_matrix = np.random.randint(0, k*k, size=(k, k))

    # Warm up Numba (trigger compilation)
    count_with_numba(input_matrix)

    # Time profiling
    numpy_time = timeit.timeit('count_with_numpy(input_matrix)', globals=globals(), number=10)
    numba_time = timeit.timeit('count_with_numba(input_matrix)', globals=globals(), number=10)

    print(f"NumPy Time: {numpy_time}")
    print(f"Numba Time: {numba_time}")
