chore: filtered and error stats separated in processor #4137

Merged · 2 commits · Nov 21, 2023
113 changes: 87 additions & 26 deletions processor/processor.go
@@ -894,7 +894,7 @@
}
}

func (proc *Handle) getDestTransformerEvents(response transformer.Response, commonMetaData *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt, destination *backendconfig.DestinationT, inPU, pu string) ([]transformer.TransformerEvent, []*types.PUReportedMetric, map[string]int64, map[string]MetricMetadata) {
func (proc *Handle) getTransformerEvents(response transformer.Response, commonMetaData *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt, destination *backendconfig.DestinationT, inPU, pu string) ([]transformer.TransformerEvent, []*types.PUReportedMetric, map[string]int64, map[string]MetricMetadata) {
successMetrics := make([]*types.PUReportedMetric, 0)
connectionDetailsMap := make(map[string]*types.ConnectionDetails)
statusDetailsMap := make(map[string]map[string]*types.StatusDetail)
@@ -1095,24 +1095,83 @@
sd.ViolationCount += int64(veCount)
}

func (proc *Handle) getNonSuccessfulMetrics(response transformer.Response, commonMetaData *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt, inPU, pu string) *NonSuccessfulTransformationMetrics {
func (proc *Handle) getNonSuccessfulMetrics(
response transformer.Response,
commonMetaData *transformer.Metadata,
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
inPU, pu string,
) *NonSuccessfulTransformationMetrics {
Comment on lines +1098 to +1103
Contributor:

Personal preference: I don't find that splitting a single line that fits perfectly on the screen into multiple lines makes the codebase more readable.

Member Author:

It's not fitting on my screen, I'm having to turn my head and sometimes worse: scroll!
To each their own though, can revert if it's becoming a nuisance for everyone

m := &NonSuccessfulTransformationMetrics{}

grouped := lo.GroupBy(response.FailedEvents, func(event transformer.TransformerResponse) bool { return event.StatusCode == types.FilterEventCode })
grouped := lo.GroupBy(
response.FailedEvents,
func(event transformer.TransformerResponse) bool {
return event.StatusCode == types.FilterEventCode
},
)
filtered, failed := grouped[true], grouped[false]

m.filteredJobs, m.filteredMetrics, m.filteredCountMap = proc.getTransformationMetrics(filtered, jobsdb.Filtered.State, commonMetaData, eventsByMessageID, inPU, pu)
m.failedJobs, m.failedMetrics, m.failedCountMap = proc.getTransformationMetrics(failed, jobsdb.Aborted.State, commonMetaData, eventsByMessageID, inPU, pu)
m.filteredJobs, m.filteredMetrics, m.filteredCountMap = proc.getTransformationMetrics(
filtered,
jobsdb.Filtered.State,
commonMetaData,
eventsByMessageID,
inPU,
pu,
)

m.failedJobs, m.failedMetrics, m.failedCountMap = proc.getTransformationMetrics(
failed,
jobsdb.Aborted.State,
commonMetaData,
eventsByMessageID,
inPU,
pu,
)

return m
}
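Note on the grouping above: lo.GroupBy keyed by a boolean predicate yields at most two buckets, so grouped[true] collects the events the transformer filtered out (StatusCode == types.FilterEventCode) and grouped[false] collects genuine failures. A minimal, self-contained sketch of the same pattern follows; the event type and the filterEventCode value are stand-ins for transformer.TransformerResponse and types.FilterEventCode, assumed here purely for illustration.

package main

import (
	"fmt"

	"github.com/samber/lo"
)

// event stands in for transformer.TransformerResponse; only StatusCode matters here.
type event struct {
	StatusCode int
}

// filterEventCode stands in for types.FilterEventCode (value assumed for illustration).
const filterEventCode = 298

func main() {
	responses := []event{
		{StatusCode: filterEventCode},
		{StatusCode: 400},
		{StatusCode: filterEventCode},
	}

	// A boolean GroupBy key produces at most two buckets:
	// grouped[true] -> filtered events, grouped[false] -> genuine failures.
	grouped := lo.GroupBy(responses, func(e event) bool {
		return e.StatusCode == filterEventCode
	})
	filtered, failed := grouped[true], grouped[false]

	fmt.Printf("filtered=%d failed=%d\n", len(filtered), len(failed)) // filtered=2 failed=1
}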

func (proc *Handle) getTransformationMetrics(transformerResponses []transformer.TransformerResponse, state string, commonMetaData *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt, inPU, pu string) ([]*jobsdb.JobT, []*types.PUReportedMetric, map[string]int64) {
func procFilteredCountStat(destType, pu, statusCode string) {
stats.Default.NewTaggedStat(
"proc_filtered_counts",
stats.CountType,
stats.Tags{
"destName": destType,
"statusCode": statusCode,
"stage": pu,
},
).Increment()
}

func procErrorCountsStat(destType, pu, statusCode string) {
stats.Default.NewTaggedStat(
"proc_error_counts",
stats.CountType,
stats.Tags{
"destName": destType,
"statusCode": statusCode,
"stage": pu,
},
).Increment()
}
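The two helpers above differ only in the metric name (proc_filtered_counts vs proc_error_counts); their tags are identical. The sketch below, with stand-in printer functions instead of the real stats client and an assumed string for jobsdb.Filtered.State, shows the selection-by-state pattern that getTransformationMetrics applies just below.

package main

import (
	"fmt"
	"strconv"
)

// countStatFunc mirrors the signature of procFilteredCountStat and procErrorCountsStat.
type countStatFunc func(destType, pu, statusCode string)

// Stand-ins for the real helpers, which wrap stats.Default.NewTaggedStat(...).Increment().
func fakeFilteredStat(destType, pu, statusCode string) {
	fmt.Println("proc_filtered_counts", destType, pu, statusCode)
}

func fakeErrorStat(destType, pu, statusCode string) {
	fmt.Println("proc_error_counts", destType, pu, statusCode)
}

func main() {
	const filteredState = "filtered" // stand-in for jobsdb.Filtered.State

	events := []struct {
		state      string
		statusCode int
	}{
		{state: filteredState, statusCode: 298},
		{state: "aborted", statusCode: 400},
	}

	for _, e := range events {
		// Same selection pattern as getTransformationMetrics: default to the error
		// counter, switch to the filtered counter when the job state is filtered.
		var statFunc countStatFunc = fakeErrorStat
		if e.state == filteredState {
			statFunc = fakeFilteredStat
		}
		statFunc("WEBHOOK", "dest_transformer", strconv.Itoa(e.statusCode))
	}
}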

func (proc *Handle) getTransformationMetrics(
transformerResponses []transformer.TransformerResponse,
state string,
commonMetaData *transformer.Metadata,
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
inPU, pu string,
) ([]*jobsdb.JobT, []*types.PUReportedMetric, map[string]int64) {
metrics := make([]*types.PUReportedMetric, 0)
connectionDetailsMap := make(map[string]*types.ConnectionDetails)
statusDetailsMap := make(map[string]map[string]*types.StatusDetail)
countMap := make(map[string]int64)
var jobs []*jobsdb.JobT
statFunc := procErrorCountsStat
if state == jobsdb.Filtered.State {
statFunc = procFilteredCountStat
}
for i := range transformerResponses {
failedEvent := &transformerResponses[i]
messages := lo.Map(
@@ -1128,17 +1187,25 @@
}

for _, message := range messages {
proc.updateMetricMaps(nil, countMap, connectionDetailsMap, statusDetailsMap, failedEvent, state, pu, func() json.RawMessage {
if proc.transientSources.Apply(commonMetaData.SourceID) {
return []byte(`{}`)
}
sampleEvent, err := jsonfast.Marshal(message)
if err != nil {
proc.logger.Errorf(`[Processor: getTransformationMetrics] Failed to unmarshal first element in failed events: %v`, err)
sampleEvent = []byte(`{}`)
}
return sampleEvent
},
proc.updateMetricMaps(
nil,
countMap,
connectionDetailsMap,
statusDetailsMap,
failedEvent,
state,
pu,
func() json.RawMessage {
if proc.transientSources.Apply(commonMetaData.SourceID) {
return []byte(`{}`)
}

[Codecov / codecov/patch] Check warning: added lines processor/processor.go#L1200-L1201 were not covered by tests.
sampleEvent, err := jsonfast.Marshal(message)
if err != nil {
proc.logger.Errorf(`[Processor: getTransformationMetrics] Failed to unmarshal first element in failed events: %v`, err)
sampleEvent = []byte(`{}`)
}

[Codecov / codecov/patch] Check warning: added lines processor/processor.go#L1204-L1206 were not covered by tests.
return sampleEvent
},
eventsByMessageID)
}

@@ -1179,13 +1246,7 @@
}
jobs = append(jobs, &newFailedJob)

procErrorStat := stats.Default.NewTaggedStat("proc_error_counts", stats.CountType, stats.Tags{
"destName": commonMetaData.DestinationType,
"statusCode": strconv.Itoa(failedEvent.StatusCode),
"stage": pu,
})

procErrorStat.Increment()
statFunc(commonMetaData.DestinationType, pu, strconv.Itoa(failedEvent.StatusCode))
}

// REPORTING - START
@@ -2185,7 +2246,7 @@
var successMetrics []*types.PUReportedMetric
var successCountMap map[string]int64
var successCountMetadataMap map[string]MetricMetadata
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, inPU, types.USER_TRANSFORMER)
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, inPU, types.USER_TRANSFORMER)
nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, commonMetaData, eventsByMessageID, inPU, types.USER_TRANSFORMER)
droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventList), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...)
if _, ok := procErrorJobsByDestID[destID]; !ok {
Expand Down Expand Up @@ -2281,7 +2342,7 @@
procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0)
}
procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...)
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, inPU, types.EVENT_FILTER)
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, inPU, types.EVENT_FILTER)
proc.logger.Debug("Supported messages filtering output size", len(eventsToTransform))

// REPORTING - START
2 changes: 1 addition & 1 deletion processor/trackingplan.go
@@ -113,7 +113,7 @@ func (proc *Handle) validateEvents(groupedEventsBySourceId map[SourceIDT][]trans
trackingPlanEnabledMap[SourceIDT(sourceID)] = true

var successMetrics []*types.PUReportedMetric
eventsToTransform, successMetrics, _, _ := proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, types.DESTINATION_FILTER, types.TRACKINGPLAN_VALIDATOR) // Note: Sending false for usertransformation enabled is safe because this stage is before user transformation.
eventsToTransform, successMetrics, _, _ := proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, types.DESTINATION_FILTER, types.TRACKINGPLAN_VALIDATOR) // Note: Sending false for usertransformation enabled is safe because this stage is before user transformation.
nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, commonMetaData, eventsByMessageID, types.DESTINATION_FILTER, types.TRACKINGPLAN_VALIDATOR)

validationStat.numValidationSuccessEvents.Count(len(eventsToTransform))