Skip to content

Commit

Permalink
chore: migrate stats to otel (#2989)
Browse files Browse the repository at this point in the history
  • Loading branch information
fracasula committed Mar 6, 2023
1 parent 1b6af85 commit a4243de
Show file tree
Hide file tree
Showing 41 changed files with 2,910 additions and 1,112 deletions.
2 changes: 1 addition & 1 deletion app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
ratelimiter "github.com/rudderlabs/rudder-server/rate-limiter"
"github.com/rudderlabs/rudder-server/services/db"
sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"
fileuploader "github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
Expand Down
2 changes: 1 addition & 1 deletion app/apphandlers/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
type AppHandler interface {
// Setup to be called only once before starting the app.
Setup(*app.Options) error
// Start starts the app
// StartRudderCore starts the app
StartRudderCore(context.Context, *app.Options) error
}

Expand Down
12 changes: 6 additions & 6 deletions config/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func GetWorkspaceToken() string {
return GetString("CONFIG_BACKEND_TOKEN", "")
}

// GetNamespaceIdentifier
// GetNamespaceIdentifier returns value stored in KUBE_NAMESPACE env var or "none" if empty
func GetNamespaceIdentifier() string {
k8sNamespace := GetKubeNamespace()
if k8sNamespace != "" {
Expand All @@ -34,12 +34,12 @@ func GetKubeNamespace() string {
func GetInstanceID() string {
instance := GetString("INSTANCE_ID", "")
instanceArr := strings.Split(instance, "-")
len := len(instanceArr)
length := len(instanceArr)
// This handles 2 kinds of server instances
// a. Processor OR Gateway running in non HA mod where the instance name ends with the index
// b. Gateway running in HA mode, where the instance name is of the form *-gw-ha-<index>-<statefulset-id>-<pod-id>
potentialServerIndexIndicees := []int{len - 1, len - 3}
for _, i := range potentialServerIndexIndicees {
// a) Processor OR Gateway running in non HA mod where the instance name ends with the index
// b) Gateway running in HA mode, where the instance name is of the form *-gw-ha-<index>-<statefulset-id>-<pod-id>
potentialServerIndexIndices := []int{length - 1, length - 3}
for _, i := range potentialServerIndexIndices {
if i < 0 {
continue
}
Expand Down
3 changes: 1 addition & 2 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"testing"
"time"

sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
Expand All @@ -36,6 +34,7 @@ import (
mocksJobsDB "github.com/rudderlabs/rudder-server/mocks/jobsdb"
mocksRateLimiter "github.com/rudderlabs/rudder-server/mocks/rate-limiter"
mocksTypes "github.com/rudderlabs/rudder-server/mocks/utils/types"
sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"
"github.com/rudderlabs/rudder-server/services/rsources"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/services/stats/memstats"
Expand Down
4 changes: 2 additions & 2 deletions gateway/internal/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (ss *SourceStat) RequestEventsSucceeded(num int) {
ss.requests.succeeded++
}

// RequestEventsSucceeded increments the requests total & failed counters by one, and the events total & failed counters by num
// RequestEventsFailed increments the requests total & failed counters by one, and the events total & failed counters by num
func (ss *SourceStat) RequestEventsFailed(num int, reason string) {
ss.requests.total++
ss.requests.failed++
Expand All @@ -74,7 +74,7 @@ func (ss *SourceStat) RequestEventsFailed(num int, reason string) {
ss.reason = reason
}

// Reports captured stats
// Report captured stats
func (ss *SourceStat) Report(s stats.Stats) {
tags := map[string]string{
"source": ss.Source,
Expand Down
27 changes: 14 additions & 13 deletions gateway/internal/stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,12 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-server/services/stats/memstats"
trand "github.com/rudderlabs/rudder-server/testhelper/rand"
"github.com/stretchr/testify/require"
)

func getSourceStat(statMap map[string]*SourceStat, sourceTag string) {
statMap[sourceTag] = &SourceStat{
Source: sourceTag,
SourceID: trand.String(10),
WorkspaceID: trand.String(10),
WriteKey: trand.String(10),
ReqType: trand.String(10),
SourceType: trand.String(10),
Version: trand.String(10),
}
}

func TestReport(t *testing.T) {
// populate some SourceStats
statMap := make(map[string]*SourceStat)
Expand Down Expand Up @@ -177,6 +166,18 @@ func TestReport(t *testing.T) {
}
}

func getSourceStat(statMap map[string]*SourceStat, sourceTag string) {
statMap[sourceTag] = &SourceStat{
Source: sourceTag,
SourceID: trand.String(10),
WorkspaceID: trand.String(10),
WriteKey: trand.String(10),
ReqType: trand.String(10),
SourceType: trand.String(10),
Version: trand.String(10),
}
}

type counter struct {
total, succeeded, failed, dropped, suppressed int
eventsTotal, eventsSucceeded, eventsFailed int
Expand Down
5 changes: 3 additions & 2 deletions gateway/webhook/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"time"

"github.com/hashicorp/go-retryablehttp"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-server/config"
gwstats "github.com/rudderlabs/rudder-server/gateway/internal/stats"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/misc"
"golang.org/x/sync/errgroup"
)

type GatewayI interface {
Expand Down Expand Up @@ -41,7 +42,7 @@ func newWebhookStats() *webhookStatsT {

func Setup(gwHandle GatewayI, stat stats.Stats, opts ...batchTransformerOption) *HandleT {
webhook := &HandleT{gwHandle: gwHandle, stats: stat}
webhook.requestQ = make(map[string](chan *webhookT))
webhook.requestQ = make(map[string]chan *webhookT)
webhook.batchRequestQ = make(chan *batchWebhookT)
webhook.netClient = retryablehttp.NewClient()
webhook.netClient.HTTPClient.Timeout = config.GetDuration("HttpClient.webhook.timeout", 30, time.Second)
Expand Down
33 changes: 16 additions & 17 deletions gateway/webhook/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

"github.com/rudderlabs/rudder-server/config"
gwstats "github.com/rudderlabs/rudder-server/gateway/internal/stats"
mock_webhook "github.com/rudderlabs/rudder-server/gateway/mocks"
gwStats "github.com/rudderlabs/rudder-server/gateway/internal/stats"
mockWebhook "github.com/rudderlabs/rudder-server/gateway/mocks"
"github.com/rudderlabs/rudder-server/gateway/response"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/stretchr/testify/assert"
)

const (
Expand All @@ -29,7 +30,6 @@ const (
)

var (
whStats *webhookStatsT
once sync.Once
outputToGateway = map[string]interface{}{"hello": "world"}
outputToWebhook = &outputToSource{
Expand All @@ -45,14 +45,13 @@ func initWebhook() {
misc.Init()
Init()
maxTransformerProcess = 1
whStats = newWebhookStats()
})
}

func TestWebhookRequestHandlerWithTransformerBatchGeneralError(t *testing.T) {
initWebhook()
ctrl := gomock.NewController(t)
mockGW := mock_webhook.NewMockGatewayI(ctrl)
mockGW := mockWebhook.NewMockGatewayI(ctrl)
transformerServer := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, sampleError, http.StatusBadRequest)
Expand All @@ -65,7 +64,7 @@ func TestWebhookRequestHandlerWithTransformerBatchGeneralError(t *testing.T) {
mockGW.EXPECT().IncrementAckCount(gomock.Any()).Times(1)
mockGW.EXPECT().GetWebhookSourceDefName(sampleWriteKey).Return(sourceDefName, true)
mockGW.EXPECT().TrackRequestMetrics(gomock.Any()).Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwstats.SourceStat{}).Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(1)

webhookHandler.Register(sourceDefName)
req := httptest.NewRequest(http.MethodPost, "/v1/webhook?writeKey="+sampleWriteKey, bytes.NewBufferString(sampleJson))
Expand All @@ -80,7 +79,7 @@ func TestWebhookRequestHandlerWithTransformerBatchGeneralError(t *testing.T) {
func TestWebhookRequestHandlerWithTransformerBatchPayloadLengthMismatchError(t *testing.T) {
initWebhook()
ctrl := gomock.NewController(t)
mockGW := mock_webhook.NewMockGatewayI(ctrl)
mockGW := mockWebhook.NewMockGatewayI(ctrl)
transformerServer := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() { _ = r.Body.Close() }()
Expand All @@ -106,7 +105,7 @@ func TestWebhookRequestHandlerWithTransformerBatchPayloadLengthMismatchError(t *
mockGW.EXPECT().IncrementAckCount(gomock.Any()).Times(1)
mockGW.EXPECT().GetWebhookSourceDefName(sampleWriteKey).Return(sourceDefName, true)
mockGW.EXPECT().TrackRequestMetrics(gomock.Any()).Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwstats.SourceStat{}).Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(1)

webhookHandler.Register(sourceDefName)
req := httptest.NewRequest(http.MethodPost, "/v1/webhook?writeKey="+sampleWriteKey, bytes.NewBufferString(sampleJson))
Expand All @@ -121,7 +120,7 @@ func TestWebhookRequestHandlerWithTransformerBatchPayloadLengthMismatchError(t *
func TestWebhookRequestHandlerWithTransformerRequestError(t *testing.T) {
initWebhook()
ctrl := gomock.NewController(t)
mockGW := mock_webhook.NewMockGatewayI(ctrl)
mockGW := mockWebhook.NewMockGatewayI(ctrl)
transformerServer := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() { _ = r.Body.Close() }()
Expand All @@ -146,7 +145,7 @@ func TestWebhookRequestHandlerWithTransformerRequestError(t *testing.T) {
mockGW.EXPECT().IncrementAckCount(gomock.Any()).Times(1)
mockGW.EXPECT().GetWebhookSourceDefName(sampleWriteKey).Return(sourceDefName, true)
mockGW.EXPECT().TrackRequestMetrics(gomock.Any()).Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwstats.SourceStat{}).Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(1)

webhookHandler.Register(sourceDefName)
req := httptest.NewRequest(http.MethodPost, "/v1/webhook?writeKey="+sampleWriteKey, bytes.NewBufferString(sampleJson))
Expand All @@ -161,7 +160,7 @@ func TestWebhookRequestHandlerWithTransformerRequestError(t *testing.T) {
func TestWebhookRequestHandlerWithOutputToSource(t *testing.T) {
initWebhook()
ctrl := gomock.NewController(t)
mockGW := mock_webhook.NewMockGatewayI(ctrl)
mockGW := mockWebhook.NewMockGatewayI(ctrl)
transformerServer := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() { _ = r.Body.Close() }()
Expand All @@ -185,7 +184,7 @@ func TestWebhookRequestHandlerWithOutputToSource(t *testing.T) {
mockGW.EXPECT().IncrementAckCount(gomock.Any()).Times(1)
mockGW.EXPECT().GetWebhookSourceDefName(sampleWriteKey).Return(sourceDefName, true)
mockGW.EXPECT().TrackRequestMetrics("").Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwstats.SourceStat{}).Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(1)

webhookHandler.Register(sourceDefName)
req := httptest.NewRequest(http.MethodPost, "/v1/webhook?writeKey="+sampleWriteKey, bytes.NewBufferString(sampleJson))
Expand All @@ -200,7 +199,7 @@ func TestWebhookRequestHandlerWithOutputToSource(t *testing.T) {
func TestWebhookRequestHandlerWithOutputToGateway(t *testing.T) {
initWebhook()
ctrl := gomock.NewController(t)
mockGW := mock_webhook.NewMockGatewayI(ctrl)
mockGW := mockWebhook.NewMockGatewayI(ctrl)
outputToGateway := map[string]interface{}{"text": "hello world"}
transformerServer := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -225,7 +224,7 @@ func TestWebhookRequestHandlerWithOutputToGateway(t *testing.T) {
mockGW.EXPECT().IncrementAckCount(gomock.Any()).Times(1)
mockGW.EXPECT().GetWebhookSourceDefName(sampleWriteKey).Return(sourceDefName, true)
mockGW.EXPECT().TrackRequestMetrics("").Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwstats.SourceStat{}).Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(1)

gwPayload, _ := json.Marshal(outputToGateway)
mockGW.EXPECT().ProcessWebRequest(gomock.Any(), gomock.Any(), "batch", gwPayload, sampleWriteKey).Times(1)
Expand All @@ -243,7 +242,7 @@ func TestWebhookRequestHandlerWithOutputToGateway(t *testing.T) {
func TestWebhookRequestHandlerWithOutputToGatewayAndSource(t *testing.T) {
initWebhook()
ctrl := gomock.NewController(t)
mockGW := mock_webhook.NewMockGatewayI(ctrl)
mockGW := mockWebhook.NewMockGatewayI(ctrl)
transformerServer := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() { _ = r.Body.Close() }()
Expand All @@ -268,7 +267,7 @@ func TestWebhookRequestHandlerWithOutputToGatewayAndSource(t *testing.T) {
mockGW.EXPECT().IncrementAckCount(gomock.Any()).Times(1)
mockGW.EXPECT().GetWebhookSourceDefName(sampleWriteKey).Return(sourceDefName, true)
mockGW.EXPECT().TrackRequestMetrics("").Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwstats.SourceStat{}).Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(1)

gwPayload, _ := json.Marshal(outputToGateway)
mockGW.EXPECT().ProcessWebRequest(gomock.Any(), gomock.Any(), "batch", gwPayload, sampleWriteKey).Times(1)
Expand Down
16 changes: 14 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ require (
github.com/denisenkom/go-mssqldb v0.12.3
github.com/dgraph-io/badger/v2 v2.2007.4
github.com/dgraph-io/badger/v3 v3.2103.5
github.com/foxcpp/go-mockdns v1.0.1-0.20220408113050-3599dc5d2c7d
github.com/fsnotify/fsnotify v1.6.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-redis/redis/v8 v8.11.5
Expand Down Expand Up @@ -76,6 +75,12 @@ require (
github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04
go.etcd.io/etcd/api/v3 v3.5.7
go.etcd.io/etcd/client/v3 v3.5.7
go.opentelemetry.io/otel v1.11.2
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.34.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.2
go.opentelemetry.io/otel/metric v0.34.0
go.opentelemetry.io/otel/sdk v1.11.2
go.opentelemetry.io/otel/sdk/metric v0.34.0
go.uber.org/automaxprocs v1.5.1
go.uber.org/goleak v1.2.0
go.uber.org/zap v1.24.0
Expand Down Expand Up @@ -142,6 +147,8 @@ require (
github.com/gabriel-vasile/mimetype v1.4.1 // indirect
github.com/garyburd/redigo v1.6.4 // indirect
github.com/go-ini/ini v1.63.2 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/goccy/go-json v0.10.0 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
Expand All @@ -157,6 +164,7 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.7.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand All @@ -175,7 +183,6 @@ require (
github.com/mattn/go-ieproxy v0.0.9 // indirect
github.com/mattn/go-runewidth v0.0.12 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/miekg/dns v1.1.25 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
Expand Down Expand Up @@ -227,6 +234,11 @@ require (
github.com/zeebo/xxh3 v1.0.2 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.7 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.34.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 // indirect
go.opentelemetry.io/otel/trace v1.11.2 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.6.0 // indirect
Expand Down

0 comments on commit a4243de

Please sign in to comment.