In [None]:
import time
import numpy as np

In [241]:
class CircularBuffer:
    '''
    works only if chunks of data aligns into buffer
    (the most right chunk should be able to put without breaks into 2 smallest)
    '''
    def __init__(self, n, dtype=np.uint16):
        self.buffer = np.empty(n, dtype)
        self.cursor = 0 # pointer to most (recently collected buffer index) + 1
    
    def append(self, data):
        data_length = data.shape[0]
        self.buffer[self.cursor : self.cursor + data_length] = data
        self.cursor = (self.cursor + data_length) % self.buffer.shape[0]
    
    def most_recent(self, n):
        if self.cursor - n < 0:
            data = np.empty(n, self.buffer.dtype)
            data[:n - self.cursor] = self.buffer[self.cursor - n:]
            data[n - self.cursor:] = self.buffer[:self.cursor]
        else:
            data = self.buffer[self.cursor - n:self.cursor].copy()
        return data
    
    def __repr__(self):
        return repr(self.buffer)

In [166]:
DTYPE = np.uint16

In [215]:
random_arrays = np.random.randint(0, 100, size=(100000, 3), dtype=DTYPE)
random_arrays[:3]

array([[50, 29, 20],
       [52,  4, 47],
       [11, 41, 83]], dtype=uint16)

In [244]:
buffer_size = 120

assert buffer_size % random_arrays.shape[1] == 0, 'chunks of data not aligns into buffer'

x = CircularBuffer(buffer_size, DTYPE)

circ_buff_start = time.time()
for random_array in random_arrays:
    x.append(random_array)
print(f'CircularBuffer {time.time() - circ_buff_start}')
    
np_roll_start = time.time()
y = np.empty(buffer_size, DTYPE)

for random_array in random_arrays:
    y = np.roll(y, -len(random_array))
print(f'np.roll        {time.time() - np_roll_start}')


mr = 8
print(x.most_recent(mr))
print(y[-mr:])
np.array_equal(x.most_recent(mr), y[-mr:])

CircularBuffer 0.179459810256958
np.roll        2.0980308055877686
[62 18 49 85 63 90 31 94]
[62 18 49 85 63 90 31 94]


True