The usual object-oriented implementation of a cube, with NumPy holding the data. The origin of the 3D space is its top-left-back corner, and each cube stores only its own top-left-back-most point plus a separate variable for its size. The only operation is `push_up`, which moves the cube up, i.e. decreases its y component, clamped at 0.

In [1]:
import random

import numpy as np
from typing import Iterable
import time
from tqdm.auto import trange

In [2]:
class Cube:
    top_left_back_point: np.ndarray  # np.array is a factory function; the array type is np.ndarray
    size: int

    def __init__(self, top_left_back_point: tuple[int, int, int], size: int):
        self.top_left_back_point = np.array(top_left_back_point)
        self.size = size

    def push_up(self, amount: int):
        self.top_left_back_point[1] = max(0, self.top_left_back_point[1] - amount)

Then we can instantiate some cubes.

In [3]:
def instantiate_cubes(nb=100):
    cubes = []
    for i in range(nb):
        cubes.append(Cube((i, i*2, i*3), 1))
    return cubes

In [4]:
cubes = instantiate_cubes()
print(cubes[43].top_left_back_point)
print(cubes[43].size)
cubes[43].push_up(26)
print(cubes[43].top_left_back_point)

[ 43  86 129]
1
[ 43  60 129]


Implementing a handler for the data of all the cubes using NumPy


In [5]:
class CubeHandler:
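    # NOTE: `instances` starts with 20 slots while the data arrays start with 200;
    # get_next_index() doubles all three together whenever `instances` is full
    instances: np.ndarray = np.empty(dtype=Cube, shape=20)
    top_left_back_points: np.ndarray = np.empty(dtype=int, shape=(200, 3))
    sizes: np.ndarray = np.empty(dtype=int, shape=200)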
    next_index = 0

    @staticmethod
    def get_next_index():
        index = CubeHandler.next_index
        if index >= len(CubeHandler.instances):
            # Extending the arrays
            CubeHandler.top_left_back_points = np.append(CubeHandler.top_left_back_points, np.empty_like(CubeHandler.top_left_back_points), axis=0)
            CubeHandler.sizes = np.append(CubeHandler.sizes, np.empty_like(CubeHandler.sizes))
            CubeHandler.instances = np.append(CubeHandler.instances, np.empty_like(CubeHandler.instances))
        CubeHandler.next_index += 1
        return index
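
The doubling strategy used in `get_next_index` can be looked at in isolation. The sketch below is only an illustration: `GrowableRows` and its methods are hypothetical names that do not exist in this notebook, and it manages a single array with a single capacity rather than the three arrays of `CubeHandler`.

```python
import numpy as np


class GrowableRows:
    """Hypothetical helper: a 2D integer array that doubles its capacity when full."""

    def __init__(self, width: int, capacity: int = 4):
        self._data = np.zeros((capacity, width), dtype=int)
        self._used = 0  # number of rows actually in use

    def append(self, row) -> int:
        """Store `row` and return the index it was written to."""
        if self._used >= len(self._data):
            # Double the capacity; np.concatenate copies the existing rows once
            self._data = np.concatenate([self._data, np.zeros_like(self._data)], axis=0)
        index = self._used
        self._data[index] = row
        self._used += 1
        return index

    @property
    def rows(self) -> np.ndarray:
        """View of the rows in use (no copy)."""
        return self._data[: self._used]


store = GrowableRows(width=3, capacity=2)
indices = [store.append((i, i * 2, i * 3)) for i in range(5)]
print(indices)
print(store.rows[4])
```

Tying the growth check to the same array that is being filled avoids the situation where one array is tested and several others are resized along with it.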

Then we can reimplement `Cube` so that it no longer stores its main attributes itself and reads them from `CubeHandler` instead.

In [6]:
StandaloneCube = Cube
class Cube:
    def __init__(self, top_left_back_point: tuple[int, int, int], size: int):
        self.__index = CubeHandler.get_next_index()
        CubeHandler.top_left_back_points[self.__index] = top_left_back_point
        CubeHandler.sizes[self.__index] = size
        CubeHandler.instances[self.__index] = self

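    def push_up(self, amount: int):
        # Works on one cube (cube.push_up(5)), on an iterable of cubes
        # (Cube.push_up(cubes, 5)), or on every registered cube (Cube.push_up(None, 5))
        if self is None:
            self = CubeHandler.instances
        if isinstance(self, Cube):
            self = [self]

        for cube in self:
            if cube is None:  # unused slot in CubeHandler.instances
                continue
            cube.top_left_back_point[1] = max(0, cube.top_left_back_point[1] - amount)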

    @property
    def top_left_back_point(self):
        return CubeHandler.top_left_back_points[self.__index]

    @top_left_back_point.setter
    def top_left_back_point(self, value: tuple[int, int, int]):
        CubeHandler.top_left_back_points[self.__index] = value

    @property
    def size(self):
        return CubeHandler.sizes[self.__index]

    @size.setter
    def size(self, value: int):
        CubeHandler.sizes[self.__index] = value


In [7]:
cubes = instantiate_cubes()
print(cubes[43].top_left_back_point)
print(cubes[43].size)
cubes[43].push_up(26)
print(cubes[43].top_left_back_point)

[ 43  86 129]
1
[ 43  60 129]


The interface is the same, but since all the data now lives in shared arrays, a function can also be applied to a batch of cubes in one call, which opens the door to more efficient implementations.

In [8]:
cubes = instantiate_cubes()
print(list(x.top_left_back_point[1] for x in cubes[10:20]))
Cube.push_up(cubes, 5)
print(list(x.top_left_back_point[1] for x in cubes[10:20]))

[20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
[15, 17, 19, 21, 23, 25, 27, 29, 31, 33]
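
Because the coordinates live in one array, a batch update could in principle be written as a single NumPy expression instead of a Python loop over cubes. The following is a minimal sketch under that assumption; `points`, `indices`, and `push_up_vectorized` are hypothetical stand-ins for the handler's storage and are not defined elsewhere in this notebook.

```python
import numpy as np


def push_up_vectorized(points: np.ndarray, indices: np.ndarray, amount: int) -> None:
    """Decrease the y component of the selected rows in place, clamping at 0."""
    points[indices, 1] = np.maximum(0, points[indices, 1] - amount)


# Same layout as instantiate_cubes(): row i holds (i, 2*i, 3*i)
points = np.array([(i, i * 2, i * 3) for i in range(100)])
indices = np.arange(10, 20)
push_up_vectorized(points, indices, 5)
print(points[10:20, 1].tolist())  # [15, 17, 19, 21, 23, 25, 27, 29, 31, 33]
```

For this to work on `Cube` objects, each cube would have to expose its storage index, which the current class keeps private.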


In [9]:
print(len(CubeHandler.instances))
CubeHandler.get_next_index()

320


200

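Batch calls also make room for a fast implementation of the same function, which would be much slower with one method call per object. Let's first time the batch call against the per-object loop.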

In [10]:
nb_cubes = 1000
nb_iter = 100

In [11]:
all_cubes = [[Cube((j, j*2, j*3), 100) for j in range(nb_cubes)] for i in range(nb_iter)]  # Generate before starting the timer
start = time.perf_counter()
for i in trange(nb_iter):
    cubes = all_cubes[i]
    Cube.push_up(cubes, 5)
print((time.perf_counter() - start)/10)  # NOTE: nb_iter is 100, so this is 10x the mean time per iteration

  0%|          | 0/100 [00:00<?, ?it/s]

0.018719170000000007


In [12]:
all_cubes = [[Cube((j, j*2, j*3), 100) for j in range(nb_cubes)] for i in range(nb_iter)]  # Generate before starting the timer
start = time.perf_counter()
for i in trange(nb_iter):
    cubes = all_cubes[i]
    for cube in cubes:
        cube.push_up(5)
print((time.perf_counter() - start)/10)  # NOTE: nb_iter is 100, so this is 10x the mean time per iteration

  0%|          | 0/100 [00:00<?, ?it/s]

0.016698720000000035


This paradigm adds a small overhead. The advantage is that, in some contexts, the same mathematical transformation has to be applied to a number of adjacent indexes, and this can be optimized, whereas with one function call per modification such optimizations are harder to apply.
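
One such optimization, sketched here under the assumption that each cube's storage index is known, is to detect that a batch covers adjacent indexes and to update it through a slice, which NumPy treats as a view on the original rows rather than a gathered copy. `push_up_indices` is a hypothetical helper and not part of this notebook.

```python
import numpy as np


def push_up_indices(points: np.ndarray, indices: np.ndarray, amount: int) -> str:
    """Push the selected rows up in place; return which code path was used."""
    indices = np.sort(indices)  # sorted copy; assumed to hold no duplicates
    if len(indices) == 0:
        return "empty"
    first, last = int(indices[0]), int(indices[-1])
    if last - first + 1 == len(indices):
        # Adjacent indexes: a slice is a view on the original rows
        ys = points[first:last + 1, 1]
        np.subtract(ys, amount, out=ys)
        np.maximum(ys, 0, out=ys)
        return "slice"
    # Scattered indexes: fall back to fancy indexing, which copies
    points[indices, 1] = np.maximum(0, points[indices, 1] - amount)
    return "fancy"


points = np.array([(i, i * 2, i * 3) for i in range(10)])
print(push_up_indices(points, np.array([2, 3, 4]), 5))  # slice
print(push_up_indices(points, np.array([0, 9]), 5))     # fancy
print(points[:, 1].tolist())
```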

In [13]:
import threading

def parallel_push_up(self, amount):
    for cube in self:
        if cube is None:
            continue
        cube.top_left_back_point[1] = max(0, cube.top_left_back_point[1] - amount)
    return self

def push_up_multi(self, amount):
    if self is None:
        self = CubeHandler.instances
    if isinstance(self, Cube):
        self = [self]
    threads = []
    for i in range(4):
        start = i*len(self)//4
        stop = (i+1)*len(self)//4
        threads.append(threading.Thread(target=parallel_push_up, args=(self[start:stop], amount)))
        threads[-1].start()
    for t in threads:
        t.join()


Using the `threading` library, we can take advantage of batch processing by splitting the computation across multiple threads. Note that in standard CPython builds the global interpreter lock prevents pure-Python loops from running in parallel, so this is only a nutshell example; the real power would come from offloading the computation to multiple computers.
We can then use this function as the `push_up` method and try again.
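
Threads tend to pay off mainly when each chunk does its work inside NumPy, which can release the interpreter lock during many array operations. A sketch of that variant using `concurrent.futures` follows; `push_up_chunk` and `push_up_threaded` are hypothetical names, the cubes are replaced by a plain coordinate array, and whether this is actually faster depends on the array size and the machine.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def push_up_chunk(ys: np.ndarray, amount: int) -> None:
    """Subtract and clamp one chunk in place; `ys` is a view on the shared array."""
    np.subtract(ys, amount, out=ys)
    np.maximum(ys, 0, out=ys)


def push_up_threaded(points: np.ndarray, amount: int, workers: int = 4) -> None:
    """Split the y column into `workers` contiguous chunks, one task per chunk."""
    n = len(points)
    bounds = [i * n // workers for i in range(workers + 1)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [
            pool.submit(push_up_chunk, points[start:stop, 1], amount)
            for start, stop in zip(bounds[:-1], bounds[1:])
        ]
        for future in futures:
            future.result()  # re-raises any exception from the worker


points = np.array([(i, i * 2, i * 3) for i in range(1000)])
push_up_threaded(points, 5)
print(points[:4, 1].tolist())  # [0, 0, 0, 1]
```

The chunks do not overlap, so the workers never write to the same rows.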

In [14]:
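class ParallelCube(Cube):
    # Subclass rather than alias: with `ParallelCube = Cube`, assigning push_up
    # would also overwrite Cube.push_up and leave no delegation variant to compare
    push_up = push_up_multi

all_cubes = [[Cube((j, j*2, j*3), 100) for j in range(nb_cubes)] for i in range(nb_iter)]  # Generate before starting the timer
start = time.perf_counter()
for i in trange(nb_iter):
    cubes = all_cubes[i]
    ParallelCube.push_up(cubes, 5)
print((time.perf_counter() - start)/10)  # NOTE: nb_iter is 100, so this is 10x the mean time per iteration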

  0%|          | 0/100 [00:00<?, ?it/s]

0.02188226000000002


This once again adds overhead to the previous implementation: on this example, the threaded version (about 0.022) is slower than both the batch call (about 0.019) and the per-object loop (about 0.017).

However, we did not take into account the size of the data, nor the fact that the state of `CubeHandler` is never truly reset, which can lower performance. We will therefore now compare the different methods on truly independent runs. Let's rename them first.

In [15]:
OCube = StandaloneCube  # Pure Object-Oriented Cube
DCube = Cube  # Delegation Cube
PCube = ParallelCube  # Parallel Cube with the different push_up function

We'll run 30 iterations for each data size. During each iteration, the operation is repeated 10 times. The data size varies from 1 to 1 000 000 cubes. We also define a function to reset the state of the `CubeHandler` class.

In [16]:
data_sizes = [10**i for i in range(7)]
nb_sub_iter = 10  # Iterations with the same input size
nb_iter = 30

seed = "OOClimatology"

def reset_CubeHandler():
    CubeHandler.instances = np.empty(dtype=Cube, shape=20)
    CubeHandler.top_left_back_points = np.empty(dtype=int, shape=(200, 3))
    CubeHandler.sizes = np.empty(dtype=int, shape=200)
    CubeHandler.next_index = 0
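
A possible way to keep object construction out of the measurement is to build the cubes first and time only the call under test. The sketch below assumes that design; `time_operation`, `make_cubes`, `operation`, `TinyCube`, and `push_all` are hypothetical names, and a minimal stand-in class replaces the notebook's cubes so that the block runs on its own.

```python
import random
import time
from typing import Callable


def time_operation(make_cubes: Callable[[int], list], operation: Callable[[list], None],
                   data_size: int, repeats: int = 3) -> float:
    """Return the mean wall-clock time of `operation`, excluding construction."""
    total = 0.0
    for _ in range(repeats):
        cubes = make_cubes(data_size)  # not timed
        start = time.perf_counter()
        operation(cubes)
        total += time.perf_counter() - start
    return total / repeats


class TinyCube:
    """Stand-in with the same push_up semantics as the notebook's cubes."""

    def __init__(self, y: int):
        self.y = y

    def push_up(self, amount: int) -> None:
        self.y = max(0, self.y - amount)


def push_all(cubes: list) -> None:
    for cube in cubes:
        cube.push_up(5)


random.seed("OOClimatology")  # the notebook's seed, reused for repeatable inputs
mean = time_operation(lambda n: [TinyCube(random.randint(0, 1000)) for _ in range(n)],
                      push_all, data_size=1000)
print(f"Input size 1000: {mean:.6f}s")
```

Passing the constructor and the operation as callables would let the same helper time each of the three variants.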

Let's first run the process on the default OO Cube

In [17]:
times = {d_s: 0 for d_s in data_sizes}
random.seed(seed)  # make the run repeatable
for d_s_i in trange(len(data_sizes)):
    data_size = data_sizes[d_s_i]
    start = time.perf_counter()  # started once per data size so that all iterations are timed
    for itr in range(nb_iter):
        for sub_itr in range(nb_sub_iter):
            cubes = [OCube(tuple(random.randint(0, 1000) for _ in range(3)), random.randint(1, 150)) for _ in range(data_size)]
            for cube in cubes:
                cube.push_up(random.randint(1, 150))
    times[data_size] = time.perf_counter() - start  # total over all iterations, construction included
for k, v in times.items():
    print(f"Input size {k}: {v}")

  0%|          | 0/7 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
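times = {d_s: 0 for d_s in data_sizes}
random.seed(seed)  # make the run repeatable
for d_s_i in trange(len(data_sizes)):
    data_size = data_sizes[d_s_i]
    reset_CubeHandler()  # start each data size from an empty handler
    start = time.perf_counter()  # started once per data size so that all iterations are timed
    for itr in range(nb_iter):
        for sub_itr in range(nb_sub_iter):
            # DCube instances, so that the data goes through CubeHandler
            cubes = [DCube(tuple(random.randint(0, 1000) for _ in range(3)), random.randint(1, 150)) for _ in range(data_size)]
            DCube.push_up(cubes, random.randint(1, 150))
    times[data_size] = time.perf_counter() - start  # total over all iterations, construction included
for k, v in times.items():
    print(f"Input size {k}: {v}")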

In [None]:
times = {d_s: 0 for d_s in data_sizes}
random.seed(seed)  # make the run repeatable
for d_s_i in trange(len(data_sizes)):
    data_size = data_sizes[d_s_i]
    reset_CubeHandler()  # start each data size from an empty handler
    start = time.perf_counter()  # started once per data size so that all iterations are timed
    for itr in range(nb_iter):
        for sub_itr in range(nb_sub_iter):
            # PCube instances, so that the data goes through CubeHandler
            cubes = [PCube(tuple(random.randint(0, 1000) for _ in range(3)), random.randint(1, 150)) for _ in range(data_size)]
            PCube.push_up(cubes, random.randint(1, 150))
    times[data_size] = time.perf_counter() - start  # total over all iterations, construction included
for k, v in times.items():
    print(f"Input size {k}: {v}")

  0%|          | 0/7 [00:00<?, ?it/s]