Skip to content

Commit

Permalink
[coordinator] Add write host error and fetch host error sampled loggi…
Browse files Browse the repository at this point in the history
…ng and config (#4173)
  • Loading branch information
robskillington committed Mar 10, 2023
1 parent 6a7a5f2 commit 6d5f3aa
Show file tree
Hide file tree
Showing 14 changed files with 383 additions and 72 deletions.
4 changes: 4 additions & 0 deletions site/content/includes/m3dbnode/annotated_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,10 @@ db:
jitter: <bool>
# Log error sample rate
logErrorSampleRate: <float>
# Log host write error sample rate
logHostWriteErrorSampleRate: <float>
# Log host fetch error sample rate
logHostFetchErrorSampleRate: <float>
# The amount of times a background check fails before a connection is taken out of consideration
backgroundHealthCheckFailLimit: <int>
# The factor of the host connect time when sleeping between a failed health check and the next check
Expand Down
2 changes: 2 additions & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,8 @@ func TestConfiguration(t *testing.T) {
forever: null
jitter: true
logErrorSampleRate: 0
logHostWriteErrorSampleRate: 0
logHostFetchErrorSampleRate: 0
backgroundHealthCheckFailLimit: 4
backgroundHealthCheckFailThrottleFactor: 0.5
hashing:
Expand Down
112 changes: 112 additions & 0 deletions src/dbnode/client/client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion src/dbnode/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ type Configuration struct {
// LogErrorSampleRate is the log error sample rate.
LogErrorSampleRate sampler.Rate `yaml:"logErrorSampleRate"`

// LogHostWriteErrorSampleRate is the log write error per host sample rate.
LogHostWriteErrorSampleRate sampler.Rate `yaml:"logHostWriteErrorSampleRate"`

// LogHostFetchErrorSampleRate is the log fetch error per host sample rate.
LogHostFetchErrorSampleRate sampler.Rate `yaml:"logHostFetchErrorSampleRate"`

// BackgroundHealthCheckFailLimit is the amount of times a background check
// must fail before a connection is taken out of consideration.
BackgroundHealthCheckFailLimit *int `yaml:"backgroundHealthCheckFailLimit"`
Expand Down Expand Up @@ -330,7 +336,9 @@ func (c Configuration) NewAdminClient(
SetNamespaceInitializer(syncNsInit).
SetAsyncTopologyInitializers(asyncTopoInits).
SetInstrumentOptions(iopts).
SetLogErrorSampleRate(c.LogErrorSampleRate)
SetLogErrorSampleRate(c.LogErrorSampleRate).
SetLogHostWriteErrorSampleRate(c.LogHostWriteErrorSampleRate).
SetLogHostFetchErrorSampleRate(c.LogHostFetchErrorSampleRate)

if params.ClockOptions != nil {
v = v.SetClockOptions(params.ClockOptions)
Expand Down
22 changes: 16 additions & 6 deletions src/dbnode/client/fetch_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/namespace"
Expand All @@ -48,10 +49,8 @@ const (
maxInt = int(maxUint >> 1)
)

var (
errFetchStateStillProcessing = errors.New("[invariant violated] fetch " +
"state is still processing, unable to create response")
)
var errFetchStateStillProcessing = errors.New("[invariant violated] fetch " +
"state is still processing, unable to create response")

type fetchState struct {
sync.Cond
Expand All @@ -71,7 +70,8 @@ type fetchState struct {
// is used for - fetchTagged or Aggregate.
stateType fetchStateType

done bool
done bool
lastResetTime time.Time
}

func newFetchState(pool fetchStatePool) *fetchState {
Expand Down Expand Up @@ -99,6 +99,7 @@ func (f *fetchState) close() {
}
f.err = nil
f.done = false
f.lastResetTime = time.Time{}
f.tagResultAccumulator.Clear()

if f.pool == nil {
Expand All @@ -117,6 +118,7 @@ func (f *fetchState) ResetFetchTagged(
op.incRef() // take a reference to the provided op
f.fetchTaggedOp = op
f.stateType = fetchTaggedFetchState
f.lastResetTime = time.Now()
f.tagResultAccumulator.Reset(startTime, endTime, topoMap, majority, consistencyLevel)
}

Expand All @@ -130,6 +132,7 @@ func (f *fetchState) ResetAggregate(
op.incRef() // take a reference to the provided op
f.aggregateOp = op
f.stateType = aggregateFetchState
f.lastResetTime = time.Now()
f.tagResultAccumulator.Reset(startTime, endTime, topoMap, majority, consistencyLevel)
}

Expand Down Expand Up @@ -157,13 +160,19 @@ func (f *fetchState) completionFn(
}

var (
took time.Duration
done bool
err error
)
if !f.lastResetTime.IsZero() {
took = time.Since(f.lastResetTime)
}
switch r := result.(type) {
case fetchTaggedResultAccumulatorOpts:
f.pool.MaybeLogHostError(maybeHostFetchError{err: resultErr, host: r.host, reqRespTime: took})
done, err = f.tagResultAccumulator.AddFetchTaggedResponse(r, resultErr)
case aggregateResultAccumulatorOpts:
f.pool.MaybeLogHostError(maybeHostFetchError{err: resultErr, host: r.host, reqRespTime: took})
done, err = f.tagResultAccumulator.AddAggregateResponse(r, resultErr)
default:
// should never happen
Expand Down Expand Up @@ -242,7 +251,8 @@ func (f *fetchState) asEncodingSeriesIterators(
}

func (f *fetchState) asAggregatedTagsIterator(pools fetchTaggedPools, limit int) (
AggregatedTagsIterator, FetchResponseMetadata, error) {
AggregatedTagsIterator, FetchResponseMetadata, error,
) {
f.Lock()
defer f.Unlock()

Expand Down
52 changes: 49 additions & 3 deletions src/dbnode/client/fetch_state_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,41 @@

package client

import "github.com/m3db/m3/src/x/pool"
import (
"time"

"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/sampler"

"go.uber.org/zap"
)

type fetchStatePool interface {
Init()
Get() *fetchState
Put(*fetchState)
MaybeLogHostError(hostErr maybeHostFetchError)
}

type fetchStatePoolImpl struct {
pool pool.ObjectPool

instrumentOpts instrument.Options
logger *zap.Logger
logHostErrorSampler *sampler.Sampler
}

func newFetchStatePool(opts pool.ObjectPoolOptions) fetchStatePool {
func newFetchStatePool(
opts pool.ObjectPoolOptions,
logger *zap.Logger,
logHostErrorSampler *sampler.Sampler,
) fetchStatePool {
return &fetchStatePoolImpl{
pool: pool.NewObjectPool(opts),
pool: pool.NewObjectPool(opts),
logger: logger,
logHostErrorSampler: logHostErrorSampler,
}
}

Expand All @@ -51,3 +71,29 @@ func (p *fetchStatePoolImpl) Get() *fetchState {
func (p *fetchStatePoolImpl) Put(f *fetchState) {
p.pool.Put(f)
}

func (p *fetchStatePoolImpl) MaybeLogHostError(hostErr maybeHostFetchError) {
if hostErr.err == nil {
// No error, this is an expected code path when host request doesn't
// encounter an error.
return
}

if !p.logHostErrorSampler.Sample() {
return
}

p.logger.Warn("sampled error fetching from host (may not lead to consistency result error)",
zap.Stringer("host", hostErr.host),
zap.Duration("reqRespTime", hostErr.reqRespTime),
zap.Error(hostErr.err))
}

type maybeHostFetchError struct {
// Note: both these fields should be set always.
host topology.Host
reqRespTime time.Duration

// Error field is optionally set when there is actually an error.
err error
}
5 changes: 4 additions & 1 deletion src/dbnode/client/fetch_state_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ import (
"testing"

"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/sampler"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestFetchStatePool(t *testing.T) {
p := newFetchStatePool(pool.NewObjectPoolOptions().SetSize(1))
p := newFetchStatePool(pool.NewObjectPoolOptions().SetSize(1),
zap.NewExample(), sampler.MustNewSampler(1))
p.Init()

s := p.Get()
Expand Down
Loading

0 comments on commit 6d5f3aa

Please sign in to comment.