From 87d6551a22ad29f5130a2df619b36ab65d7f4c9c Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Wed, 28 Jun 2023 18:20:30 +0200 Subject: [PATCH] chore: splitting warehouse buckets --- .../kafka_batching/kafka_batching_test.go | 8 ++----- runner/runner.go | 22 ++++++++++++++----- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/integration_test/kafka_batching/kafka_batching_test.go b/integration_test/kafka_batching/kafka_batching_test.go index 1a3be3d768..60cc299a9d 100644 --- a/integration_test/kafka_batching/kafka_batching_test.go +++ b/integration_test/kafka_batching/kafka_batching_test.go @@ -307,6 +307,7 @@ func TestKafkaBatching(t *testing.T) { requireHistogramEqual(t, metrics["router_kafka_batch_size"], histogram{ name: "router_kafka_batch_size", count: 1, sum: 10, buckets: []*promClient.Bucket{ + {CumulativeCount: ptr(uint64(0)), UpperBound: ptr(0.002)}, {CumulativeCount: ptr(uint64(0)), UpperBound: ptr(0.005)}, {CumulativeCount: ptr(uint64(0)), UpperBound: ptr(0.01)}, {CumulativeCount: ptr(uint64(0)), UpperBound: ptr(0.025)}, @@ -318,16 +319,11 @@ func TestKafkaBatching(t *testing.T) { {CumulativeCount: ptr(uint64(0)), UpperBound: ptr(2.5)}, {CumulativeCount: ptr(uint64(0)), UpperBound: ptr(5.0)}, {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(10.0)}, // 10 is the number of messages we sent + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(30.0)}, // 10 is the number of messages we sent {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(60.0)}, {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(300.0)}, {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(600.0)}, {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(1800.0)}, - {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(10800.0)}, - {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(36000.0)}, - {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(86400.0)}, - {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(259200.0)}, - {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(604800.0)}, - {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(1209600.0)}, {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(math.Inf(1))}, }, labels: expectedDefaultAttrs, diff --git a/runner/runner.go b/runner/runner.go index 59f3a570be..d8fae6ffe7 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -55,11 +55,17 @@ import ( "github.com/rudderlabs/rudder-server/warehouse/validations" ) -var defaultHistogramBuckets = []float64{ - 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60, - 300 /* 5 mins */, 600 /* 10 mins */, 1800 /* 30 mins */, 10800 /* 3 hours */, 36000, /* 10 hours */ - 86400 /* 1 day */, 259200 /* 3 days */, 604800 /* 7 days */, 1209600, /* 2 weeks */ -} +var ( + defaultHistogramBuckets = []float64{ + 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60, + 300 /* 5 mins */, 600 /* 10 mins */, 1800, /* 30 mins */ + } + defaultWarehouseHistogramBuckets = []float64{ + 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60, + 300 /* 5 mins */, 600 /* 10 mins */, 1800 /* 30 mins */, 10800 /* 3 hours */, 36000, /* 10 hours */ + 86400 /* 1 day */, 259200 /* 3 days */, 604800 /* 7 days */, 1209600, /* 2 weeks */ + } +) // ReleaseInfo holds the release information type ReleaseInfo struct { @@ -133,7 +139,11 @@ func (r *Runner) Run(ctx context.Context, args []string) int { statsOptions := []stats.Option{ stats.WithServiceName(r.appType), stats.WithServiceVersion(r.releaseInfo.Version), - stats.WithDefaultHistogramBuckets(defaultHistogramBuckets), + } + if r.canStartWarehouse() { + statsOptions = append(statsOptions, stats.WithDefaultHistogramBuckets(defaultWarehouseHistogramBuckets)) + } else { + statsOptions = append(statsOptions, stats.WithDefaultHistogramBuckets(defaultHistogramBuckets)) } for histogramName, buckets := range customBuckets { statsOptions = append(statsOptions, stats.WithHistogramBuckets(histogramName, buckets))