Skip to content

Commit

Permalink
refactor: gateway stats (#2758)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Jan 11, 2023
1 parent 17c078b commit 65774e2
Show file tree
Hide file tree
Showing 8 changed files with 907 additions and 401 deletions.
544 changes: 249 additions & 295 deletions gateway/gateway.go

Large diffs are not rendered by default.

351 changes: 301 additions & 50 deletions gateway/gateway_test.go

Large diffs are not rendered by default.

106 changes: 106 additions & 0 deletions gateway/internal/stats/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package stats

import (
"github.com/rudderlabs/rudder-server/services/stats"
)

type SourceStat struct {
Source string

WriteKey string
ReqType string
SourceID string
WorkspaceID string

reason string

requests struct {
total int

succeeded int
failed int
dropped int
suppressed int
}
events struct {
total int

succeeded int
failed int
}
}

// RequestSucceeded increments the requests total & succeeded counters by one
func (ss *SourceStat) RequestSucceeded() {
ss.requests.total++
ss.requests.succeeded++
}

// RequestDropped increments the requests total & dropped counters by one
func (ss *SourceStat) RequestDropped() {
ss.requests.total++
ss.requests.dropped++
}

// RequestSuppressed increments the requests total & suppressed counters by one
func (ss *SourceStat) RequestSuppressed() {
ss.requests.total++
ss.requests.suppressed++
}

// RequestFailed increments the requests total & failed counters by one
func (ss *SourceStat) RequestFailed(reason string) {
ss.requests.total++
ss.requests.failed++
ss.reason = reason
}

// RequestEventsSucceeded increments the requests total & succeeded counters by one, and the events total & succeeded counters by num
func (ss *SourceStat) RequestEventsSucceeded(num int) {
ss.events.succeeded += num
ss.events.total += num
ss.requests.total++
ss.requests.succeeded++
}

// RequestEventsSucceeded increments the requests total & failed counters by one, and the events total & failed counters by num
func (ss *SourceStat) RequestEventsFailed(num int, reason string) {
ss.requests.total++
ss.requests.failed++
ss.events.failed += num
ss.events.total += num
ss.reason = reason
}

// Reports captured stats
func (ss *SourceStat) Report(s stats.Stats) {
tags := map[string]string{
"source": ss.Source,
"writeKey": ss.WriteKey,
"reqType": ss.ReqType,
"workspaceId": ss.WorkspaceID,
"sourceID": ss.SourceID,
}

failedTags := map[string]string{
"source": ss.Source,
"writeKey": ss.WriteKey,
"reqType": ss.ReqType,
"workspaceId": ss.WorkspaceID,
"sourceID": ss.SourceID,
}
if ss.reason != "" {
failedTags["reason"] = ss.reason
}

s.NewTaggedStat("gateway.write_key_requests", stats.CountType, tags).Count(ss.requests.total)
s.NewTaggedStat("gateway.write_key_successful_requests", stats.CountType, tags).Count(ss.requests.succeeded)
s.NewTaggedStat("gateway.write_key_failed_requests", stats.CountType, failedTags).Count(ss.requests.failed)
s.NewTaggedStat("gateway.write_key_dropped_requests", stats.CountType, tags).Count(ss.requests.dropped)
s.NewTaggedStat("gateway.write_key_suppressed_requests", stats.CountType, tags).Count(ss.requests.suppressed)
if ss.events.total > 0 {
s.NewTaggedStat("gateway.write_key_events", stats.CountType, tags).Count(ss.events.total)
s.NewTaggedStat("gateway.write_key_successful_events", stats.CountType, tags).Count(ss.events.succeeded)
s.NewTaggedStat("gateway.write_key_failed_events", stats.CountType, failedTags).Count(ss.events.failed)
}
}
168 changes: 168 additions & 0 deletions gateway/internal/stats/stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package stats

import (
"fmt"
"math/rand"
"testing"
"time"

"github.com/rudderlabs/rudder-server/services/stats/memstats"
trand "github.com/rudderlabs/rudder-server/testhelper/rand"
"github.com/stretchr/testify/require"
)

func getSourceStat(statMap map[string]*SourceStat, sourceTag string) {
statMap[sourceTag] = &SourceStat{
Source: trand.String(10),
SourceID: trand.String(10),
WorkspaceID: trand.String(10),
WriteKey: trand.String(10),
ReqType: trand.String(10),
}
}

func TestReport(t *testing.T) {
// populate some SourceStats
statMap := make(map[string]*SourceStat)
for i := 0; i < 10; i++ {
getSourceStat(statMap, fmt.Sprint(i))
}

// populate some request, event counts
// keep track using some counters
counterMap := make(map[string]*counter)
for i := 0; i < 10; i++ {
counterMap[fmt.Sprint(i)] = &counter{}
}
rand.Seed(time.Now().UnixNano())
for i := 0; i < 10; i++ {
sourceTag := fmt.Sprint(i)
randInt := rand.Int() % 10 // skipcq: GSC-G404
for j := 0; j < randInt; j++ {
statMap[sourceTag].RequestSucceeded()
}
counterMap[sourceTag].succeeded += randInt
counterMap[sourceTag].total += randInt
randInt = rand.Int() % 10 // skipcq: GSC-G404
for j := 0; j < randInt; j++ {
statMap[sourceTag].RequestDropped()
}
counterMap[sourceTag].dropped += randInt
counterMap[sourceTag].total += randInt
randInt = rand.Int() % 10 // skipcq: GSC-G404
for j := 0; j < randInt; j++ {
statMap[sourceTag].RequestSuppressed()
}
counterMap[sourceTag].suppressed += randInt
counterMap[sourceTag].total += randInt
randInt = rand.Int() % 10 // skipcq: GSC-G404
for j := 0; j < randInt; j++ {
statMap[sourceTag].RequestFailed("reason")
}
counterMap[sourceTag].failed += randInt
counterMap[sourceTag].total += randInt
randInt = rand.Int() % 10 // skipcq: GSC-G404
for j := 0; j < randInt; j++ {
statMap[sourceTag].RequestEventsSucceeded(10)
}
counterMap[sourceTag].eventsSucceeded += randInt * 10
counterMap[sourceTag].eventsTotal += randInt * 10
counterMap[sourceTag].total += randInt
counterMap[sourceTag].succeeded += randInt
randInt = rand.Int() % 10 // skipcq: GSC-G404
for j := 0; j < randInt; j++ {
statMap[sourceTag].RequestEventsFailed(10, "reason")
}
counterMap[sourceTag].eventsFailed += randInt * 10
counterMap[sourceTag].eventsTotal += randInt * 10
counterMap[sourceTag].total += randInt
counterMap[sourceTag].failed += randInt
}

// report
statsStore := memstats.New()
for _, v := range statMap {
v.Report(statsStore)
}

// check
for i := 0; i < 10; i++ {
sourceTag := fmt.Sprint(i)
tags := map[string]string{
"source": statMap[sourceTag].Source,
"sourceID": statMap[sourceTag].SourceID,
"workspaceId": statMap[sourceTag].WorkspaceID,
"writeKey": statMap[sourceTag].WriteKey,
"reqType": statMap[sourceTag].ReqType,
}
failedTags := map[string]string{
"source": statMap[sourceTag].Source,
"sourceID": statMap[sourceTag].SourceID,
"workspaceId": statMap[sourceTag].WorkspaceID,
"writeKey": statMap[sourceTag].WriteKey,
"reqType": statMap[sourceTag].ReqType,
"reason": "reason",
}
require.Equal(t,
float64(counterMap[sourceTag].total),
statsStore.Get(
"gateway.write_key_requests",
tags,
).LastValue(),
)
require.Equal(t,
float64(counterMap[sourceTag].succeeded),
statsStore.Get(
"gateway.write_key_successful_requests",
tags,
).LastValue(),
)
require.Equal(t,
float64(counterMap[sourceTag].dropped),
statsStore.Get(
"gateway.write_key_dropped_requests",
tags,
).LastValue(),
)
require.Equal(t,
float64(counterMap[sourceTag].suppressed),
statsStore.Get(
"gateway.write_key_suppressed_requests",
tags,
).LastValue(),
)
require.Equal(t,
float64(counterMap[sourceTag].failed),
statsStore.Get(
"gateway.write_key_failed_requests",
failedTags,
).LastValue(),
)
require.Equal(t,
float64(counterMap[sourceTag].eventsTotal),
statsStore.Get(
"gateway.write_key_events",
tags,
).LastValue(),
)
require.Equal(t,
float64(counterMap[sourceTag].eventsSucceeded),
statsStore.Get(
"gateway.write_key_successful_events",
tags,
).LastValue(),
)
require.Equal(t,
float64(counterMap[sourceTag].eventsFailed),
statsStore.Get(
"gateway.write_key_failed_events",
failedTags,
).LastValue(),
)
}
}

type counter struct {
total, succeeded, failed, dropped, suppressed int
eventsTotal, eventsSucceeded, eventsFailed int
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions gateway/webhook/setup.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//go:generate mockgen --build_flags=--mod=mod -destination=./../../mocks/gateway/webhook/mock_webhook.go -package mock_webhook github.com/rudderlabs/rudder-server/gateway/webhook GatewayI
//go:generate mockgen --build_flags=--mod=mod -destination=./../mocks/mockwebhook.go -package mockwebhook github.com/rudderlabs/rudder-server/gateway/webhook GatewayI

package webhook

Expand All @@ -9,6 +9,7 @@ import (

"github.com/hashicorp/go-retryablehttp"
"github.com/rudderlabs/rudder-server/config"
gwstats "github.com/rudderlabs/rudder-server/gateway/internal/stats"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/misc"
"golang.org/x/sync/errgroup"
Expand All @@ -17,10 +18,10 @@ import (
type GatewayI interface {
IncrementRecvCount(count uint64)
IncrementAckCount(count uint64)
UpdateSourceStats(writeKeyStats map[string]int, bucket string, sourceTagMap map[string]map[string]string)
TrackRequestMetrics(errorMessage string)
ProcessWebRequest(writer *http.ResponseWriter, req *http.Request, reqType string, requestPayload []byte, writeKey string) string
GetWebhookSourceDefName(writeKey string) (name string, ok bool)
NewSourceStat(writeKey, reqType string) *gwstats.SourceStat
}

type WebHookI interface {
Expand All @@ -38,8 +39,8 @@ func newWebhookStats() *webhookStatsT {
return &wStats
}

func Setup(gwHandle GatewayI, opts ...batchTransformerOption) *HandleT {
webhook := &HandleT{gwHandle: gwHandle}
func Setup(gwHandle GatewayI, stat stats.Stats, opts ...batchTransformerOption) *HandleT {
webhook := &HandleT{gwHandle: gwHandle, stats: stat}
webhook.requestQ = make(map[string](chan *webhookT))
webhook.batchRequestQ = make(chan *batchWebhookT)
webhook.netClient = retryablehttp.NewClient()
Expand Down
Loading

0 comments on commit 65774e2

Please sign in to comment.