From de481400205d55ff1b5a91f2a281dd3f5f59ca4a Mon Sep 17 00:00:00 2001 From: Xi Chen Date: Sat, 16 Mar 2019 19:56:33 -0400 Subject: [PATCH] Address feedback --- storage/flush_mgr.go | 15 ++++++++++++--- values/decoding/bit_stream_int_iterator.go | 4 ++-- values/decoding/fs_bool_values.go | 2 +- values/decoding/fs_bytes_values.go | 2 +- values/decoding/fs_double_values.go | 2 +- values/decoding/fs_int_values.go | 2 +- values/decoding/fs_time_values.go | 2 +- 7 files changed, 19 insertions(+), 10 deletions(-) diff --git a/storage/flush_mgr.go b/storage/flush_mgr.go index 94d6977..486b1f9 100644 --- a/storage/flush_mgr.go +++ b/storage/flush_mgr.go @@ -4,7 +4,6 @@ import ( "errors" "sync" - "github.com/uber-go/tally" indexfield "github.com/xichen2020/eventdb/index/field" "github.com/xichen2020/eventdb/index/segment" "github.com/xichen2020/eventdb/persist" @@ -13,6 +12,7 @@ import ( xerrors "github.com/m3db/m3x/errors" "github.com/m3db/m3x/instrument" xlog "github.com/m3db/m3x/log" + "github.com/uber-go/tally" ) type segmentPayload struct { @@ -48,12 +48,19 @@ var ( ) type flushManagerMetrics struct { - flush instrument.MethodMetrics + flush instrument.MethodMetrics + enqueueSuccess tally.Counter + enqueueFullQueueErrors tally.Counter } func newFlushManagerMetrics(scope tally.Scope) flushManagerMetrics { + enqueueScope := scope.Tagged(map[string]string{"action": "enqueue"}) return flushManagerMetrics{ - flush: instrument.NewMethodMetrics(scope, "flush", 1.0), + flush: instrument.NewMethodMetrics(scope, "flush", 1.0), + enqueueSuccess: enqueueScope.Counter("success"), + enqueueFullQueueErrors: enqueueScope.Tagged(map[string]string{ + "reason": "full-queue", + }).Counter("errors"), } } @@ -112,7 +119,9 @@ func (m *flushManager) Enqueue(p *segmentPayload) error { } select { case m.unflushed <- p: + m.metrics.enqueueSuccess.Inc(1) default: + m.metrics.enqueueFullQueueErrors.Inc(1) return errFlushManagerFlushQueueFull } return nil diff --git a/values/decoding/bit_stream_int_iterator.go b/values/decoding/bit_stream_int_iterator.go index 32d78bb..ab0840a 100644 --- a/values/decoding/bit_stream_int_iterator.go +++ b/values/decoding/bit_stream_int_iterator.go @@ -22,8 +22,8 @@ type bitStreamIntIterator struct { func newBitStreamIntIterator( reader xio.Reader, - bitsPerEncodedValue int, // Number of bits per encoded value - numEncodedValues int, // Number of encoded values + bitsPerEncodedValue int, + numEncodedValues int, ) *bitStreamIntIterator { return &bitStreamIntIterator{ bitReader: bitstream.NewReader(reader), diff --git a/values/decoding/fs_bool_values.go b/values/decoding/fs_bool_values.go index 720f61e..b891ef3 100644 --- a/values/decoding/fs_bool_values.go +++ b/values/decoding/fs_bool_values.go @@ -49,7 +49,7 @@ func (v *fsBasedBoolValues) Filter( return nil, errNilFilterValue } if filterValue.Type != field.BoolType { - return nil, fmt.Errorf("bool values filter expect bool filter value type but got %v filter value type", filterValue.Type) + return nil, fmt.Errorf("bool values filter expects bool filter value type but got %v filter value type", filterValue.Type) } if !op.BoolIsInRange(int(v.metaProto.NumTrues), int(v.metaProto.NumFalses), filterValue.BoolVal) { return impl.NewEmptyPositionIterator(), nil diff --git a/values/decoding/fs_bytes_values.go b/values/decoding/fs_bytes_values.go index 33d450e..16cfbfb 100644 --- a/values/decoding/fs_bytes_values.go +++ b/values/decoding/fs_bytes_values.go @@ -74,7 +74,7 @@ func (v *fsBasedBytesValues) Filter( return nil, errNilFilterValue } if filterValue.Type != field.BytesType { - return nil, fmt.Errorf("bytes values filter expect bytes filter value type but got %v filter value type", filterValue.Type) + return nil, fmt.Errorf("bytes values filter expects bytes filter value type but got %v filter value type", filterValue.Type) } if !op.BytesMaybeInRange(v.metaProto.MinValue, v.metaProto.MaxValue, filterValue.BytesVal.Bytes()) { return impl.NewEmptyPositionIterator(), nil diff --git a/values/decoding/fs_double_values.go b/values/decoding/fs_double_values.go index de076cb..d11104c 100644 --- a/values/decoding/fs_double_values.go +++ b/values/decoding/fs_double_values.go @@ -56,7 +56,7 @@ func (v *fsBasedDoubleValues) Filter( case field.IntType: filterVal = float64(filterValue.IntVal) default: - return nil, fmt.Errorf("double values filter expect double or int filter value type but got %v filter value type", filterValue.Type) + return nil, fmt.Errorf("double values filter expects double or int filter value type but got %v filter value type", filterValue.Type) } if !op.DoubleMaybeInRange(v.metaProto.MinValue, v.metaProto.MaxValue, filterVal) { return impl.NewEmptyPositionIterator(), nil diff --git a/values/decoding/fs_int_values.go b/values/decoding/fs_int_values.go index cc61373..0f59d0b 100644 --- a/values/decoding/fs_int_values.go +++ b/values/decoding/fs_int_values.go @@ -85,7 +85,7 @@ func (v *fsBasedIntValues) Filter( intVal = filterValue.IntVal maybeInRange = op.IntMaybeInRange(int(v.metaProto.MinValue), int(v.metaProto.MaxValue), filterValue.IntVal) default: - return nil, fmt.Errorf("double values filter expect double or int filter value type but got %v filter value type", filterValue.Type) + return nil, fmt.Errorf("int values filter expects double or int filter value type but got %v filter value type", filterValue.Type) } if !maybeInRange { return impl.NewEmptyPositionIterator(), nil diff --git a/values/decoding/fs_time_values.go b/values/decoding/fs_time_values.go index 27027ad..2b14705 100644 --- a/values/decoding/fs_time_values.go +++ b/values/decoding/fs_time_values.go @@ -51,7 +51,7 @@ func (v *fsBasedTimeValues) Filter( return nil, errNilFilterValue } if filterValue.Type != field.TimeType { - return nil, fmt.Errorf("time values filter expect time filter value type but got %v filter value type", filterValue.Type) + return nil, fmt.Errorf("time values filter expects time filter value type but got %v filter value type", filterValue.Type) } if !op.TimeMaybeInRange(v.metaProto.MinValue, v.metaProto.MaxValue, filterValue.TimeNanosVal) { return impl.NewEmptyPositionIterator(), nil