Skip to content

Commit

Permalink
chore: splitting warehouse buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
fracasula committed Jun 28, 2023
1 parent b70c5f1 commit 87d6551
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
8 changes: 2 additions & 6 deletions integration_test/kafka_batching/kafka_batching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ func TestKafkaBatching(t *testing.T) {
requireHistogramEqual(t, metrics["router_kafka_batch_size"], histogram{
name: "router_kafka_batch_size", count: 1, sum: 10,
buckets: []*promClient.Bucket{
{CumulativeCount: ptr(uint64(0)), UpperBound: ptr(0.002)},
{CumulativeCount: ptr(uint64(0)), UpperBound: ptr(0.005)},
{CumulativeCount: ptr(uint64(0)), UpperBound: ptr(0.01)},
{CumulativeCount: ptr(uint64(0)), UpperBound: ptr(0.025)},
Expand All @@ -318,16 +319,11 @@ func TestKafkaBatching(t *testing.T) {
{CumulativeCount: ptr(uint64(0)), UpperBound: ptr(2.5)},
{CumulativeCount: ptr(uint64(0)), UpperBound: ptr(5.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(10.0)}, // 10 is the number of messages we sent
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(30.0)}, // 10 is the number of messages we sent
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(60.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(300.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(600.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(1800.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(10800.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(36000.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(86400.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(259200.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(604800.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(1209600.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(math.Inf(1))},
},
labels: expectedDefaultAttrs,
Expand Down
22 changes: 16 additions & 6 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,17 @@ import (
"github.com/rudderlabs/rudder-server/warehouse/validations"
)

var defaultHistogramBuckets = []float64{
0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60,
300 /* 5 mins */, 600 /* 10 mins */, 1800 /* 30 mins */, 10800 /* 3 hours */, 36000, /* 10 hours */
86400 /* 1 day */, 259200 /* 3 days */, 604800 /* 7 days */, 1209600, /* 2 weeks */
}
var (
defaultHistogramBuckets = []float64{
0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60,
300 /* 5 mins */, 600 /* 10 mins */, 1800, /* 30 mins */
}
defaultWarehouseHistogramBuckets = []float64{
0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60,
300 /* 5 mins */, 600 /* 10 mins */, 1800 /* 30 mins */, 10800 /* 3 hours */, 36000, /* 10 hours */
86400 /* 1 day */, 259200 /* 3 days */, 604800 /* 7 days */, 1209600, /* 2 weeks */
}
)

// ReleaseInfo holds the release information
type ReleaseInfo struct {
Expand Down Expand Up @@ -133,7 +139,11 @@ func (r *Runner) Run(ctx context.Context, args []string) int {
statsOptions := []stats.Option{
stats.WithServiceName(r.appType),
stats.WithServiceVersion(r.releaseInfo.Version),
stats.WithDefaultHistogramBuckets(defaultHistogramBuckets),
}
if r.canStartWarehouse() {
statsOptions = append(statsOptions, stats.WithDefaultHistogramBuckets(defaultWarehouseHistogramBuckets))
} else {
statsOptions = append(statsOptions, stats.WithDefaultHistogramBuckets(defaultHistogramBuckets))
}
for histogramName, buckets := range customBuckets {
statsOptions = append(statsOptions, stats.WithHistogramBuckets(histogramName, buckets))
Expand Down

0 comments on commit 87d6551

Please sign in to comment.