Skip to content

Commit

Permalink
chore: gateway_response_time buckets (#3554)
Browse files Browse the repository at this point in the history
* chore: gateway_response_time buckets

* chore: removing 300 bucket

* chore: splitting warehouse buckets

* Update integration_test/kafka_batching/kafka_batching_test.go

Co-authored-by: Aris Tzoumas <atzoumas@rudderstack.com>

---------

Co-authored-by: Aris Tzoumas <atzoumas@rudderstack.com>
  • Loading branch information
fracasula and atzoum committed Jun 30, 2023
1 parent 702ce95 commit bed100d
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 14 deletions.
8 changes: 2 additions & 6 deletions integration_test/kafka_batching/kafka_batching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ func TestKafkaBatching(t *testing.T) {
requireHistogramEqual(t, metrics["router_kafka_batch_size"], histogram{
name: "router_kafka_batch_size", count: 1, sum: 10,
buckets: []*promClient.Bucket{
{CumulativeCount: ptr(uint64(0)), UpperBound: ptr(0.002)},
{CumulativeCount: ptr(uint64(0)), UpperBound: ptr(0.005)},
{CumulativeCount: ptr(uint64(0)), UpperBound: ptr(0.01)},
{CumulativeCount: ptr(uint64(0)), UpperBound: ptr(0.025)},
Expand All @@ -318,16 +319,11 @@ func TestKafkaBatching(t *testing.T) {
{CumulativeCount: ptr(uint64(0)), UpperBound: ptr(2.5)},
{CumulativeCount: ptr(uint64(0)), UpperBound: ptr(5.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(10.0)}, // 10 is the number of messages we sent
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(30.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(60.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(300.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(600.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(1800.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(10800.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(36000.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(86400.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(259200.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(604800.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(1209600.0)},
{CumulativeCount: ptr(uint64(1)), UpperBound: ptr(math.Inf(1))},
},
labels: expectedDefaultAttrs,
Expand Down
7 changes: 7 additions & 0 deletions runner/buckets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package runner

// customBuckets maps a metric name to an explicit set of histogram bucket
// upper bounds, overriding the deployment-wide defaults for that one metric.
// Bounds are in seconds (matching the default bucket lists, whose larger
// values carry "/* 5 mins */"-style comments).
//
// gateway.response_time gets a capped list topping out at 60s: gateway
// responses are expected to be fast, so multi-minute buckets would add
// cardinality without signal.
var customBuckets = map[string][]float64{
	"gateway.response_time": {
		0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60,
	},
}
30 changes: 22 additions & 8 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,17 @@ import (
"github.com/rudderlabs/rudder-server/warehouse/validations"
)

var defaultHistogramBuckets = []float64{
0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60,
300 /* 5 mins */, 600 /* 10 mins */, 1800 /* 30 mins */, 10800 /* 3 hours */, 36000, /* 10 hours */
86400 /* 1 day */, 259200 /* 3 days */, 604800 /* 7 days */, 1209600, /* 2 weeks */
}
var (
	// defaultHistogramBuckets are the histogram bucket upper bounds, in
	// seconds, applied to all histograms by default (unless overridden per
	// metric via customBuckets). Used for non-warehouse deployments; the
	// list is dense at sub-second resolution and tops out at 30 minutes.
	defaultHistogramBuckets = []float64{
		0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60,
		300 /* 5 mins */, 600 /* 10 mins */, 1800, /* 30 mins */
	}
	// defaultWarehouseHistogramBuckets are the default bucket upper bounds,
	// in seconds, for deployments that can run the warehouse (selected via
	// canStartWarehouse() in Run). Warehouse operations can run far longer
	// than gateway/router work, so these start coarser (0.1s) and extend
	// all the way to 2 weeks.
	defaultWarehouseHistogramBuckets = []float64{
		0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60,
		300 /* 5 mins */, 600 /* 10 mins */, 1800 /* 30 mins */, 10800 /* 3 hours */, 36000, /* 10 hours */
		86400 /* 1 day */, 259200 /* 3 days */, 604800 /* 7 days */, 1209600, /* 2 weeks */
	}
)

// ReleaseInfo holds the release information
type ReleaseInfo struct {
Expand Down Expand Up @@ -130,11 +136,19 @@ func (r *Runner) Run(ctx context.Context, args []string) int {
(!config.IsSet("WORKSPACE_NAMESPACE") || strings.Contains(config.GetString("WORKSPACE_NAMESPACE", ""), "free")) {
config.Set("statsExcludedTags", []string{"workspaceId", "sourceID", "destId"})
}
stats.Default = stats.NewStats(config.Default, logger.Default, svcMetric.Instance,
statsOptions := []stats.Option{
stats.WithServiceName(r.appType),
stats.WithServiceVersion(r.releaseInfo.Version),
stats.WithDefaultHistogramBuckets(defaultHistogramBuckets),
)
}
if r.canStartWarehouse() {
statsOptions = append(statsOptions, stats.WithDefaultHistogramBuckets(defaultWarehouseHistogramBuckets))
} else {
statsOptions = append(statsOptions, stats.WithDefaultHistogramBuckets(defaultHistogramBuckets))
}
for histogramName, buckets := range customBuckets {
statsOptions = append(statsOptions, stats.WithHistogramBuckets(histogramName, buckets))
}
stats.Default = stats.NewStats(config.Default, logger.Default, svcMetric.Instance, statsOptions...)
if err := stats.Default.Start(ctx, rruntime.GoRoutineFactory); err != nil {
r.logger.Errorf("Failed to start stats: %v", err)
return 1
Expand Down

0 comments on commit bed100d

Please sign in to comment.