In [1]:
import numpy as np
from collections import defaultdict
from RingBuffer import RingBuf, WeightedRingBuf

In [2]:
def make_hist(iterable):
    """Count how many times each item occurs in `iterable`.

    Args:
        iterable: any iterable of hashable items.

    Returns:
        A plain dict mapping each distinct item to its occurrence count.
    """
    counts = {}
    for item in iterable:
        # Items not seen before start from zero.
        counts[item] = counts.get(item, 0) + 1
    return counts

### Test plain ring buffer.

Test that the RingBuf overwrites entries; 'd' should replace 'a'. 

Also show that it can handle indexing with a list of indices, including wraparound.

In [3]:
# Capacity 3 but four appends: the last append should overwrite the oldest entry.
rbuf = RingBuf(3)
for letter in ('a', 'b', 'c', 'd'):
    rbuf.append(letter)

# Index 3 is one past the last slot of a 3-slot buffer; the expected result
# below has it wrapping around to slot 0, which now holds 'd'.
res = rbuf[[0, 1, 2, 3]]
print(res)
assert res == ['d', 'b', 'c', 'd'], 'ERROR'

['d', 'b', 'c', 'd']


Try to sample from the buffer. Do this many times to see that each element appears about the same number of times.

In [4]:
# Draw a single element 900 times and tally what comes back.
eles = []
for _ in range(900):
    eles.extend(rbuf.sample(1))
hist = make_hist(eles)
print(hist)
# 900 draws over 3 stored entries => about 300 each; tolerate a deviation < 50.
for count in hist.values():
    assert abs(count - 300) < 50, "ERROR: Sampling seems non-uniform."

{'b': 301, 'd': 313, 'c': 286}


### Test weighted ring buffer.

Requires elements of a class that has a writable `weight` attribute.

In [5]:
class W:
    """Minimal test element: an object carrying a `weight` and a `name`."""

    def __init__(self, weight, name):
        self.weight = weight  # plain attribute, so it can be reassigned later
        self.name = name      # label used to identify the element in histograms

    def __str__(self):
        # Renders as e.g. "weight=0.3 name=a".
        return 'weight={!s} name={!s}'.format(self.weight, self.name)
# Four fixtures, each named after its variable; note that 'b' has zero weight.
a, b, c, d = (
    W(weight, name)
    for weight, name in [(.3, 'a'), (0, 'b'), (.5, 'c'), (3.4, 'd')]
)

Test that the WeightedRingBuf overwrites entries; 'd' should replace 'a'.

Also show that it can handle indexing with a list of indices, including wraparound.

In [6]:
# Capacity 3 but four appends: d should overwrite a, the oldest entry.
wbuf = WeightedRingBuf(3)
for item in (a, b, c, d):
    wbuf.append(item)
# Index 3 is expected to wrap around to slot 0, which now holds d.
assert wbuf[[0, 1, 2, 3]] == [d, b, c, d], 'ERROR'

Test that sampling is done in proportion to the weights of the elements. Sampling returns a unique set of ids, so we repeatedly sample.

In [7]:
# Accumulate the ids returned by 1000 separate single-id draws.
ids = []
for _ in range(1000):
    ids.extend(wbuf.sample(1))
hist = make_hist([w.name for w in wbuf[ids]])
net_weight = 0 + .5 + 3.4  # sum of the weights still held in wbuf (b, c, d)

assert set(hist.keys()) == {'c', 'd'}, \
    "'a' should be overwritten, 'b' has 0 weight: " + str(hist.keys())
# Each name's observed frequency must be within 0.1 of weight / net_weight.
for name, weight in (('c', .5), ('d', 3.4)):
    assert abs(hist[name]/1000 - weight/net_weight) < .1, \
        'Improper sampling: ' + str(hist)
print(hist)

{'d': 884, 'c': 116}


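Test exclusion. Index 3 wraps around to the slot holding 'd', and 'b' has zero weight, so with id 3 excluded only 'c' should ever be sampled.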

In [8]:
# Same repeated single-id draw as before, but with id 3 excluded.
ids = []
for _ in range(1000):
    ids.extend(wbuf.sample(1, exclude={3}))
hist = make_hist([w.name for w in wbuf[ids]])

# Only 'c' is expected to show up in the histogram.
assert set(hist.keys()) == {'c'}, 'Exclusion failure: ' + str(hist)
print(hist)

{'c': 1000}


Change the weight of 'b' to 1.3 and retest sampling.

In [9]:
wbuf.update_weight(1, 1.3)  # b's weight goes from 0 ==> 1.3
net_weight = 1.3 + .5 + 3.4  # b + c + d after the update

# Accumulate the ids returned by 1000 separate single-id draws.
ids = []
for _ in range(1000):
    ids.extend(wbuf.sample(1))
hist = make_hist([w.name for w in wbuf[ids]])

# 'b' now has non-zero weight, so all three stored names should appear.
assert set(hist.keys()) == {'b', 'c', 'd'}, str(hist.keys())
# Each name's observed frequency must be within 0.1 of weight / net_weight.
for name, weight in (('b', 1.3), ('c', .5), ('d', 3.4)):
    assert abs(hist[name]/1000 - weight/net_weight) < .1, \
        'Improper sampling: ' + str(hist)
print(hist)

{'b': 255, 'd': 640, 'c': 105}
