Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Nov 21, 2023
1 parent 815fb77 commit 8e73bcb
Showing 1 changed file with 27 additions and 10 deletions.
37 changes: 27 additions & 10 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,30 @@ func (proc *Handle) getNonSuccessfulMetrics(
return m
}

func procFilteredCountStat(destType, pu, statusCode string) {
stats.Default.NewTaggedStat(
"proc_filtered_counts",
stats.CountType,
stats.Tags{
"destName": destType,
"statusCode": statusCode,
"stage": pu,
},
).Increment()
}

func procErrorCountsStat(destType, pu, statusCode string) {
stats.Default.NewTaggedStat(
"proc_error_counts",
stats.CountType,
stats.Tags{
"destName": destType,
"statusCode": statusCode,
"stage": pu,
},
).Increment()
}

func (proc *Handle) getTransformationMetrics(
transformerResponses []transformer.TransformerResponse,
state string,
Expand All @@ -1144,9 +1168,9 @@ func (proc *Handle) getTransformationMetrics(
statusDetailsMap := make(map[string]map[string]*types.StatusDetail)
countMap := make(map[string]int64)
var jobs []*jobsdb.JobT
isDropped := "false"
statFunc := procErrorCountsStat
if state == jobsdb.Filtered.State {
isDropped = "true"
statFunc = procFilteredCountStat
}
for i := range transformerResponses {
failedEvent := &transformerResponses[i]
Expand Down Expand Up @@ -1222,14 +1246,7 @@ func (proc *Handle) getTransformationMetrics(
}
jobs = append(jobs, &newFailedJob)

procErrorStat := stats.Default.NewTaggedStat("proc_error_counts", stats.CountType, stats.Tags{
"destName": commonMetaData.DestinationType,
"statusCode": strconv.Itoa(failedEvent.StatusCode),
"stage": pu,
"isDropped": isDropped,
})

procErrorStat.Increment()
statFunc(commonMetaData.DestinationType, pu, strconv.Itoa(failedEvent.StatusCode))
}

// REPORTING - START
Expand Down

0 comments on commit 8e73bcb

Please sign in to comment.