Skip to content

Commit

Permalink
satellite/audit: fix reservoir sampling bias
Browse files Browse the repository at this point in the history
While researching logs from a large set of audits, I noticed that nearly
all of them had streamIDs starting with 0 or 1. This seemed very odd,
because streamIDs are supposed to be pretty much entirely random, and
every hex digit from 0-f should have been represented with roughly equal
frequency.

It turned out that our A-Chao implementation of reservoir sampling is
flawed. As far as we can tell, so is the Wikipedia implementation. No
one has yet reviewed the original 1982 paper by Dr. Chao in enough
detail to know where the error originated, but we do know that we have
been auditing segments near the beginning of the segment loop (low
streamIDs) far more often than segments near the end of the segment loop
(high streamIDs).

This change uses an algorithm Wikipedia calls "A-Res" instead, and adds
a test to check for that sort of bias creeping back in somehow. A-Res
will be slightly slower than A-Chao, because of a few extra steps that
need to be done, but it does appear to be selecting items uniformly.

Change-Id: I45eba4c522bafc729cebe2aab6f3fe65cd6336be
  • Loading branch information
thepaul committed Dec 10, 2022
1 parent b562cbf commit 231c783
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 15 deletions.
34 changes: 20 additions & 14 deletions satellite/audit/reservoir.go
Expand Up @@ -4,6 +4,7 @@
package audit

import (
"math"
"math/rand"
"time"

Expand All @@ -17,9 +18,9 @@ const maxReservoirSize = 3
// Reservoir holds a certain number of segments to reflect a random sample.
//
// Segments are fed in one at a time through Sample; the reservoir keeps at
// most maxReservoirSize of them.
type Reservoir struct {
// Segments stores the currently selected segments. Sample writes into the
// first `size` slots.
Segments [maxReservoirSize]segmentloop.Segment
// Keys holds, for each slot in Segments, the random key Sample computed for
// the segment stored there (-ln(rand) / EncryptedSize). A later segment
// displaces the slot with the largest key when its own key is smaller.
Keys [maxReservoirSize]float64
// size is the number of slots in use; Sample compares it against index and
// uses it as the bound when scanning Keys.
// NOTE(review): presumably set by NewReservoir, which is not fully visible here.
size int8
// index is incremented by Sample and is compared against size to decide
// whether the reservoir is still being filled.
index int64
// wSum accumulates int64(segment.EncryptedSize) in Sample; it is read when
// computing the ratio segment.EncryptedSize / wSum.
wSum int64
}

// NewReservoir instantiates a Reservoir.
Expand All @@ -35,23 +36,28 @@ func NewReservoir(size int) *Reservoir {
}
}

