Skip to content

Commit

Permalink
Make histogram aggregator checkpoint consistent (#438)
Browse files Browse the repository at this point in the history
* change the histogram aggregator to have a consistent but blocking Checkpoint()

* docs

* wrapping docs

* remove currentIdx from the 8bit alignment check

* stress test

* add export and move lockfreewrite algorithm to an external struct.

* move state locker to another package.

* add todos

* minimal tests

* renaming and docs

* change to context.Background()

* add link to algorithm and grammars

Co-authored-by: Joshua MacDonald <jmacd@users.noreply.github.com>
  • Loading branch information
paivagustavo and jmacd committed Mar 11, 2020
1 parent ae9033e commit 288821c
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 57 deletions.
110 changes: 110 additions & 0 deletions sdk/internal/state_locker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"runtime"
"sync"
"sync/atomic"
)

// StateLocker implements a two state lock algorithm that enabled lock free operations inside a state
// and a global lock for switching between states. At every time, only one state is active and one cold state.
// States are represented by int numbers 0 and 1.
//
// This was inspired by the algorithm used on the prometheus client library that can be found at:
// https://github.com/prometheus/client_golang/blob/e7776d2c54305c1b62fdb113b5a7e9b944c5c27e/prometheus/histogram.go#L227
//
// To execute operations within the same state, call `Start()` before the operation and call `End(idx)`
// to end this operation. The `idx` argument of `End()` is the index of the active state when the operation
// started and it is returned by the `Start()` method. It is recommended to defer the call to `End(idx)`.
//
// One can change the active state by calling `SwapActiveState(fn)`. `fn` is a function that will be executed *before*
// switching the active state. Operations such as preparing the new state shall be called by this function. This will
// wait in-flight operations to end.
//
// Example workflow:
// 1. State 0 is active.
// 1.1 Operations to the active state can happen with `Start()` and `End(idx)` methods.
// 2. Call to `SwitchState(fn)`
// 2.1 run `fn` function to prepare the new state
// 2.2 make state 1 active
// 2.3 wait in-flight operations of the state 0 to end.
// 3. State 1 is now active and every new operation are executed in it.
//
// `SwitchState(fn)` are synchronized with a mutex that can be access with the `Lock()` and `Unlock()` methods.
// Access to the cold state must also be synchronized to ensure the cold state is not in the middle of state switch
// since that could represent an invalid state.
//
type StateLocker struct {
countsAndActiveIdx uint64
finishedOperations [2]uint64

sync.Mutex
}

// Start an operation that will happen on a state. The current active state is returned.
// A call to `End(idx int)` must happens for every `Start()` call.
func (c *StateLocker) Start() int {
n := atomic.AddUint64(&c.countsAndActiveIdx, 1)
return int(n >> 63)
}

// End an operation that happened to the idx state.
func (c *StateLocker) End(idx int) {
atomic.AddUint64(&c.finishedOperations[idx], 1)
}

// ColdIdx returns the index of the cold state.
func (c *StateLocker) ColdIdx() int {
return int((^c.countsAndActiveIdx) >> 63)
}

// SwapActiveState swaps the cold and active states.
//
// This will wait all for in-flight operations that are happening to the current
// active state to end, this ensure that all access to this state will be consistent.
//
// This is synchronized by a mutex.
func (c *StateLocker) SwapActiveState(beforeFn func()) {
c.Lock()
defer c.Unlock()

if beforeFn != nil {
// prepare the state change
beforeFn()
}

// Adding 1<<63 switches the active index (from 0 to 1 or from 1 to 0)
// without touching the count bits.
n := atomic.AddUint64(&c.countsAndActiveIdx, 1<<63)

// count represents how many operations have started *before* the state change.
count := n & ((1 << 63) - 1)

activeFinishedOperations := &c.finishedOperations[n>>63]
// coldFinishedOperations are the number of operations that have *ended* on the previous state.
coldFinishedOperations := &c.finishedOperations[(^n)>>63]

// Await all cold writers to finish writing, when coldFinishedOperations == count, all in-flight operations
// have finished and we can cleanly end the state change.
for count != atomic.LoadUint64(coldFinishedOperations) {
runtime.Gosched() // Let observations get work done.
}

// Make sure that the new state keeps the same count of *ended* operations.
atomic.AddUint64(activeFinishedOperations, count)
atomic.StoreUint64(coldFinishedOperations, 0)
}
88 changes: 88 additions & 0 deletions sdk/internal/state_locker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package internal

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestInflightOperationMustEndBeforeSwap(t *testing.T) {
var swapped bool
ch := make(chan struct{})

l := StateLocker{}
op1 := l.Start()

go func() {
l.SwapActiveState(func() {})
swapped = true
ch <- struct{}{}
}()

require.False(t, swapped, "Swap should wait the end of the in-flight operation.")

l.End(op1)

select {
case <-ch:
require.True(t, swapped, "Swap should've been completed. ")
case <-time.After(50 * time.Millisecond):
t.Fatal("Swap was not concluded after 50 milliseconds.")
}
}

