This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit c39ea1d

Add ewma package for rate calculation.
Signed-off-by: Harkishen Singh <harkishensingh@hotmail.com>
Harkishen-Singh committed Jun 10, 2021
1 parent 1fc1a8e commit c39ea1d
Showing 9 changed files with 136 additions and 68 deletions.
3 changes: 0 additions & 3 deletions go.sum
@@ -695,7 +695,6 @@ github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsU
github.com/jackc/pgconn v1.4.0/go.mod h1:Y2O3ZDF0q4mMacyWV3AstPJpeHXWGEetiFttmq5lahk=
github.com/jackc/pgconn v1.5.0/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI=
github.com/jackc/pgconn v1.5.1-0.20200601181101-fa742c524853/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI=
github.com/jackc/pgconn v1.6.3 h1:4Ks3RKvSvKPolXZsnLQTDAsokDhgID14Cv4ehECmzlY=
github.com/jackc/pgconn v1.6.3/go.mod h1:w2pne1C2tZgP+TvjqLpOigGzNqjBgQW9dUw/4Chex78=
github.com/jackc/pgconn v1.8.1 h1:ySBX7Q87vOMqKU2bbmKbUvtYhauDFclYbNDYIE1/h6s=
github.com/jackc/pgconn v1.8.1/go.mod h1:JV6m6b6jhjdmzchES0drzCcYcAHS1OPD5xu3OZ/lE2g=
@@ -714,7 +713,6 @@ github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod
github.com/jackc/pgproto3/v2 v2.0.0-rc3/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM=
github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM=
github.com/jackc/pgproto3/v2 v2.0.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/jackc/pgproto3/v2 v2.0.2 h1:q1Hsy66zh4vuNsajBUF2PNqfAMMfxU5mk594lPE9vjY=
github.com/jackc/pgproto3/v2 v2.0.2/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/jackc/pgproto3/v2 v2.0.6 h1:b1105ZGEMFe7aCvrT1Cca3VoVb4ZFMaFJLJcg/3zD+8=
github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
@@ -1233,7 +1231,6 @@ golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 h1:/ZScEX8SfEmUGRHs0gxpqteO5nfNW6axyZbBdw9A12g=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 h1:It14KIkyBFYkHkwZ7k45minvA9aorojkyjGk9KJ5B/w=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
61 changes: 61 additions & 0 deletions pkg/ewma/ewma.go
@@ -0,0 +1,61 @@
// This code has been borrowed from https://github.com/prometheus/prometheus/blob/main/storage/remote/ewma.go
//
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package ewma

import (
"sync"
"sync/atomic"
"time"
)

// Rate tracks an exponentially weighted moving average of a per-second rate.
type Rate struct {
newEvents int64

alpha float64
interval time.Duration
lastRate float64
init bool
mutex sync.Mutex
}

// NewEWMARate always allocates a new Rate, as this guarantees the atomically
// accessed int64 will be aligned on ARM. See prometheus#2666.
func NewEWMARate(alpha float64, interval time.Duration) *Rate {
return &Rate{
alpha: alpha,
interval: interval,
}
}

// Rate returns the per-second rate.
func (r *Rate) Rate() float64 {
r.mutex.Lock()
defer r.mutex.Unlock()
return r.lastRate
}

// Tick is assumed to be called every r.interval.
func (r *Rate) Tick() {
newEvents := atomic.SwapInt64(&r.newEvents, 0)
instantRate := float64(newEvents) / r.interval.Seconds()

r.mutex.Lock()
defer r.mutex.Unlock()

if r.init {
r.lastRate += r.alpha * (instantRate - r.lastRate)
} else if newEvents > 0 {
r.init = true
r.lastRate = instantRate
}
}

// Incr counts incr events.
func (r *Rate) Incr(incr int64) {
atomic.AddInt64(&r.newEvents, incr)
}
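
The package's public surface is just NewEWMARate, Incr, Tick, and Rate. Below is a minimal usage sketch (not part of the commit; the alpha value and event counts are invented for illustration) showing how the first Tick seeds the rate and how later Ticks apply the update rule lastRate += alpha * (instantRate - lastRate):

package main

import (
	"fmt"
	"time"

	"github.com/timescale/promscale/pkg/ewma"
)

func main() {
	// Track a per-second rate, smoothed with alpha = 0.2 and a one-second interval.
	rate := ewma.NewEWMARate(0.2, time.Second)

	// Simulate three one-second intervals observing 100, 200, and 50 events.
	// In real use, Tick would be driven by a time.Ticker firing every interval.
	for _, n := range []int64{100, 200, 50} {
		rate.Incr(n)
		rate.Tick()
		fmt.Printf("smoothed rate: %.1f events/s\n", rate.Rate())
	}
	// The first Tick seeds lastRate with the instant rate (100.0); afterwards
	// each Tick moves lastRate by alpha toward the new instant rate:
	// 100.0 -> 100 + 0.2*(200-100) = 120.0 -> 120 + 0.2*(50-120) = 106.0.
}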
2 changes: 1 addition & 1 deletion pkg/internal/testhelpers/containers.go
@@ -21,7 +21,7 @@ import (
"github.com/jackc/pgx/v4/pgxpool"
_ "github.com/jackc/pgx/v4/stdlib"
"github.com/stretchr/testify/require"
testcontainers "github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/timescale/promscale/pkg/pgmodel/common/schema"
)
19 changes: 12 additions & 7 deletions pkg/pgmodel/ingestor/dispatcher.go
@@ -196,9 +196,11 @@ func (p *pgxDispatcher) Close() {
// Though we may insert data to multiple tables concurrently, if asyncAcks is
// unset this function will wait until _all_ the insert attempts have completed.
func (p *pgxDispatcher) InsertData(dataTS model.Data) (uint64, error) {
rows := dataTS.Rows
var numRows uint64
workFinished := &sync.WaitGroup{}
var (
numRows uint64
rows = dataTS.Rows
workFinished = &sync.WaitGroup{}
)
workFinished.Add(len(rows))
// we only allocate enough space for a single error message here as we only
// report one error back upstream. The inserter should not block on this
@@ -211,12 +213,16 @@ func (p *pgxDispatcher) InsertData(dataTS model.Data) (uint64, error) {
// the following is usually non-blocking, just a channel insert
p.getMetricBatcher(metricName) <- &insertDataRequest{metric: metricName, data: data, finished: workFinished, errChan: errChan}
}
reportIncomingBatch(numRows)
reportOutgoing := func() {
reportOutgoingBatch(numRows)
reportBatchProcessingTime(dataTS.ReceivedTime)
}

var err error
if !p.asyncAcks {
workFinished.Wait()
decBatch(numRows)
registerIngestionDuration(dataTS.InTime)
reportOutgoing()
select {
case err = <-errChan:
default:
@@ -225,8 +231,7 @@ func (p *pgxDispatcher) InsertData(dataTS model.Data) (uint64, error) {
} else {
go func() {
workFinished.Wait()
decBatch(numRows)
registerIngestionDuration(dataTS.InTime)
reportOutgoing()
select {
case err = <-errChan:
default:
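
The refactor above keeps the dispatcher's existing error-handling shape: a WaitGroup counts outstanding per-metric inserts, and a buffered error channel of capacity one records at most a single failure, drained with a non-blocking select once the work finishes. A self-contained sketch of that pattern (illustrative names only, not the commit's code):

package main

import (
	"errors"
	"fmt"
	"sync"
)

func main() {
	work := []int{1, 2, 3}
	wg := &sync.WaitGroup{}
	wg.Add(len(work))
	// Capacity one: only the first error is kept; later failures fall
	// through the default case of the send and are dropped.
	errChan := make(chan error, 1)

	for _, w := range work {
		go func(w int) {
			defer wg.Done()
			if w == 2 { // simulate one failing insert
				select {
				case errChan <- errors.New("insert failed"):
				default:
				}
			}
		}(w)
	}

	wg.Wait()
	var err error
	select {
	case err = <-errChan: // a failure was reported
	default: // all inserts succeeded
	}
	fmt.Println("first error:", err)
}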
3 changes: 1 addition & 2 deletions pkg/pgmodel/ingestor/ingestor.go
@@ -94,8 +94,7 @@ func (ingestor *DBIngestor) Ingest(r *prompb.WriteRequest) (uint64, error) {
// samples) must no longer be reachable from req.
FinishWriteRequest(r)

incBatch(uint64(totalRows))
rowsInserted, err := ingestor.dispatcher.InsertData(model.Data{Rows: dataSamples, InTime: time.Now()})
rowsInserted, err := ingestor.dispatcher.InsertData(model.Data{Rows: dataSamples, ReceivedTime: time.Now()})
if err == nil && rowsInserted != totalRows {
return rowsInserted, fmt.Errorf("failed to insert all the data! Expected: %d, Got: %d", totalRows, rowsInserted)
}
100 changes: 49 additions & 51 deletions pkg/pgmodel/ingestor/watcher.go
@@ -6,35 +6,32 @@ package ingestor

import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/timescale/promscale/pkg/ewma"
"github.com/timescale/promscale/pkg/log"
)

const (
// reportRatioThreshold is the ratio of incoming to outgoing batches above which
// a warning is logged.
reportRatioThreshold = 3
refreshIteration = 10 // Values of watcher.incomingBatches and watcher.outgoingBatches get reset every (refreshIteration * checkThroughputInterval).
checkRatioInterval = time.Minute

// Duration warnings constants.
reportDurationThreshold = time.Minute
checkIngestionDurationInterval = time.Minute
reportDurationThreshold = time.Minute
)

// throughtputWatcher is a lightweight type that watches batches of samples. It
// tracks the rates of incoming and outgoing write batches and warns the user about
// low throughput, which may be due to external causes like network latency or
// under-allocated resources.
//
// We watch batches rather than individual samples, since watching batches is cheap
// and serves the same purpose, just at a higher level.
type throughtputWatcher struct {
// Warn based on ratio.
incomingBatches uint64
outgoingBatches uint64
incomingBatches *ewma.Rate
outgoingBatches *ewma.Rate

// Warn based on ingestion duration.
shouldReportLastDuration atomic.Value
@@ -43,76 +40,77 @@ type throughtputWatcher struct {
stop chan struct{}
}

var watcher *throughtputWatcher
var (
watcher *throughtputWatcher
tWatcher = new(sync.Once)
)

func runThroughputWatcher(stop chan struct{}) {
tWatcher.Do(func() {
watcher := newThroughputWatcher(stop)
go watcher.watch()
})
}

func newThroughputWatcher(stop chan struct{}) *throughtputWatcher {
watcher = &throughtputWatcher{
incomingBatches: 0,
outgoingBatches: 0,
incomingBatches: ewma.NewEWMARate(1, checkRatioInterval),
outgoingBatches: ewma.NewEWMARate(1, checkRatioInterval),
stop: stop,
}
watcher.shouldReportLastDuration.Store(false)
go watcher.watch()
go watcher.watchTime()
return watcher
}

// watch watches the ratio between incomingBatches and outgoingBatches every checkThroughputInterval.
// watch watches the ratio between incomingBatches and outgoingBatches every checkRatioInterval.
func (w *throughtputWatcher) watch() {
var (
itr uint8
t = time.NewTicker(checkRatioInterval)
)
for range t.C {
t := time.NewTicker(checkRatioInterval)
defer t.Stop()
for {
select {
case <-t.C:
case <-w.stop:
return
default:
}
itr++
if w.outgoingBatches > 1 {
if ratio := float64(atomic.LoadUint64(&w.incomingBatches)) / float64(atomic.LoadUint64(&w.outgoingBatches)); ratio > reportRatioThreshold {
w.warn(ratio)
w.outgoingBatches.Tick()
w.incomingBatches.Tick()
outgoingRate := w.outgoingBatches.Rate()
incomingRate := w.incomingBatches.Rate()
if outgoingRate > 0 {
r := incomingRate / outgoingRate
if r > reportRatioThreshold {
warnHighRatio(r, incomingRate, outgoingRate)
}
}
if itr > refreshIteration {
// refresh every 'refreshIteration' of outgoing ratio checks.
atomic.StoreUint64(&watcher.incomingBatches, 0)
atomic.StoreUint64(&watcher.outgoingBatches, 0)
if w.shouldReportLastDuration.Load().(bool) {
warnSlowIngestion(w.lastIngestionDuration.Load().(time.Duration))
w.shouldReportLastDuration.Store(false)
}
}
}

func (w *throughtputWatcher) warn(ratio float64) {
log.Warn("msg", "[WARNING] Incoming samples rate higher than outgoing samples. This may happen due to poor network latency, "+
"less resources allocated, or few concurrent writes (shards) from Prometheus. Please tune your resource to improve performance. ",
"Throughput ratio", fmt.Sprintf("%.2f", ratio), "Ratio threshold", reportRatioThreshold)
func warnHighRatio(ratio float64, inRate, outRate float64) {
log.Warn("msg", "[WARNING] Incoming samples rate much higher than the rate of samples being saved. "+
"This may happen due to poor network latency, not enough resources allocated to Promscale, or some other performance-related reason. "+
"Please tune your system or reach out to the Promscale team.",
"incoming-rate", inRate, "outgoing-rate", outRate,
"throughput-ratio", fmt.Sprintf("%.2f", ratio), "threshold", reportRatioThreshold)
}

func (w *throughtputWatcher) watchTime() {
t := time.NewTicker(checkIngestionDurationInterval)
for range t.C {
select {
case <-w.stop:
return
default:
}
if w.shouldReportLastDuration.Load().(bool) {
log.Warn("msg", "[WARNING] Ingestion is taking too long", "ingest duration",
w.lastIngestionDuration.Load().(time.Duration).String(), "threshold", reportDurationThreshold.String())
w.shouldReportLastDuration.Store(false)
}
}
func warnSlowIngestion(duration time.Duration) {
log.Warn("msg", "[WARNING] Ingestion is a very long time", "duration",
duration.String(), "threshold", reportDurationThreshold.String())
}

func incBatch(size uint64) {
atomic.AddUint64(&watcher.incomingBatches, size)
func reportIncomingBatch(size uint64) {
watcher.incomingBatches.Incr(int64(size))
}

func decBatch(size uint64) {
atomic.AddUint64(&watcher.outgoingBatches, size)
func reportOutgoingBatch(size uint64) {
watcher.outgoingBatches.Incr(int64(size))
}

func registerIngestionDuration(inTime time.Time) {
func reportBatchProcessingTime(inTime time.Time) {
d := time.Since(inTime)
if d.Seconds() > reportDurationThreshold.Seconds() {
watcher.shouldReportLastDuration.Store(true)
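
One detail worth noting in the new watcher: both rates are constructed with alpha = 1, and with alpha = 1 the EWMA update collapses to lastRate = instantRate, so each Tick yields the plain per-interval rate. The ewma package is used here mainly for its thread-safe swap-and-reset bookkeeping rather than for smoothing. A small standalone sketch of the ratio check the watcher performs every checkRatioInterval (the threshold mirrors the constant in the diff; the rate values are invented):

package main

import "fmt"

// reportRatioThreshold mirrors the constant in pkg/pgmodel/ingestor/watcher.go.
const reportRatioThreshold = 3

func main() {
	// Per-minute batch rates, as the watcher would read them after a Tick.
	incomingRate, outgoingRate := 9.5, 2.0
	if outgoingRate > 0 {
		if r := incomingRate / outgoingRate; r > reportRatioThreshold {
			fmt.Printf("would warn: throughput ratio %.2f exceeds threshold %d\n",
				r, reportRatioThreshold)
		}
	}
}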
4 changes: 2 additions & 2 deletions pkg/pgmodel/model/samples.go
@@ -21,8 +21,8 @@ type Samples interface {
// Data wraps incoming data with the time at which it was received. It is used to
// warn, based on time, when the rate of outgoing samples falls too far behind the
// rate of incoming samples.
type Data struct {
Rows map[string][]Samples
InTime time.Time
Rows map[string][]Samples
ReceivedTime time.Time
}

type promSample struct {
8 changes: 8 additions & 0 deletions pkg/pgmodel/querier/series_set_test.go
@@ -30,26 +30,31 @@ type mockPgxRows struct {
err error
}

//nolint
// Close closes the rows, making the connection ready for use again. It is safe
// to call Close after rows is already closed.
func (m *mockPgxRows) Close() {
m.closeCalled = true
}

//nolint
// Err returns any error that occurred while reading.
func (m *mockPgxRows) Err() error {
return nil
}

//nolint
// CommandTag returns the command tag from this query. It is only available after Rows is closed.
func (m *mockPgxRows) CommandTag() pgconn.CommandTag {
panic("not implemented")
}

//nolint
func (m *mockPgxRows) FieldDescriptions() []pgproto3.FieldDescription {
panic("not implemented")
}

//nolint
// Next prepares the next row for reading. It returns true if there is another
// row and false if no more rows are available. It automatically closes rows
// when all rows are read.
@@ -62,6 +67,7 @@ func (m *mockPgxRows) Next() bool {
return m.idx < len(m.results)
}

//nolint
// Scan reads the values from the current row into dest values positionally.
// dest can include pointers to core types, values implementing the Scanner
// interface, []byte, and nil. []byte will skip the decoding process and directly
@@ -95,11 +101,13 @@ func (m *mockPgxRows) Scan(dest ...interface{}) error {
return nil
}

//nolint
// Values returns the decoded row values.
func (m *mockPgxRows) Values() ([]interface{}, error) {
panic("not implemented")
}

//nolint
// RawValues returns the unparsed bytes of the row values. The returned [][]byte is only valid until the next Next
// call or the Rows is closed. However, the underlying byte data is safe to retain a reference to and mutate.
func (m *mockPgxRows) RawValues() [][]byte {
4 changes: 2 additions & 2 deletions pkg/tests/upgrade_tests/upgrade_test.go
@@ -499,7 +499,7 @@ func TestExtensionUpgrade(t *testing.T) {
var version string
ctx := context.Background()
buildPromscaleImageFromRepo(t)
db, dbContainer, closer := startDB(t, ctx)
_, dbContainer, closer := startDB(t, ctx)
defer closer.Close()

defer testhelpers.StopContainer(ctx, dbContainer, false)
@@ -509,7 +509,7 @@
extVersion := "2.0.0-rc2"
dropAndCreateExt(t, ctx, extVersion)

db, err = pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser))
db, err := pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser))
if err != nil {
t.Fatal(err)
}
