Skip to content

Commit

Permalink
chore: configure histogram buckets and add sourceId label on event_de…
Browse files Browse the repository at this point in the history
…livery_time metric (#4559)
  • Loading branch information
satishrudderstack committed Apr 8, 2024
1 parent 6d5be81 commit be5d501
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 4 deletions.
10 changes: 6 additions & 4 deletions router/batchrouter/handle_observability.go
Expand Up @@ -188,10 +188,12 @@ func (brt *Handle) recordUploadStats(destination Connection, output UploadResult

if receivedTime, err := time.Parse(misc.RFC3339Milli, output.FirstEventAt); err == nil {
eventDeliveryTimeStat := stats.Default.NewTaggedStat("event_delivery_time", stats.TimerType, map[string]string{
"module": "batch_router",
"destType": brt.destType,
"destination": destinationTag,
"workspaceId": destination.Source.WorkspaceID,
"module": "batch_router",
"destType": brt.destType,
"destination": destinationTag,
"workspaceId": destination.Source.WorkspaceID,
"sourceId": destination.Source.ID,
"destinationId": destination.Destination.ID,
})
eventDeliveryTimeStat.SendTiming(time.Since(receivedTime))
}
Expand Down
1 change: 1 addition & 0 deletions router/worker.go
Expand Up @@ -1083,6 +1083,7 @@ func (w *worker) sendEventDeliveryStat(destinationJobMetadata *types.JobMetadata
"destID": destination.ID,
"destination": destinationTag,
"workspaceId": status.WorkspaceId,
"sourceId": destinationJobMetadata.SourceID,
})

eventsDeliveryTimeStat.SendTiming(time.Since(receivedTime))
Expand Down
7 changes: 7 additions & 0 deletions runner/buckets.go
Expand Up @@ -21,3 +21,10 @@ var customBuckets = map[string][]float64{
86400, 432000, 864000, 2592000, 5184000, 7776000, 15552000, 31104000, // 1 day, 5 days, 10 days, 30 days, 60 days, 90 days, 180 days, 360 days
},
}

var customBucketsServer = map[string][]float64{
"event_delivery_time": {
0.5, 1, 2.5, 5, 10, 30, 60, 300 /* 5 minutes */, 600 /* 10 minutes */, 1800, /* 30 minutes */
3600 /* 1 hour */, 10800 /* 3 hours */, 21600 /* 6 hours */, 86400, /* 1 day */
},
}
3 changes: 3 additions & 0 deletions runner/runner.go
Expand Up @@ -113,6 +113,9 @@ func (r *Runner) Run(ctx context.Context, args []string) int {
statsOptions = append(statsOptions, stats.WithDefaultHistogramBuckets(defaultWarehouseHistogramBuckets))
} else {
statsOptions = append(statsOptions, stats.WithDefaultHistogramBuckets(defaultHistogramBuckets))
for histogramName, buckets := range customBucketsServer {
statsOptions = append(statsOptions, stats.WithHistogramBuckets(histogramName, buckets))
}
}
for histogramName, buckets := range customBuckets {
statsOptions = append(statsOptions, stats.WithHistogramBuckets(histogramName, buckets))
Expand Down

0 comments on commit be5d501

Please sign in to comment.