func TestEnsureIndexIsConsistent(t *testing.T) {
l := StateLocker{}
op1 := l.Start()
l.End(op1)

l.SwapActiveState(func() {})

op2 := l.Start()
l.End(op2)

op3 := l.Start()
l.End(op3)

l.SwapActiveState(func() {})

op4 := l.Start()
l.End(op4)

require.Equal(t, op1, op4, "two operations separated by two swaps should have the same index.")
require.Equal(t, op2, op3, "two operations with no swap in between should have the same index.")

require.Equal(t, 0, op1, "first index should be 0")
require.Equal(t, 1, op2, "second index should be 1")
}

func TestTwoSwapsCanHappenWithoutOperationsInBetween(t *testing.T) {
l := StateLocker{}

require.Equal(t, 1, l.ColdIdx(), "first cold index should be 1")
l.SwapActiveState(func() {})
require.Equal(t, 0, l.ColdIdx(), "second cold index should be 0")
l.SwapActiveState(func() {})
require.Equal(t, 1, l.ColdIdx(), "third cold index should be 1")
}

func BenchmarkStateLocker_StartEnd(b *testing.B) {
l := StateLocker{}

b.ReportAllocs()

for i := 0; i < b.N; i++ {
l.End(l.Start())
}
}

func BenchmarkStateLocker_SwapActiveState(b *testing.B) {

b.ReportAllocs()

for i := 0; i < b.N; i++ {
l := StateLocker{}
l.SwapActiveState(func() {})
}
}
111 changes: 70 additions & 41 deletions sdk/metric/aggregator/histogram/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,25 @@ import (
"go.opentelemetry.io/otel/api/core"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/internal"
)

