diff --git a/processor/processor.go b/processor/processor.go index 276915b766..b32c296c7a 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -894,7 +894,7 @@ func (proc *Handle) recordEventDeliveryStatus(jobsByDestID map[string][]*jobsdb. } } -func (proc *Handle) getDestTransformerEvents(response transformer.Response, commonMetaData *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt, destination *backendconfig.DestinationT, inPU, pu string) ([]transformer.TransformerEvent, []*types.PUReportedMetric, map[string]int64, map[string]MetricMetadata) { +func (proc *Handle) getTransformerEvents(response transformer.Response, commonMetaData *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt, destination *backendconfig.DestinationT, inPU, pu string) ([]transformer.TransformerEvent, []*types.PUReportedMetric, map[string]int64, map[string]MetricMetadata) { successMetrics := make([]*types.PUReportedMetric, 0) connectionDetailsMap := make(map[string]*types.ConnectionDetails) statusDetailsMap := make(map[string]map[string]*types.StatusDetail) @@ -1095,24 +1095,83 @@ func (proc *Handle) updateMetricMaps( sd.ViolationCount += int64(veCount) } -func (proc *Handle) getNonSuccessfulMetrics(response transformer.Response, commonMetaData *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt, inPU, pu string) *NonSuccessfulTransformationMetrics { +func (proc *Handle) getNonSuccessfulMetrics( + response transformer.Response, + commonMetaData *transformer.Metadata, + eventsByMessageID map[string]types.SingularEventWithReceivedAt, + inPU, pu string, +) *NonSuccessfulTransformationMetrics { m := &NonSuccessfulTransformationMetrics{} - grouped := lo.GroupBy(response.FailedEvents, func(event transformer.TransformerResponse) bool { return event.StatusCode == types.FilterEventCode }) + grouped := lo.GroupBy( + response.FailedEvents, + func(event transformer.TransformerResponse) bool { + return event.StatusCode == types.FilterEventCode + }, + ) filtered, failed := grouped[true], grouped[false] - m.filteredJobs, m.filteredMetrics, m.filteredCountMap = proc.getTransformationMetrics(filtered, jobsdb.Filtered.State, commonMetaData, eventsByMessageID, inPU, pu) - m.failedJobs, m.failedMetrics, m.failedCountMap = proc.getTransformationMetrics(failed, jobsdb.Aborted.State, commonMetaData, eventsByMessageID, inPU, pu) + m.filteredJobs, m.filteredMetrics, m.filteredCountMap = proc.getTransformationMetrics( + filtered, + jobsdb.Filtered.State, + commonMetaData, + eventsByMessageID, + inPU, + pu, + ) + + m.failedJobs, m.failedMetrics, m.failedCountMap = proc.getTransformationMetrics( + failed, + jobsdb.Aborted.State, + commonMetaData, + eventsByMessageID, + inPU, + pu, + ) return m } -func (proc *Handle) getTransformationMetrics(transformerResponses []transformer.TransformerResponse, state string, commonMetaData *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt, inPU, pu string) ([]*jobsdb.JobT, []*types.PUReportedMetric, map[string]int64) { +func procFilteredCountStat(destType, pu, statusCode string) { + stats.Default.NewTaggedStat( + "proc_filtered_counts", + stats.CountType, + stats.Tags{ + "destName": destType, + "statusCode": statusCode, + "stage": pu, + }, + ).Increment() +} + +func procErrorCountsStat(destType, pu, statusCode string) { + stats.Default.NewTaggedStat( + "proc_error_counts", + stats.CountType, + stats.Tags{ + "destName": destType, + "statusCode": statusCode, + "stage": pu, + }, + ).Increment() +} + +func (proc *Handle) getTransformationMetrics( + transformerResponses []transformer.TransformerResponse, + state string, + commonMetaData *transformer.Metadata, + eventsByMessageID map[string]types.SingularEventWithReceivedAt, + inPU, pu string, +) ([]*jobsdb.JobT, []*types.PUReportedMetric, map[string]int64) { metrics := make([]*types.PUReportedMetric, 0) connectionDetailsMap := make(map[string]*types.ConnectionDetails) statusDetailsMap := make(map[string]map[string]*types.StatusDetail) countMap := make(map[string]int64) var jobs []*jobsdb.JobT + statFunc := procErrorCountsStat + if state == jobsdb.Filtered.State { + statFunc = procFilteredCountStat + } for i := range transformerResponses { failedEvent := &transformerResponses[i] messages := lo.Map( @@ -1128,17 +1187,25 @@ func (proc *Handle) getTransformationMetrics(transformerResponses []transformer. } for _, message := range messages { - proc.updateMetricMaps(nil, countMap, connectionDetailsMap, statusDetailsMap, failedEvent, state, pu, func() json.RawMessage { - if proc.transientSources.Apply(commonMetaData.SourceID) { - return []byte(`{}`) - } - sampleEvent, err := jsonfast.Marshal(message) - if err != nil { - proc.logger.Errorf(`[Processor: getTransformationMetrics] Failed to unmarshal first element in failed events: %v`, err) - sampleEvent = []byte(`{}`) - } - return sampleEvent - }, + proc.updateMetricMaps( + nil, + countMap, + connectionDetailsMap, + statusDetailsMap, + failedEvent, + state, + pu, + func() json.RawMessage { + if proc.transientSources.Apply(commonMetaData.SourceID) { + return []byte(`{}`) + } + sampleEvent, err := jsonfast.Marshal(message) + if err != nil { + proc.logger.Errorf(`[Processor: getTransformationMetrics] Failed to unmarshal first element in failed events: %v`, err) + sampleEvent = []byte(`{}`) + } + return sampleEvent + }, eventsByMessageID) } @@ -1179,13 +1246,7 @@ func (proc *Handle) getTransformationMetrics(transformerResponses []transformer. } jobs = append(jobs, &newFailedJob) - procErrorStat := stats.Default.NewTaggedStat("proc_error_counts", stats.CountType, stats.Tags{ - "destName": commonMetaData.DestinationType, - "statusCode": strconv.Itoa(failedEvent.StatusCode), - "stage": pu, - }) - - procErrorStat.Increment() + statFunc(commonMetaData.DestinationType, pu, strconv.Itoa(failedEvent.StatusCode)) } // REPORTING - START @@ -2185,7 +2246,7 @@ func (proc *Handle) transformSrcDest( var successMetrics []*types.PUReportedMetric var successCountMap map[string]int64 var successCountMetadataMap map[string]MetricMetadata - eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, inPU, types.USER_TRANSFORMER) + eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, inPU, types.USER_TRANSFORMER) nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, commonMetaData, eventsByMessageID, inPU, types.USER_TRANSFORMER) droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventList), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...) if _, ok := procErrorJobsByDestID[destID]; !ok { @@ -2281,7 +2342,7 @@ func (proc *Handle) transformSrcDest( procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0) } procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...) - eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, inPU, types.EVENT_FILTER) + eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, inPU, types.EVENT_FILTER) proc.logger.Debug("Supported messages filtering output size", len(eventsToTransform)) // REPORTING - START diff --git a/processor/trackingplan.go b/processor/trackingplan.go index 2e3994ef11..31022b5655 100644 --- a/processor/trackingplan.go +++ b/processor/trackingplan.go @@ -113,7 +113,7 @@ func (proc *Handle) validateEvents(groupedEventsBySourceId map[SourceIDT][]trans trackingPlanEnabledMap[SourceIDT(sourceID)] = true var successMetrics []*types.PUReportedMetric - eventsToTransform, successMetrics, _, _ := proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, types.DESTINATION_FILTER, types.TRACKINGPLAN_VALIDATOR) // Note: Sending false for usertransformation enabled is safe because this stage is before user transformation. + eventsToTransform, successMetrics, _, _ := proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, types.DESTINATION_FILTER, types.TRACKINGPLAN_VALIDATOR) // Note: Sending false for usertransformation enabled is safe because this stage is before user transformation. nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, commonMetaData, eventsByMessageID, types.DESTINATION_FILTER, types.TRACKINGPLAN_VALIDATOR) validationStat.numValidationSuccessEvents.Count(len(eventsToTransform))