From 52a828e3d316e53b115decf495e1f51beaed46c0 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Tue, 5 Mar 2024 01:35:28 +0530 Subject: [PATCH 1/3] chore: some non-exhaustive processor drop count stats --- processor/processor.go | 70 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 67 insertions(+), 3 deletions(-) diff --git a/processor/processor.go b/processor/processor.go index 20b37f043a..44d61e041b 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -1456,7 +1456,11 @@ func (proc *Handle) updateSourceEventStatsDetailed(event types.SingularEventT, s } } -func getDiffMetrics(inPU, pu string, inCountMetadataMap map[string]MetricMetadata, inCountMap, successCountMap, failedCountMap, filteredCountMap map[string]int64) []*types.PUReportedMetric { +func getDiffMetrics( + inPU, pu string, + inCountMetadataMap map[string]MetricMetadata, + inCountMap, successCountMap, failedCountMap, filteredCountMap map[string]int64, +) []*types.PUReportedMetric { // Calculate diff and append to reportMetrics // diff = successCount + abortCount - inCount diffMetrics := make([]*types.PUReportedMetric, 0) @@ -1849,6 +1853,17 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf map[string]int64{}, ) reportMetrics = append(reportMetrics, diffMetrics...) + for _, diff := range diffMetrics { + proc.statsFactory.NewTaggedStat( + "processor_drop_count", + stats.CountType, + stats.Tags{ + "stage": types.DESTINATION_FILTER, + "sourceId": diff.ConnectionDetails.SourceID, + "destinationId": diff.ConnectionDetails.DestinationID, + }, + ).Count(int(diff.StatusDetail.Count)) + } } // REPORTING - GATEWAY metrics - END @@ -2508,6 +2523,17 @@ func (proc *Handle) transformSrcDest( nonSuccessMetrics.failedCountMap, nonSuccessMetrics.filteredCountMap, ) + for _, diff := range diffMetrics { + proc.statsFactory.NewTaggedStat( + "processor_drop_count", + stats.CountType, + stats.Tags{ + "stage": types.USER_TRANSFORMER, + "sourceId": diff.ConnectionDetails.SourceID, + "destinationId": diff.ConnectionDetails.DestinationID, + }, + ).Count(int(diff.StatusDetail.Count)) + } reportMetrics = append(reportMetrics, successMetrics...) reportMetrics = append(reportMetrics, nonSuccessMetrics.failedMetrics...) reportMetrics = append(reportMetrics, nonSuccessMetrics.filteredMetrics...) @@ -2583,7 +2609,26 @@ func (proc *Handle) transformSrcDest( // REPORTING - START if proc.isReportingEnabled() { - diffMetrics := getDiffMetrics(inPU, types.EVENT_FILTER, inCountMetadataMap, inCountMap, successCountMap, nonSuccessMetrics.failedCountMap, nonSuccessMetrics.filteredCountMap) + diffMetrics := getDiffMetrics( + inPU, + types.EVENT_FILTER, + inCountMetadataMap, + inCountMap, + successCountMap, + nonSuccessMetrics.failedCountMap, + nonSuccessMetrics.filteredCountMap, + ) + for _, diff := range diffMetrics { + proc.statsFactory.NewTaggedStat( + "processor_drop_count", + stats.CountType, + stats.Tags{ + "stage": types.EVENT_FILTER, + "sourceId": diff.ConnectionDetails.SourceID, + "destinationId": diff.ConnectionDetails.DestinationID, + }, + ).Count(int(diff.StatusDetail.Count)) + } reportMetrics = append(reportMetrics, successMetrics...) reportMetrics = append(reportMetrics, nonSuccessMetrics.failedMetrics...) reportMetrics = append(reportMetrics, nonSuccessMetrics.filteredMetrics...) @@ -2671,7 +2716,26 @@ func (proc *Handle) transformSrcDest( } } - diffMetrics := getDiffMetrics(types.EVENT_FILTER, types.DEST_TRANSFORMER, inCountMetadataMap, inCountMap, successCountMap, nonSuccessMetrics.failedCountMap, nonSuccessMetrics.filteredCountMap) + diffMetrics := getDiffMetrics( + types.EVENT_FILTER, + types.DEST_TRANSFORMER, + inCountMetadataMap, + inCountMap, + successCountMap, + nonSuccessMetrics.failedCountMap, + nonSuccessMetrics.filteredCountMap, + ) + for _, diff := range diffMetrics { + proc.statsFactory.NewTaggedStat( + "processor_drop_count", + stats.CountType, + stats.Tags{ + "stage": types.DEST_TRANSFORMER, + "sourceId": diff.ConnectionDetails.SourceID, + "destinationId": diff.ConnectionDetails.DestinationID, + }, + ).Count(int(diff.StatusDetail.Count)) + } reportMetrics = append(reportMetrics, nonSuccessMetrics.failedMetrics...) reportMetrics = append(reportMetrics, nonSuccessMetrics.filteredMetrics...) From 80140b8b266330ce350868e1d32488507f1f3155 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Tue, 5 Mar 2024 09:26:44 +0530 Subject: [PATCH 2/3] fixup! chore: some non-exhaustive processor drop count stats --- processor/processor.go | 58 +++++++++---------------------------- processor/processor_test.go | 23 +++++++++++++-- 2 files changed, 35 insertions(+), 46 deletions(-) diff --git a/processor/processor.go b/processor/processor.go index 44d61e041b..9d1b53e407 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -1460,6 +1460,7 @@ func getDiffMetrics( inPU, pu string, inCountMetadataMap map[string]MetricMetadata, inCountMap, successCountMap, failedCountMap, filteredCountMap map[string]int64, + statFactory stats.Stats, ) []*types.PUReportedMetric { // Calculate diff and append to reportMetrics // diff = successCount + abortCount - inCount @@ -1486,6 +1487,15 @@ func getDiffMetrics( StatusDetail: types.CreateStatusDetail(types.DiffStatus, diff, 0, 0, "", []byte(`{}`), eventName, eventType, ""), } diffMetrics = append(diffMetrics, metric) + statFactory.NewTaggedStat( + "processor_diff_count", + stats.CountType, + stats.Tags{ + "stage": metric.PU, + "sourceId": metric.ConnectionDetails.SourceID, + "destinationId": metric.ConnectionDetails.DestinationID, + }, + ).Count(int(-diff)) } } @@ -1851,19 +1861,9 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf outCountMap, map[string]int64{}, map[string]int64{}, + proc.statsFactory, ) reportMetrics = append(reportMetrics, diffMetrics...) - for _, diff := range diffMetrics { - proc.statsFactory.NewTaggedStat( - "processor_drop_count", - stats.CountType, - stats.Tags{ - "stage": types.DESTINATION_FILTER, - "sourceId": diff.ConnectionDetails.SourceID, - "destinationId": diff.ConnectionDetails.DestinationID, - }, - ).Count(int(diff.StatusDetail.Count)) - } } // REPORTING - GATEWAY metrics - END @@ -2522,18 +2522,8 @@ func (proc *Handle) transformSrcDest( successCountMap, nonSuccessMetrics.failedCountMap, nonSuccessMetrics.filteredCountMap, + proc.statsFactory, ) - for _, diff := range diffMetrics { - proc.statsFactory.NewTaggedStat( - "processor_drop_count", - stats.CountType, - stats.Tags{ - "stage": types.USER_TRANSFORMER, - "sourceId": diff.ConnectionDetails.SourceID, - "destinationId": diff.ConnectionDetails.DestinationID, - }, - ).Count(int(diff.StatusDetail.Count)) - } reportMetrics = append(reportMetrics, successMetrics...) reportMetrics = append(reportMetrics, nonSuccessMetrics.failedMetrics...) reportMetrics = append(reportMetrics, nonSuccessMetrics.filteredMetrics...) @@ -2617,18 +2607,8 @@ func (proc *Handle) transformSrcDest( successCountMap, nonSuccessMetrics.failedCountMap, nonSuccessMetrics.filteredCountMap, + proc.statsFactory, ) - for _, diff := range diffMetrics { - proc.statsFactory.NewTaggedStat( - "processor_drop_count", - stats.CountType, - stats.Tags{ - "stage": types.EVENT_FILTER, - "sourceId": diff.ConnectionDetails.SourceID, - "destinationId": diff.ConnectionDetails.DestinationID, - }, - ).Count(int(diff.StatusDetail.Count)) - } reportMetrics = append(reportMetrics, successMetrics...) reportMetrics = append(reportMetrics, nonSuccessMetrics.failedMetrics...) reportMetrics = append(reportMetrics, nonSuccessMetrics.filteredMetrics...) @@ -2724,18 +2704,8 @@ func (proc *Handle) transformSrcDest( successCountMap, nonSuccessMetrics.failedCountMap, nonSuccessMetrics.filteredCountMap, + proc.statsFactory, ) - for _, diff := range diffMetrics { - proc.statsFactory.NewTaggedStat( - "processor_drop_count", - stats.CountType, - stats.Tags{ - "stage": types.DEST_TRANSFORMER, - "sourceId": diff.ConnectionDetails.SourceID, - "destinationId": diff.ConnectionDetails.DestinationID, - }, - ).Count(int(diff.StatusDetail.Count)) - } reportMetrics = append(reportMetrics, nonSuccessMetrics.failedMetrics...) reportMetrics = append(reportMetrics, nonSuccessMetrics.filteredMetrics...) diff --git a/processor/processor_test.go b/processor/processor_test.go index 00e0457356..e3fba21dec 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -25,6 +25,7 @@ import ( "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/admin" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/internal/enricher" @@ -3271,7 +3272,16 @@ var _ = Describe("Static Function Tests", func() { Context("getDiffMetrics Tests", func() { It("Should match diffMetrics response for Empty Inputs", func() { - response := getDiffMetrics("some-string-1", "some-string-2", map[string]MetricMetadata{}, map[string]int64{}, map[string]int64{}, map[string]int64{}, map[string]int64{}) + response := getDiffMetrics( + "some-string-1", + "some-string-2", + map[string]MetricMetadata{}, + map[string]int64{}, + map[string]int64{}, + map[string]int64{}, + map[string]int64{}, + stats.NOP, + ) Expect(len(response)).To(Equal(0)) }) @@ -3357,7 +3367,16 @@ var _ = Describe("Static Function Tests", func() { }, } - response := getDiffMetrics("some-string-1", "some-string-2", inCountMetadataMap, inCountMap, successCountMap, failedCountMap, filteredCountMap) + response := getDiffMetrics( + "some-string-1", + "some-string-2", + inCountMetadataMap, + inCountMap, + successCountMap, + failedCountMap, + filteredCountMap, + stats.NOP, + ) assertReportMetric(expectedResponse, response) }) }) From e803bbced80d3377a0120af4cf8c130267ec510a Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Tue, 5 Mar 2024 10:58:06 +0530 Subject: [PATCH 3/3] review comments --- processor/processor_test.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/processor/processor_test.go b/processor/processor_test.go index e3fba21dec..a4e3f43873 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -26,6 +26,7 @@ import ( "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-go-kit/stats/memstats" "github.com/rudderlabs/rudder-server/admin" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/internal/enricher" @@ -3367,6 +3368,8 @@ var _ = Describe("Static Function Tests", func() { }, } + statsStore, err := memstats.New() + Expect(err).To(BeNil()) response := getDiffMetrics( "some-string-1", "some-string-2", @@ -3375,8 +3378,18 @@ var _ = Describe("Static Function Tests", func() { successCountMap, failedCountMap, filteredCountMap, - stats.NOP, + statsStore, ) + Expect(statsStore.Get("processor_diff_count", stats.Tags{ + "stage": "some-string-2", + "sourceId": "some-source-id-1", + "destinationId": "some-destination-id-1", + }).LastValue()).To(Equal(float64(-5))) + Expect(statsStore.Get("processor_diff_count", stats.Tags{ + "stage": "some-string-2", + "sourceId": "some-source-id-2", + "destinationId": "some-destination-id-2", + }).LastValue()).To(Equal(float64(-7))) assertReportMetric(expectedResponse, response) }) })