type (
// Aggregator observe events and counts them in pre-determined buckets.
// It also calculates the sum and count of all events.
Aggregator struct {
// state needs to be aligned for 64-bit atomic operations.
current state
// checkpoint needs to be aligned for 64-bit atomic operations.
checkpoint state
// This aggregator uses the StateLocker that enables a lock-free Update()
// in exchange of a blocking and consistent Checkpoint(). Since Checkpoint()
// is called by the sdk itself and it is not part of a hot path,
// the user is not impacted by these blocking calls.
//
// The algorithm keeps two states. At every instance of time there exist one current state,
// in which new updates are aggregated, and one checkpoint state, that represents the state
// since the last Checkpoint(). These states are swapped when a `Checkpoint()` occur.

// states needs to be aligned for 64-bit atomic operations.
states [2]state
lock internal.StateLocker
boundaries []core.Number
kind core.NumberKind
}
Expand Down Expand Up @@ -74,16 +83,18 @@ func New(desc *export.Descriptor, boundaries []core.Number) *Aggregator {
agg := Aggregator{
kind: desc.NumberKind(),
boundaries: boundaries,
current: state{
buckets: aggregator.Buckets{
Boundaries: boundaries,
Counts: make([]core.Number, len(boundaries)+1),
states: [2]state{
{
buckets: aggregator.Buckets{
Boundaries: boundaries,
Counts: make([]core.Number, len(boundaries)+1),
},
},
},
checkpoint: state{
buckets: aggregator.Buckets{
Boundaries: boundaries,
Counts: make([]core.Number, len(boundaries)+1),
{
buckets: aggregator.Buckets{
Boundaries: boundaries,
Counts: make([]core.Number, len(boundaries)+1),
},
},
},
}
Expand All @@ -92,73 +103,91 @@ func New(desc *export.Descriptor, boundaries []core.Number) *Aggregator {

// Sum returns the sum of all values in the checkpoint.
func (c *Aggregator) Sum() (core.Number, error) {
return c.checkpoint.sum, nil
c.lock.Lock()
defer c.lock.Unlock()
return c.checkpoint().sum, nil
}

// Count returns the number of values in the checkpoint.
func (c *Aggregator) Count() (int64, error) {
return int64(c.checkpoint.count.AsUint64()), nil
c.lock.Lock()
defer c.lock.Unlock()
return int64(c.checkpoint().count), nil
}

// Histogram returns the count of events in pre-determined buckets.
func (c *Aggregator) Histogram() (aggregator.Buckets, error) {
return c.checkpoint.buckets, nil
c.lock.Lock()
defer c.lock.Unlock()
return c.checkpoint().buckets, nil
}

// Checkpoint saves the current state and resets the current state to
// the empty set. Since no locks are taken, there is a chance that
// the independent Sum, Count and Bucket Count are not consistent with each
// other.
func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) {
// N.B. There is no atomic operation that can update all three
// values at once without a memory allocation.
//
// This aggregator is intended to trade this correctness for
// speed.
//
// Therefore, atomically swap fields independently, knowing
// that individually the three parts of this aggregation could
// be spread across multiple collections in rare cases.

c.checkpoint.count.SetUint64(c.current.count.SwapUint64Atomic(0))
c.checkpoint.sum = c.current.sum.SwapNumberAtomic(core.Number(0))

for i := 0; i < len(c.checkpoint.buckets.Counts); i++ {
c.checkpoint.buckets.Counts[i].SetUint64(c.current.buckets.Counts[i].SwapUint64Atomic(0))
}
c.lock.SwapActiveState(c.resetCheckpoint)
}

// checkpoint returns the checkpoint state by inverting the lower bit of generationAndHotIdx.
func (c *Aggregator) checkpoint() *state {
return &c.states[c.lock.ColdIdx()]
}

func (c *Aggregator) resetCheckpoint() {
checkpoint := c.checkpoint()

checkpoint.count.SetUint64(0)
checkpoint.sum.SetNumber(core.Number(0))
checkpoint.buckets.Counts = make([]core.Number, len(checkpoint.buckets.Counts))
}

// Update adds the recorded measurement to the current data set.
func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error {
kind := desc.NumberKind()

c.current.count.AddUint64Atomic(1)
c.current.sum.AddNumberAtomic(kind, number)
cIdx := c.lock.Start()
defer c.lock.End(cIdx)

current := &c.states[cIdx]
current.count.AddUint64Atomic(1)
current.sum.AddNumberAtomic(kind, number)

for i, boundary := range c.boundaries {
if number.CompareNumber(kind, boundary) < 0 {
c.current.buckets.Counts[i].AddUint64Atomic(1)
current.buckets.Counts[i].AddUint64Atomic(1)
return nil
}
}

// Observed event is bigger than all defined boundaries.
c.current.buckets.Counts[len(c.boundaries)].AddUint64Atomic(1)
current.buckets.Counts[len(c.boundaries)].AddUint64Atomic(1)

return nil
}

// Merge combines two data sets into one.
// Merge combines two histograms that have the same buckets into a single one.
func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentMergeError(c, oa)
}

c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum)
c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count)
// Lock() synchronize Merge() and Checkpoint() to make sure all operations of
// Merge() is done to the same state.
c.lock.Lock()
defer c.lock.Unlock()

current := c.checkpoint()
// We assume that the aggregator being merged is not being updated nor checkpointed or this could be inconsistent.
ocheckpoint := o.checkpoint()

current.sum.AddNumber(desc.NumberKind(), ocheckpoint.sum)
current.count.AddNumber(core.Uint64NumberKind, ocheckpoint.count)

for i := 0; i < len(c.current.buckets.Counts); i++ {
c.checkpoint.buckets.Counts[i].AddNumber(core.Uint64NumberKind, o.checkpoint.buckets.Counts[i])
for i := 0; i < len(current.buckets.Counts); i++ {
current.buckets.Counts[i].AddNumber(core.Uint64NumberKind, ocheckpoint.buckets.Counts[i])
}
return nil
}
Expand Down
Loading

0 comments on commit 288821c

Please sign in to comment.