Skip to content

Commit

Permalink
chore: some non-exhaustive processor drop count stats (#4446)
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Mar 5, 2024
2 parents 4070a17 + e803bbc commit 61a820d
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 5 deletions.
40 changes: 37 additions & 3 deletions processor/processor.go
Expand Up @@ -1456,7 +1456,12 @@ func (proc *Handle) updateSourceEventStatsDetailed(event types.SingularEventT, s
}
}

func getDiffMetrics(inPU, pu string, inCountMetadataMap map[string]MetricMetadata, inCountMap, successCountMap, failedCountMap, filteredCountMap map[string]int64) []*types.PUReportedMetric {
func getDiffMetrics(
inPU, pu string,
inCountMetadataMap map[string]MetricMetadata,
inCountMap, successCountMap, failedCountMap, filteredCountMap map[string]int64,
statFactory stats.Stats,
) []*types.PUReportedMetric {
// Calculate diff and append to reportMetrics
// diff = successCount + abortCount - inCount
diffMetrics := make([]*types.PUReportedMetric, 0)
Expand All @@ -1482,6 +1487,15 @@ func getDiffMetrics(inPU, pu string, inCountMetadataMap map[string]MetricMetadat
StatusDetail: types.CreateStatusDetail(types.DiffStatus, diff, 0, 0, "", []byte(`{}`), eventName, eventType, ""),
}
diffMetrics = append(diffMetrics, metric)
statFactory.NewTaggedStat(
"processor_diff_count",
stats.CountType,
stats.Tags{
"stage": metric.PU,
"sourceId": metric.ConnectionDetails.SourceID,
"destinationId": metric.ConnectionDetails.DestinationID,
},
).Count(int(-diff))
}
}

Expand Down Expand Up @@ -1847,6 +1861,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
outCountMap,
map[string]int64{},
map[string]int64{},
proc.statsFactory,
)
reportMetrics = append(reportMetrics, diffMetrics...)
}
Expand Down Expand Up @@ -2507,6 +2522,7 @@ func (proc *Handle) transformSrcDest(
successCountMap,
nonSuccessMetrics.failedCountMap,
nonSuccessMetrics.filteredCountMap,
proc.statsFactory,
)
reportMetrics = append(reportMetrics, successMetrics...)
reportMetrics = append(reportMetrics, nonSuccessMetrics.failedMetrics...)
Expand Down Expand Up @@ -2583,7 +2599,16 @@ func (proc *Handle) transformSrcDest(

// REPORTING - START
if proc.isReportingEnabled() {
diffMetrics := getDiffMetrics(inPU, types.EVENT_FILTER, inCountMetadataMap, inCountMap, successCountMap, nonSuccessMetrics.failedCountMap, nonSuccessMetrics.filteredCountMap)
diffMetrics := getDiffMetrics(
inPU,
types.EVENT_FILTER,
inCountMetadataMap,
inCountMap,
successCountMap,
nonSuccessMetrics.failedCountMap,
nonSuccessMetrics.filteredCountMap,
proc.statsFactory,
)
reportMetrics = append(reportMetrics, successMetrics...)
reportMetrics = append(reportMetrics, nonSuccessMetrics.failedMetrics...)
reportMetrics = append(reportMetrics, nonSuccessMetrics.filteredMetrics...)
Expand Down Expand Up @@ -2671,7 +2696,16 @@ func (proc *Handle) transformSrcDest(
}
}

diffMetrics := getDiffMetrics(types.EVENT_FILTER, types.DEST_TRANSFORMER, inCountMetadataMap, inCountMap, successCountMap, nonSuccessMetrics.failedCountMap, nonSuccessMetrics.filteredCountMap)
diffMetrics := getDiffMetrics(
types.EVENT_FILTER,
types.DEST_TRANSFORMER,
inCountMetadataMap,
inCountMap,
successCountMap,
nonSuccessMetrics.failedCountMap,
nonSuccessMetrics.filteredCountMap,
proc.statsFactory,
)

reportMetrics = append(reportMetrics, nonSuccessMetrics.failedMetrics...)
reportMetrics = append(reportMetrics, nonSuccessMetrics.filteredMetrics...)
Expand Down
36 changes: 34 additions & 2 deletions processor/processor_test.go
Expand Up @@ -25,6 +25,8 @@ import (

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
"github.com/rudderlabs/rudder-server/admin"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/internal/enricher"
Expand Down Expand Up @@ -3271,7 +3273,16 @@ var _ = Describe("Static Function Tests", func() {

Context("getDiffMetrics Tests", func() {
It("Should match diffMetrics response for Empty Inputs", func() {
response := getDiffMetrics("some-string-1", "some-string-2", map[string]MetricMetadata{}, map[string]int64{}, map[string]int64{}, map[string]int64{}, map[string]int64{})
response := getDiffMetrics(
"some-string-1",
"some-string-2",
map[string]MetricMetadata{},
map[string]int64{},
map[string]int64{},
map[string]int64{},
map[string]int64{},
stats.NOP,
)
Expect(len(response)).To(Equal(0))
})

Expand Down Expand Up @@ -3357,7 +3368,28 @@ var _ = Describe("Static Function Tests", func() {
},
}

response := getDiffMetrics("some-string-1", "some-string-2", inCountMetadataMap, inCountMap, successCountMap, failedCountMap, filteredCountMap)
statsStore, err := memstats.New()
Expect(err).To(BeNil())
response := getDiffMetrics(
"some-string-1",
"some-string-2",
inCountMetadataMap,
inCountMap,
successCountMap,
failedCountMap,
filteredCountMap,
statsStore,
)
Expect(statsStore.Get("processor_diff_count", stats.Tags{
"stage": "some-string-2",
"sourceId": "some-source-id-1",
"destinationId": "some-destination-id-1",
}).LastValue()).To(Equal(float64(-5)))
Expect(statsStore.Get("processor_diff_count", stats.Tags{
"stage": "some-string-2",
"sourceId": "some-source-id-2",
"destinationId": "some-destination-id-2",
}).LastValue()).To(Equal(float64(-7)))
assertReportMetric(expectedResponse, response)
})
})
Expand Down

0 comments on commit 61a820d

Please sign in to comment.