diff --git a/sdk/internal/state_locker.go b/sdk/internal/state_locker.go new file mode 100644 index 00000000000..253377cc3c9 --- /dev/null +++ b/sdk/internal/state_locker.go @@ -0,0 +1,110 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "runtime" + "sync" + "sync/atomic" +) + +// StateLocker implements a two state lock algorithm that enabled lock free operations inside a state +// and a global lock for switching between states. At every time, only one state is active and one cold state. +// States are represented by int numbers 0 and 1. +// +// This was inspired by the algorithm used on the prometheus client library that can be found at: +// https://github.com/prometheus/client_golang/blob/e7776d2c54305c1b62fdb113b5a7e9b944c5c27e/prometheus/histogram.go#L227 +// +// To execute operations within the same state, call `Start()` before the operation and call `End(idx)` +// to end this operation. The `idx` argument of `End()` is the index of the active state when the operation +// started and it is returned by the `Start()` method. It is recommended to defer the call to `End(idx)`. +// +// One can change the active state by calling `SwapActiveState(fn)`. `fn` is a function that will be executed *before* +// switching the active state. Operations such as preparing the new state shall be called by this function. This will +// wait in-flight operations to end. +// +// Example workflow: +// 1. State 0 is active. +// 1.1 Operations to the active state can happen with `Start()` and `End(idx)` methods. +// 2. Call to `SwitchState(fn)` +// 2.1 run `fn` function to prepare the new state +// 2.2 make state 1 active +// 2.3 wait in-flight operations of the state 0 to end. +// 3. State 1 is now active and every new operation are executed in it. +// +// `SwitchState(fn)` are synchronized with a mutex that can be access with the `Lock()` and `Unlock()` methods. +// Access to the cold state must also be synchronized to ensure the cold state is not in the middle of state switch +// since that could represent an invalid state. +// +type StateLocker struct { + countsAndActiveIdx uint64 + finishedOperations [2]uint64 + + sync.Mutex +} + +// Start an operation that will happen on a state. The current active state is returned. +// A call to `End(idx int)` must happens for every `Start()` call. +func (c *StateLocker) Start() int { + n := atomic.AddUint64(&c.countsAndActiveIdx, 1) + return int(n >> 63) +} + +// End an operation that happened to the idx state. +func (c *StateLocker) End(idx int) { + atomic.AddUint64(&c.finishedOperations[idx], 1) +} + +// ColdIdx returns the index of the cold state. +func (c *StateLocker) ColdIdx() int { + return int((^c.countsAndActiveIdx) >> 63) +} + +// SwapActiveState swaps the cold and active states. +// +// This will wait all for in-flight operations that are happening to the current +// active state to end, this ensure that all access to this state will be consistent. +// +// This is synchronized by a mutex. +func (c *StateLocker) SwapActiveState(beforeFn func()) { + c.Lock() + defer c.Unlock() + + if beforeFn != nil { + // prepare the state change + beforeFn() + } + + // Adding 1<<63 switches the active index (from 0 to 1 or from 1 to 0) + // without touching the count bits. + n := atomic.AddUint64(&c.countsAndActiveIdx, 1<<63) + + // count represents how many operations have started *before* the state change. + count := n & ((1 << 63) - 1) + + activeFinishedOperations := &c.finishedOperations[n>>63] + // coldFinishedOperations are the number of operations that have *ended* on the previous state. + coldFinishedOperations := &c.finishedOperations[(^n)>>63] + + // Await all cold writers to finish writing, when coldFinishedOperations == count, all in-flight operations + // have finished and we can cleanly end the state change. + for count != atomic.LoadUint64(coldFinishedOperations) { + runtime.Gosched() // Let observations get work done. + } + + // Make sure that the new state keeps the same count of *ended* operations. + atomic.AddUint64(activeFinishedOperations, count) + atomic.StoreUint64(coldFinishedOperations, 0) +} diff --git a/sdk/internal/state_locker_test.go b/sdk/internal/state_locker_test.go new file mode 100644 index 00000000000..0cd7cebd8c9 --- /dev/null +++ b/sdk/internal/state_locker_test.go @@ -0,0 +1,88 @@ +package internal + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestInflightOperationMustEndBeforeSwap(t *testing.T) { + var swapped bool + ch := make(chan struct{}) + + l := StateLocker{} + op1 := l.Start() + + go func() { + l.SwapActiveState(func() {}) + swapped = true + ch <- struct{}{} + }() + + require.False(t, swapped, "Swap should wait the end of the in-flight operation.") + + l.End(op1) + + select { + case <-ch: + require.True(t, swapped, "Swap should've been completed. ") + case <-time.After(50 * time.Millisecond): + t.Fatal("Swap was not concluded after 50 milliseconds.") + } +} + +func TestEnsureIndexIsConsistent(t *testing.T) { + l := StateLocker{} + op1 := l.Start() + l.End(op1) + + l.SwapActiveState(func() {}) + + op2 := l.Start() + l.End(op2) + + op3 := l.Start() + l.End(op3) + + l.SwapActiveState(func() {}) + + op4 := l.Start() + l.End(op4) + + require.Equal(t, op1, op4, "two operations separated by two swaps should have the same index.") + require.Equal(t, op2, op3, "two operations with no swap in between should have the same index.") + + require.Equal(t, 0, op1, "first index should be 0") + require.Equal(t, 1, op2, "second index should be 1") +} + +func TestTwoSwapsCanHappenWithoutOperationsInBetween(t *testing.T) { + l := StateLocker{} + + require.Equal(t, 1, l.ColdIdx(), "first cold index should be 1") + l.SwapActiveState(func() {}) + require.Equal(t, 0, l.ColdIdx(), "second cold index should be 0") + l.SwapActiveState(func() {}) + require.Equal(t, 1, l.ColdIdx(), "third cold index should be 1") +} + +func BenchmarkStateLocker_StartEnd(b *testing.B) { + l := StateLocker{} + + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + l.End(l.Start()) + } +} + +func BenchmarkStateLocker_SwapActiveState(b *testing.B) { + + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + l := StateLocker{} + l.SwapActiveState(func() {}) + } +} diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index 652d46072d6..e64c4b384f1 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -21,16 +21,25 @@ import ( "go.opentelemetry.io/otel/api/core" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" + "go.opentelemetry.io/otel/sdk/internal" ) type ( // Aggregator observe events and counts them in pre-determined buckets. // It also calculates the sum and count of all events. Aggregator struct { - // state needs to be aligned for 64-bit atomic operations. - current state - // checkpoint needs to be aligned for 64-bit atomic operations. - checkpoint state + // This aggregator uses the StateLocker that enables a lock-free Update() + // in exchange of a blocking and consistent Checkpoint(). Since Checkpoint() + // is called by the sdk itself and it is not part of a hot path, + // the user is not impacted by these blocking calls. + // + // The algorithm keeps two states. At every instance of time there exist one current state, + // in which new updates are aggregated, and one checkpoint state, that represents the state + // since the last Checkpoint(). These states are swapped when a `Checkpoint()` occur. + + // states needs to be aligned for 64-bit atomic operations. + states [2]state + lock internal.StateLocker boundaries []core.Number kind core.NumberKind } @@ -74,16 +83,18 @@ func New(desc *export.Descriptor, boundaries []core.Number) *Aggregator { agg := Aggregator{ kind: desc.NumberKind(), boundaries: boundaries, - current: state{ - buckets: aggregator.Buckets{ - Boundaries: boundaries, - Counts: make([]core.Number, len(boundaries)+1), + states: [2]state{ + { + buckets: aggregator.Buckets{ + Boundaries: boundaries, + Counts: make([]core.Number, len(boundaries)+1), + }, }, - }, - checkpoint: state{ - buckets: aggregator.Buckets{ - Boundaries: boundaries, - Counts: make([]core.Number, len(boundaries)+1), + { + buckets: aggregator.Buckets{ + Boundaries: boundaries, + Counts: make([]core.Number, len(boundaries)+1), + }, }, }, } @@ -92,17 +103,23 @@ func New(desc *export.Descriptor, boundaries []core.Number) *Aggregator { // Sum returns the sum of all values in the checkpoint. func (c *Aggregator) Sum() (core.Number, error) { - return c.checkpoint.sum, nil + c.lock.Lock() + defer c.lock.Unlock() + return c.checkpoint().sum, nil } // Count returns the number of values in the checkpoint. func (c *Aggregator) Count() (int64, error) { - return int64(c.checkpoint.count.AsUint64()), nil + c.lock.Lock() + defer c.lock.Unlock() + return int64(c.checkpoint().count), nil } // Histogram returns the count of events in pre-determined buckets. func (c *Aggregator) Histogram() (aggregator.Buckets, error) { - return c.checkpoint.buckets, nil + c.lock.Lock() + defer c.lock.Unlock() + return c.checkpoint().buckets, nil } // Checkpoint saves the current state and resets the current state to @@ -110,55 +127,67 @@ func (c *Aggregator) Histogram() (aggregator.Buckets, error) { // the independent Sum, Count and Bucket Count are not consistent with each // other. func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) { - // N.B. There is no atomic operation that can update all three - // values at once without a memory allocation. - // - // This aggregator is intended to trade this correctness for - // speed. - // - // Therefore, atomically swap fields independently, knowing - // that individually the three parts of this aggregation could - // be spread across multiple collections in rare cases. - - c.checkpoint.count.SetUint64(c.current.count.SwapUint64Atomic(0)) - c.checkpoint.sum = c.current.sum.SwapNumberAtomic(core.Number(0)) - - for i := 0; i < len(c.checkpoint.buckets.Counts); i++ { - c.checkpoint.buckets.Counts[i].SetUint64(c.current.buckets.Counts[i].SwapUint64Atomic(0)) - } + c.lock.SwapActiveState(c.resetCheckpoint) +} + +// checkpoint returns the checkpoint state by inverting the lower bit of generationAndHotIdx. +func (c *Aggregator) checkpoint() *state { + return &c.states[c.lock.ColdIdx()] +} + +func (c *Aggregator) resetCheckpoint() { + checkpoint := c.checkpoint() + + checkpoint.count.SetUint64(0) + checkpoint.sum.SetNumber(core.Number(0)) + checkpoint.buckets.Counts = make([]core.Number, len(checkpoint.buckets.Counts)) } // Update adds the recorded measurement to the current data set. func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error { kind := desc.NumberKind() - c.current.count.AddUint64Atomic(1) - c.current.sum.AddNumberAtomic(kind, number) + cIdx := c.lock.Start() + defer c.lock.End(cIdx) + + current := &c.states[cIdx] + current.count.AddUint64Atomic(1) + current.sum.AddNumberAtomic(kind, number) for i, boundary := range c.boundaries { if number.CompareNumber(kind, boundary) < 0 { - c.current.buckets.Counts[i].AddUint64Atomic(1) + current.buckets.Counts[i].AddUint64Atomic(1) return nil } } // Observed event is bigger than all defined boundaries. - c.current.buckets.Counts[len(c.boundaries)].AddUint64Atomic(1) + current.buckets.Counts[len(c.boundaries)].AddUint64Atomic(1) + return nil } -// Merge combines two data sets into one. +// Merge combines two histograms that have the same buckets into a single one. func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error { o, _ := oa.(*Aggregator) if o == nil { return aggregator.NewInconsistentMergeError(c, oa) } - c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum) - c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count) + // Lock() synchronize Merge() and Checkpoint() to make sure all operations of + // Merge() is done to the same state. + c.lock.Lock() + defer c.lock.Unlock() + + current := c.checkpoint() + // We assume that the aggregator being merged is not being updated nor checkpointed or this could be inconsistent. + ocheckpoint := o.checkpoint() + + current.sum.AddNumber(desc.NumberKind(), ocheckpoint.sum) + current.count.AddNumber(core.Uint64NumberKind, ocheckpoint.count) - for i := 0; i < len(c.current.buckets.Counts); i++ { - c.checkpoint.buckets.Counts[i].AddNumber(core.Uint64NumberKind, o.checkpoint.buckets.Counts[i]) + for i := 0; i < len(current.buckets.Counts); i++ { + current.buckets.Counts[i].AddNumber(core.Uint64NumberKind, ocheckpoint.buckets.Counts[i]) } return nil } diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go index c70e6667be0..f70f9aa649b 100644 --- a/sdk/metric/aggregator/histogram/histogram_test.go +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -16,7 +16,6 @@ package histogram import ( "context" - "fmt" "math" "math/rand" "os" @@ -72,12 +71,8 @@ var ( func TestMain(m *testing.M) { fields := []ottest.FieldOffset{ { - Name: "Aggregator.current", - Offset: unsafe.Offsetof(Aggregator{}.current), - }, - { - Name: "Aggregator.checkpoint", - Offset: unsafe.Offsetof(Aggregator{}.checkpoint), + Name: "Aggregator.states", + Offset: unsafe.Offsetof(Aggregator{}.states), }, { Name: "state.buckets", @@ -92,7 +87,6 @@ func TestMain(m *testing.M) { Offset: unsafe.Offsetof(state{}.count), }, } - fmt.Println(fields) if !ottest.Aligned8Byte(fields, os.Stderr) { os.Exit(1) @@ -151,12 +145,12 @@ func histogram(t *testing.T, profile test.Profile, policy policy) { require.Equal(t, all.Count(), count, "Same count -"+policy.name) require.Nil(t, err) - require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") + require.Equal(t, len(agg.checkpoint().buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") counts := calcBuckets(all.Points(), profile) for i, v := range counts { - bCount := agg.checkpoint.buckets.Counts[i].AsUint64() - require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint.buckets.Counts) + bCount := agg.checkpoint().buckets.Counts[i].AsUint64() + require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint().buckets.Counts) } } @@ -202,12 +196,12 @@ func TestHistogramMerge(t *testing.T) { require.Equal(t, all.Count(), count, "Same count - absolute") require.Nil(t, err) - require.Equal(t, len(agg1.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") + require.Equal(t, len(agg1.checkpoint().buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") counts := calcBuckets(all.Points(), profile) for i, v := range counts { - bCount := agg1.checkpoint.buckets.Counts[i].AsUint64() - require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint.buckets.Counts) + bCount := agg1.checkpoint().buckets.Counts[i].AsUint64() + require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint().buckets.Counts) } }) } @@ -229,8 +223,8 @@ func TestHistogramNotSet(t *testing.T) { require.Equal(t, int64(0), count, "Empty checkpoint count = 0") require.Nil(t, err) - require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") - for i, bCount := range agg.checkpoint.buckets.Counts { + require.Equal(t, len(agg.checkpoint().buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") + for i, bCount := range agg.checkpoint().buckets.Counts { require.Equal(t, uint64(0), bCount.AsUint64(), "Bucket #%d must have 0 observed values", i) } }) diff --git a/sdk/metric/histogram_stress_test.go b/sdk/metric/histogram_stress_test.go new file mode 100644 index 00000000000..d9bf979ea4b --- /dev/null +++ b/sdk/metric/histogram_stress_test.go @@ -0,0 +1,60 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This test is too large for the race detector. This SDK uses no locks +// that the race detector would help with, anyway. +// +build !race + +package metric_test + +import ( + "context" + "math/rand" + "testing" + "time" + + "go.opentelemetry.io/otel/api/core" + "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" +) + +func TestStressInt64Histogram(t *testing.T) { + desc := metric.NewDescriptor("some_metric", metric.MeasureKind, nil, "", "", core.Int64NumberKind, false) + h := histogram.New(desc, []core.Number{core.NewInt64Number(25), core.NewInt64Number(50), core.NewInt64Number(75)}) + + go func() { + rnd := rand.New(rand.NewSource(time.Now().Unix())) + for { + _ = h.Update(context.Background(), core.NewInt64Number(rnd.Int63()), desc) + } + }() + + startTime := time.Now() + for time.Since(startTime) < time.Second { + h.Checkpoint(context.Background(), desc) + + b, _ := h.Histogram() + c, _ := h.Count() + + var realCount int64 + for _, c := range b.Counts { + v := c.AsInt64() + realCount += v + } + + if realCount != c { + t.Fail() + } + } +}