From c39ea1dfeff6fd4936ec1e2c9bc2c7a1c0a538e6 Mon Sep 17 00:00:00 2001 From: Harkishen Singh Date: Mon, 26 Apr 2021 16:38:16 +0530 Subject: [PATCH] Add ewma package for rate calculation. Signed-off-by: Harkishen Singh --- go.sum | 3 - pkg/ewma/ewma.go | 61 +++++++++++++++ pkg/internal/testhelpers/containers.go | 2 +- pkg/pgmodel/ingestor/dispatcher.go | 19 +++-- pkg/pgmodel/ingestor/ingestor.go | 3 +- pkg/pgmodel/ingestor/watcher.go | 100 ++++++++++++------------ pkg/pgmodel/model/samples.go | 4 +- pkg/pgmodel/querier/series_set_test.go | 8 ++ pkg/tests/upgrade_tests/upgrade_test.go | 4 +- 9 files changed, 136 insertions(+), 68 deletions(-) create mode 100644 pkg/ewma/ewma.go diff --git a/go.sum b/go.sum index 8a27ba9deb..746c50e138 100644 --- a/go.sum +++ b/go.sum @@ -695,7 +695,6 @@ github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsU github.com/jackc/pgconn v1.4.0/go.mod h1:Y2O3ZDF0q4mMacyWV3AstPJpeHXWGEetiFttmq5lahk= github.com/jackc/pgconn v1.5.0/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI= github.com/jackc/pgconn v1.5.1-0.20200601181101-fa742c524853/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI= -github.com/jackc/pgconn v1.6.3 h1:4Ks3RKvSvKPolXZsnLQTDAsokDhgID14Cv4ehECmzlY= github.com/jackc/pgconn v1.6.3/go.mod h1:w2pne1C2tZgP+TvjqLpOigGzNqjBgQW9dUw/4Chex78= github.com/jackc/pgconn v1.8.1 h1:ySBX7Q87vOMqKU2bbmKbUvtYhauDFclYbNDYIE1/h6s= github.com/jackc/pgconn v1.8.1/go.mod h1:JV6m6b6jhjdmzchES0drzCcYcAHS1OPD5xu3OZ/lE2g= @@ -714,7 +713,6 @@ github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod github.com/jackc/pgproto3/v2 v2.0.0-rc3/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= github.com/jackc/pgproto3/v2 v2.0.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgproto3/v2 v2.0.2 h1:q1Hsy66zh4vuNsajBUF2PNqfAMMfxU5mk594lPE9vjY= github.com/jackc/pgproto3/v2 v2.0.2/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgproto3/v2 v2.0.6 h1:b1105ZGEMFe7aCvrT1Cca3VoVb4ZFMaFJLJcg/3zD+8= github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= @@ -1233,7 +1231,6 @@ golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= -golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 h1:/ZScEX8SfEmUGRHs0gxpqteO5nfNW6axyZbBdw9A12g= golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 h1:It14KIkyBFYkHkwZ7k45minvA9aorojkyjGk9KJ5B/w= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= diff --git a/pkg/ewma/ewma.go b/pkg/ewma/ewma.go new file mode 100644 index 0000000000..dbf4868ee0 --- /dev/null +++ b/pkg/ewma/ewma.go @@ -0,0 +1,61 @@ +// This code has been borrowed from https://github.com/prometheus/prometheus/blob/main/storage/remote/ewma.go +// +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package ewma + +import ( + "sync" + "sync/atomic" + "time" +) + +// Rate tracks an exponentially weighted moving average of a per-second rate. +type Rate struct { + newEvents int64 + + alpha float64 + interval time.Duration + lastRate float64 + init bool + mutex sync.Mutex +} + +// NewEWMARate always allocates a new ewmaRate, as this guarantees the atomically +// accessed int64 will be aligned on ARM. See prometheus#2666. +func NewEWMARate(alpha float64, interval time.Duration) *Rate { + return &Rate{ + alpha: alpha, + interval: interval, + } +} + +// Rate returns the per-second rate. +func (r *Rate) Rate() float64 { + r.mutex.Lock() + defer r.mutex.Unlock() + return r.lastRate +} + +// Tick assumes to be called every r.interval. +func (r *Rate) Tick() { + newEvents := atomic.SwapInt64(&r.newEvents, 0) + instantRate := float64(newEvents) / r.interval.Seconds() + + r.mutex.Lock() + defer r.mutex.Unlock() + + if r.init { + r.lastRate += r.alpha * (instantRate - r.lastRate) + } else if newEvents > 0 { + r.init = true + r.lastRate = instantRate + } +} + +// Incr counts incr events. +func (r *Rate) Incr(incr int64) { + atomic.AddInt64(&r.newEvents, incr) +} diff --git a/pkg/internal/testhelpers/containers.go b/pkg/internal/testhelpers/containers.go index baaec048c8..9ecc3539a8 100644 --- a/pkg/internal/testhelpers/containers.go +++ b/pkg/internal/testhelpers/containers.go @@ -21,7 +21,7 @@ import ( "github.com/jackc/pgx/v4/pgxpool" _ "github.com/jackc/pgx/v4/stdlib" "github.com/stretchr/testify/require" - testcontainers "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" "github.com/timescale/promscale/pkg/pgmodel/common/schema" ) diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go index 0fac118a6f..daffaf4e49 100644 --- a/pkg/pgmodel/ingestor/dispatcher.go +++ b/pkg/pgmodel/ingestor/dispatcher.go @@ -196,9 +196,11 @@ func (p *pgxDispatcher) Close() { // Though we may insert data to multiple tables concurrently, if asyncAcks is // unset this function will wait until _all_ the insert attempts have completed. func (p *pgxDispatcher) InsertData(dataTS model.Data) (uint64, error) { - rows := dataTS.Rows - var numRows uint64 - workFinished := &sync.WaitGroup{} + var ( + numRows uint64 + rows = dataTS.Rows + workFinished = &sync.WaitGroup{} + ) workFinished.Add(len(rows)) // we only allocate enough space for a single error message here as we only // report one error back upstream. The inserter should not block on this @@ -211,12 +213,16 @@ func (p *pgxDispatcher) InsertData(dataTS model.Data) (uint64, error) { // the following is usually non-blocking, just a channel insert p.getMetricBatcher(metricName) <- &insertDataRequest{metric: metricName, data: data, finished: workFinished, errChan: errChan} } + reportIncomingBatch(numRows) + reportOutgoing := func() { + reportOutgoingBatch(numRows) + reportBatchProcessingTime(dataTS.ReceivedTime) + } var err error if !p.asyncAcks { workFinished.Wait() - decBatch(numRows) - registerIngestionDuration(dataTS.InTime) + reportOutgoing() select { case err = <-errChan: default: @@ -225,8 +231,7 @@ func (p *pgxDispatcher) InsertData(dataTS model.Data) (uint64, error) { } else { go func() { workFinished.Wait() - decBatch(numRows) - registerIngestionDuration(dataTS.InTime) + reportOutgoing() select { case err = <-errChan: default: diff --git a/pkg/pgmodel/ingestor/ingestor.go b/pkg/pgmodel/ingestor/ingestor.go index 33a0a5718b..d6880cf6f6 100644 --- a/pkg/pgmodel/ingestor/ingestor.go +++ b/pkg/pgmodel/ingestor/ingestor.go @@ -94,8 +94,7 @@ func (ingestor *DBIngestor) Ingest(r *prompb.WriteRequest) (uint64, error) { // samples) must no longer be reachable from req. FinishWriteRequest(r) - incBatch(uint64(totalRows)) - rowsInserted, err := ingestor.dispatcher.InsertData(model.Data{Rows: dataSamples, InTime: time.Now()}) + rowsInserted, err := ingestor.dispatcher.InsertData(model.Data{Rows: dataSamples, ReceivedTime: time.Now()}) if err == nil && rowsInserted != totalRows { return rowsInserted, fmt.Errorf("failed to insert all the data! Expected: %d, Got: %d", totalRows, rowsInserted) } diff --git a/pkg/pgmodel/ingestor/watcher.go b/pkg/pgmodel/ingestor/watcher.go index 82abfc5a3b..f0de953650 100644 --- a/pkg/pgmodel/ingestor/watcher.go +++ b/pkg/pgmodel/ingestor/watcher.go @@ -6,9 +6,11 @@ package ingestor import ( "fmt" + "sync" "sync/atomic" "time" + "github.com/timescale/promscale/pkg/ewma" "github.com/timescale/promscale/pkg/log" ) @@ -16,25 +18,20 @@ const ( // reportThreshold is a value above which the warn would be logged. This value is // the value of the ratio between incoming batches vs outgoing batches. reportRatioThreshold = 3 - refreshIteration = 10 // This means that values of watcher.incomingBatches and watcher.outgoingBatches gets renewed every (refreshIteration * checkThroughputInterval). checkRatioInterval = time.Minute // Duration warnings constants. - reportDurationThreshold = time.Minute - checkIngestionDurationInterval = time.Minute + reportDurationThreshold = time.Minute ) // throughtputWatcher is a light weight samples batch watching type, that serves as a watching // routine and keeps the track of incoming write batches. It keeps a ratio of // incoming samples vs outgoing samples and warn about low throughput to the user // which might be due to external causes like network latency, resources allocated, etc. -// -// We watch the batches than the samples in total, since watching batching is fast -// and does the same purpose of watching samples, but on a higher-level. type throughtputWatcher struct { // Warn based on ratio. - incomingBatches uint64 - outgoingBatches uint64 + incomingBatches *ewma.Rate + outgoingBatches *ewma.Rate // Warn based on ingestion duration. shouldReportLastDuration atomic.Value @@ -43,76 +40,77 @@ type throughtputWatcher struct { stop chan struct{} } -var watcher *throughtputWatcher +var ( + watcher *throughtputWatcher + tWatcher = new(sync.Once) +) func runThroughputWatcher(stop chan struct{}) { + tWatcher.Do(func() { + watcher := newThroughputWatcher(stop) + go watcher.watch() + }) +} + +func newThroughputWatcher(stop chan struct{}) *throughtputWatcher { watcher = &throughtputWatcher{ - incomingBatches: 0, - outgoingBatches: 0, + incomingBatches: ewma.NewEWMARate(1, checkRatioInterval), + outgoingBatches: ewma.NewEWMARate(1, checkRatioInterval), stop: stop, } watcher.shouldReportLastDuration.Store(false) - go watcher.watch() - go watcher.watchTime() + return watcher } -// watch watches the ratio between incomingBatches and outgoingBatches every checkThroughputInterval. +// watchBatches watches the ratio between incomingBatches and outgoingBatches every checkRatioInterval. func (w *throughtputWatcher) watch() { - var ( - itr uint8 - t = time.NewTicker(checkRatioInterval) - ) - for range t.C { + t := time.NewTicker(checkRatioInterval) + defer t.Stop() + for { select { + case <-t.C: case <-w.stop: return - default: } - itr++ - if w.outgoingBatches > 1 { - if ratio := float64(atomic.LoadUint64(&w.incomingBatches)) / float64(atomic.LoadUint64(&w.outgoingBatches)); ratio > reportRatioThreshold { - w.warn(ratio) + w.outgoingBatches.Tick() + w.incomingBatches.Tick() + outgoingRate := w.outgoingBatches.Rate() + incomingRate := w.incomingBatches.Rate() + if outgoingRate > 0 { + r := incomingRate / outgoingRate + if r > reportRatioThreshold { + warnHighRatio(r, incomingRate, outgoingRate) } } - if itr > refreshIteration { - // refresh every 'refreshIteration' of outgoing ratio checks. - atomic.StoreUint64(&watcher.incomingBatches, 0) - atomic.StoreUint64(&watcher.outgoingBatches, 0) + if w.shouldReportLastDuration.Load().(bool) { + warnSlowIngestion(w.lastIngestionDuration.Load().(time.Duration)) + w.shouldReportLastDuration.Store(false) } } } -func (w *throughtputWatcher) warn(ratio float64) { - log.Warn("msg", "[WARNING] Incoming samples rate higher than outgoing samples. This may happen due to poor network latency, "+ - "less resources allocated, or few concurrent writes (shards) from Prometheus. Please tune your resource to improve performance. ", - "Throughput ratio", fmt.Sprintf("%.2f", ratio), "Ratio threshold", reportRatioThreshold) +func warnHighRatio(ratio float64, inRate, outRate float64) { + log.Warn("msg", "[WARNING] Incoming samples rate much higher than the rate of samples being saved. "+ + "This may happen due to poor network latency, not enough resources allocated to Promscale, or some other performance-related reason. "+ + "Please tune your system or reach out to the Promscale team.", + "incoming-rate", inRate, "outgoing-rate", outRate, + "throughput-ratio", fmt.Sprintf("%.2f", ratio), "threshold", reportRatioThreshold) } -func (w *throughtputWatcher) watchTime() { - t := time.NewTicker(checkIngestionDurationInterval) - for range t.C { - select { - case <-w.stop: - return - default: - } - if w.shouldReportLastDuration.Load().(bool) { - log.Warn("msg", "[WARNING] Ingestion is taking too long", "ingest duration", - w.lastIngestionDuration.Load().(time.Duration).String(), "threshold", reportDurationThreshold.String()) - w.shouldReportLastDuration.Store(false) - } - } +func warnSlowIngestion(duration time.Duration) { + log.Warn("msg", "[WARNING] Ingestion is a very long time", "duration", + duration.String(), "threshold", reportDurationThreshold.String()) } -func incBatch(size uint64) { - atomic.AddUint64(&watcher.incomingBatches, size) +func reportIncomingBatch(size uint64) { + watcher.incomingBatches.Incr(int64(size)) } -func decBatch(size uint64) { - atomic.AddUint64(&watcher.outgoingBatches, size) +func reportOutgoingBatch(size uint64) { + watcher.outgoingBatches.Incr(int64(size)) } -func registerIngestionDuration(inTime time.Time) { +func reportBatchProcessingTime(inTime time.Time) { d := time.Since(inTime) if d.Seconds() > reportDurationThreshold.Seconds() { watcher.shouldReportLastDuration.Store(true) diff --git a/pkg/pgmodel/model/samples.go b/pkg/pgmodel/model/samples.go index 55427a2039..2e444da19a 100644 --- a/pkg/pgmodel/model/samples.go +++ b/pkg/pgmodel/model/samples.go @@ -21,8 +21,8 @@ type Samples interface { // Data wraps incoming data with its in-timestamp. It is used to warn if the rate // of incoming samples vs outgoing samples is too low, based on time. type Data struct { - Rows map[string][]Samples - InTime time.Time + Rows map[string][]Samples + ReceivedTime time.Time } type promSample struct { diff --git a/pkg/pgmodel/querier/series_set_test.go b/pkg/pgmodel/querier/series_set_test.go index 8c0e96072e..d69879e279 100644 --- a/pkg/pgmodel/querier/series_set_test.go +++ b/pkg/pgmodel/querier/series_set_test.go @@ -30,26 +30,31 @@ type mockPgxRows struct { err error } +//nolint // Close closes the rows, making the connection ready for use again. It is safe // to call Close after rows is already closed. func (m *mockPgxRows) Close() { m.closeCalled = true } +//nolint // Err returns any error that occurred while reading. func (m *mockPgxRows) Err() error { return nil } +//nolint // CommandTag returns the command tag from this query. It is only available after Rows is closed. func (m *mockPgxRows) CommandTag() pgconn.CommandTag { panic("not implemented") } +//nolint func (m *mockPgxRows) FieldDescriptions() []pgproto3.FieldDescription { panic("not implemented") } +//nolint // Next prepares the next row for reading. It returns true if there is another // row and false if no more rows are available. It automatically closes rows // when all rows are read. @@ -62,6 +67,7 @@ func (m *mockPgxRows) Next() bool { return m.idx < len(m.results) } +//nolint // Scan reads the values from the current row into dest values positionally. // dest can include pointers to core types, values implementing the Scanner // interface, []byte, and nil. []byte will skip the decoding process and directly @@ -95,11 +101,13 @@ func (m *mockPgxRows) Scan(dest ...interface{}) error { return nil } +//nolint // Values returns the decoded row values. func (m *mockPgxRows) Values() ([]interface{}, error) { panic("not implemented") } +//nolint // RawValues returns the unparsed bytes of the row values. The returned [][]byte is only valid until the next Next // call or the Rows is closed. However, the underlying byte data is safe to retain a reference to and mutate. func (m *mockPgxRows) RawValues() [][]byte { diff --git a/pkg/tests/upgrade_tests/upgrade_test.go b/pkg/tests/upgrade_tests/upgrade_test.go index 88611efb2a..36b794ec27 100644 --- a/pkg/tests/upgrade_tests/upgrade_test.go +++ b/pkg/tests/upgrade_tests/upgrade_test.go @@ -499,7 +499,7 @@ func TestExtensionUpgrade(t *testing.T) { var version string ctx := context.Background() buildPromscaleImageFromRepo(t) - db, dbContainer, closer := startDB(t, ctx) + _, dbContainer, closer := startDB(t, ctx) defer closer.Close() defer testhelpers.StopContainer(ctx, dbContainer, false) @@ -509,7 +509,7 @@ func TestExtensionUpgrade(t *testing.T) { extVersion := "2.0.0-rc2" dropAndCreateExt(t, ctx, extVersion) - db, err = pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser)) + db, err := pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser)) if err != nil { t.Fatal(err) }