// Sample makes sure that for every segment in metainfo from index i=size..n-1,
// compute the relative weight based on segment size, and pick a random floating
// point number r = rand(0..1), and if r < the relative weight of the segment,
// select uniformly a random segment reservoir.Segments[rand(0..i)] to replace with
// segment. See https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Chao
// for the algorithm used.
// Sample tries to ensure that each segment passed in has a chance (proportional
// to its size) to be in the reservoir when sampling is complete.
//
// The tricky part is that we do not know ahead of time how many segments will
// be passed in. The way this is accomplished is known as _Reservoir Sampling_.
// The specific algorithm we are using here is called A-Res on the Wikipedia
// article: https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Res
func (reservoir *Reservoir) Sample(r *rand.Rand, segment *segmentloop.Segment) {
k := -math.Log(r.Float64()) / float64(segment.EncryptedSize)
if reservoir.index < int64(reservoir.size) {
reservoir.Segments[reservoir.index] = *segment
reservoir.wSum += int64(segment.EncryptedSize)
reservoir.Keys[reservoir.index] = k
} else {
reservoir.wSum += int64(segment.EncryptedSize)
p := float64(segment.EncryptedSize) / float64(reservoir.wSum)
random := r.Float64()
if random < p {
index := r.Int31n(int32(reservoir.size))
reservoir.Segments[index] = *segment
max := 0
for i := 1; i < int(reservoir.size); i++ {
if reservoir.Keys[i] > reservoir.Keys[max] {
max = i
}
}
if k < reservoir.Keys[max] {
reservoir.Segments[max] = *segment
reservoir.Keys[max] = k
}
}
reservoir.index++
Expand Down
59 changes: 58 additions & 1 deletion satellite/audit/reservoir_test.go
Expand Up @@ -4,10 +4,13 @@
package audit

import (
"encoding/binary"
"math/rand"
"sort"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"storj.io/common/testrand"
Expand All @@ -30,7 +33,7 @@ func TestReservoir(t *testing.T) {
require.Equal(t, r.Segments[:], []segmentloop.Segment{*seg(1), *seg(2), *seg(3)})
}

func TestReservoirBias(t *testing.T) {
func TestReservoirWeights(t *testing.T) {
var weight10StreamID = testrand.UUID()
var weight5StreamID = testrand.UUID()
var weight2StreamID = testrand.UUID()
Expand Down Expand Up @@ -92,3 +95,57 @@ func TestReservoirBias(t *testing.T) {
require.Greater(t, streamIDCountsMap[weight5StreamID], streamIDCountsMap[weight2StreamID])
require.Greater(t, streamIDCountsMap[weight2StreamID], streamIDCountsMap[weight1StreamID])
}

// Sample many segments, with equal weight, uniformly distributed, and in order,
// through the reservoir. Expect that elements show up in the result set with
// equal chance, whether they were inserted near the beginning of the list or
// near the end.
func TestReservoirBias(t *testing.T) {
	const (
		reservoirSize = 3
		useBits       = 14
		numSegments   = 1 << useBits
		weight        = 100000 // any number; same for all segments
		numRounds     = 1000
	)

	// The seed is time-based so every run exercises a different random
	// sequence. Log it so that a failing run can be reproduced by
	// substituting the reported value here.
	seed := time.Now().UnixNano()
	t.Logf("TestReservoirBias seed: %d", seed)
	rng := rand.New(rand.NewSource(seed))

	numsSelected := make([]uint64, numRounds*reservoirSize)

	for round := 0; round < numRounds; round++ {
		res := NewReservoir(reservoirSize)
		for n := 0; n < numSegments; n++ {
			segment := segmentloop.Segment{
				EncryptedSize: weight,
			}
			// Encode n in the top useBits bits of the stream ID so that
			// the original position can be recovered after sampling.
			binary.BigEndian.PutUint64(segment.StreamID[0:8], uint64(n)<<(64-useBits))
			res.Sample(rng, &segment)
		}
		// Index instead of ranging by value to avoid copying each segment.
		for i := range res.Segments {
			num := binary.BigEndian.Uint64(res.Segments[i].StreamID[0:8]) >> (64 - useBits)
			numsSelected[round*reservoirSize+i] = num
		}
	}

	sort.Sort(uint64Slice(numsSelected))

	// this delta is probably way too generous. but, the A-Chao
	// implementation failed the test with this value, so maybe it's fine.
	delta := float64(numSegments / 8)

	// With an unbiased sampler the q-th quartile of the selected positions
	// should sit near q/4 of the way through the input.
	for q := 0; q <= 4; q++ {
		idx := len(numsSelected) * q / 4
		if idx >= len(numsSelected) {
			// The last quartile is the maximum element.
			idx = len(numsSelected) - 1
		}
		assert.InDelta(t, numSegments*q/4, numsSelected[idx], delta,
			"quartile %d out of range (seed %d)", q, seed)
	}
}

// uint64Slice is a slice of uint64 values that provides Len, Swap and Less,
// so it can be handed to sort.Sort to order the values ascending.
type uint64Slice []uint64

// Len returns the number of elements in the slice.
func (s uint64Slice) Len() int {
	return len(s)
}

// Swap exchanges the elements at positions a and b.
func (s uint64Slice) Swap(a, b int) {
	s[a], s[b] = s[b], s[a]
}

// Less reports whether the element at position a is strictly smaller than
// the element at position b.
func (s uint64Slice) Less(a, b int) bool {
	return s[a] < s[b]
}

0 comments on commit 231c783

Please sign in to comment.