diff --git a/enterprise/reporting/reporting.go b/enterprise/reporting/reporting.go index c77b19052a..7500e2d64a 100644 --- a/enterprise/reporting/reporting.go +++ b/enterprise/reporting/reporting.go @@ -23,7 +23,7 @@ import ( "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/types" - "github.com/thoas/go-funk" + "github.com/samber/lo" "golang.org/x/sync/errgroup" ) @@ -217,7 +217,7 @@ func (r *HandleT) getReports(currentMs int64, clientName string) (reports []*typ return nil, 0 } - sqlStatement = fmt.Sprintf(`SELECT workspace_id, namespace, instance_id, source_definition_id, source_category, source_id, destination_definition_id, destination_id, source_task_run_id, source_job_id, source_job_run_id, in_pu, pu, reported_at, status, count, terminal_state, initial_state, status_code, sample_response, sample_event, event_name, event_type FROM %s WHERE reported_at = %d`, ReportsTable, queryMin.Int64) + sqlStatement = fmt.Sprintf(`SELECT workspace_id, namespace, instance_id, source_definition_id, source_category, source_id, destination_definition_id, destination_id, source_task_run_id, source_job_id, source_job_run_id, transformation_id, transformation_version_id, tracking_plan_id, tracking_plan_version, in_pu, pu, reported_at, status, count, violation_count, terminal_state, initial_state, status_code, sample_response, sample_event, event_name, event_type, error_type FROM %s WHERE reported_at = %d`, ReportsTable, queryMin.Int64) var rows *sql.Rows queryStart = time.Now() rows, err = dbHandle.Query(sqlStatement) @@ -230,7 +230,30 @@ func (r *HandleT) getReports(currentMs int64, clientName string) (reports []*typ var metricReports []*types.ReportByStatus for rows.Next() { metricReport := types.ReportByStatus{StatusDetail: &types.StatusDetail{}} - err = rows.Scan(&metricReport.InstanceDetails.WorkspaceID, &metricReport.InstanceDetails.Namespace, &metricReport.InstanceDetails.InstanceID, &metricReport.ConnectionDetails.SourceDefinitionId, &metricReport.ConnectionDetails.SourceCategory, &metricReport.ConnectionDetails.SourceID, &metricReport.ConnectionDetails.DestinationDefinitionId, &metricReport.ConnectionDetails.DestinationID, &metricReport.ConnectionDetails.SourceTaskRunID, &metricReport.ConnectionDetails.SourceJobID, &metricReport.ConnectionDetails.SourceJobRunID, &metricReport.PUDetails.InPU, &metricReport.PUDetails.PU, &metricReport.ReportedAt, &metricReport.StatusDetail.Status, &metricReport.StatusDetail.Count, &metricReport.PUDetails.TerminalPU, &metricReport.PUDetails.InitialPU, &metricReport.StatusDetail.StatusCode, &metricReport.StatusDetail.SampleResponse, &metricReport.StatusDetail.SampleEvent, &metricReport.StatusDetail.EventName, &metricReport.StatusDetail.EventType) + err = rows.Scan( + &metricReport.InstanceDetails.WorkspaceID, &metricReport.InstanceDetails.Namespace, &metricReport.InstanceDetails.InstanceID, + &metricReport.ConnectionDetails.SourceDefinitionId, + &metricReport.ConnectionDetails.SourceCategory, + &metricReport.ConnectionDetails.SourceID, + &metricReport.ConnectionDetails.DestinationDefinitionId, + &metricReport.ConnectionDetails.DestinationID, + &metricReport.ConnectionDetails.SourceTaskRunID, + &metricReport.ConnectionDetails.SourceJobID, + &metricReport.ConnectionDetails.SourceJobRunID, + &metricReport.ConnectionDetails.TransformationID, + &metricReport.ConnectionDetails.TransformationVersionID, + &metricReport.ConnectionDetails.TrackingPlanID, + &metricReport.ConnectionDetails.TrackingPlanVersion, + &metricReport.PUDetails.InPU, &metricReport.PUDetails.PU, + &metricReport.ReportedAt, + &metricReport.StatusDetail.Status, + &metricReport.StatusDetail.Count, &metricReport.StatusDetail.ViolationCount, + &metricReport.PUDetails.TerminalPU, &metricReport.PUDetails.InitialPU, + &metricReport.StatusDetail.StatusCode, + &metricReport.StatusDetail.SampleResponse, &metricReport.StatusDetail.SampleEvent, + &metricReport.StatusDetail.EventName, &metricReport.StatusDetail.EventType, + &metricReport.StatusDetail.ErrorType, + ) if err != nil { panic(err) } @@ -244,7 +267,22 @@ func (*HandleT) getAggregatedReports(reports []*types.ReportByStatus) []*types.M metricsByGroup := map[string]*types.Metric{} reportIdentifier := func(report *types.ReportByStatus) string { - groupingIdentifiers := []string{report.InstanceDetails.WorkspaceID, report.InstanceDetails.Namespace, report.InstanceDetails.InstanceID, report.ConnectionDetails.SourceID, report.ConnectionDetails.DestinationID, report.ConnectionDetails.SourceTaskRunID, report.ConnectionDetails.SourceJobID, report.ConnectionDetails.SourceJobRunID, report.PUDetails.InPU, report.PUDetails.PU, report.StatusDetail.Status, fmt.Sprint(report.StatusDetail.StatusCode), report.StatusDetail.EventName, report.StatusDetail.EventType} + groupingIdentifiers := []string{ + report.InstanceDetails.WorkspaceID, report.InstanceDetails.Namespace, report.InstanceDetails.InstanceID, + report.ConnectionDetails.SourceID, + report.ConnectionDetails.DestinationID, + report.ConnectionDetails.SourceTaskRunID, + report.ConnectionDetails.SourceJobID, + report.ConnectionDetails.SourceJobRunID, + report.ConnectionDetails.TransformationID, + report.ConnectionDetails.TransformationVersionID, + report.ConnectionDetails.TrackingPlanID, + strconv.Itoa(report.ConnectionDetails.TrackingPlanVersion), + report.PUDetails.InPU, report.PUDetails.PU, + report.StatusDetail.Status, + strconv.Itoa(report.StatusDetail.StatusCode), + report.StatusDetail.EventName, report.StatusDetail.EventType, + } return strings.Join(groupingIdentifiers, `::`) } @@ -266,6 +304,10 @@ func (*HandleT) getAggregatedReports(reports []*types.ReportByStatus) []*types.M SourceTaskRunID: report.SourceTaskRunID, SourceJobID: report.SourceJobID, SourceJobRunID: report.SourceJobRunID, + TransformationID: report.TransformationID, + TransformationVersionID: report.TransformationVersionID, + TrackingPlanID: report.TrackingPlanID, + TrackingPlanVersion: report.TrackingPlanVersion, }, PUDetails: types.PUDetails{ InPU: report.InPU, @@ -278,23 +320,26 @@ func (*HandleT) getAggregatedReports(reports []*types.ReportByStatus) []*types.M }, } } - statusDetailInterface := funk.Find(metricsByGroup[identifier].StatusDetails, func(i *types.StatusDetail) bool { - return i.Status == report.StatusDetail.Status && i.StatusCode == report.StatusDetail.StatusCode + statusDetailInterface, found := lo.Find(metricsByGroup[identifier].StatusDetails, func(i *types.StatusDetail) bool { + return i.Status == report.StatusDetail.Status && i.StatusCode == report.StatusDetail.StatusCode && i.ErrorType == report.StatusDetail.ErrorType }) - if statusDetailInterface == nil { + if !found { metricsByGroup[identifier].StatusDetails = append(metricsByGroup[identifier].StatusDetails, &types.StatusDetail{ Status: report.StatusDetail.Status, StatusCode: report.StatusDetail.StatusCode, Count: report.StatusDetail.Count, + ViolationCount: report.StatusDetail.ViolationCount, SampleResponse: report.StatusDetail.SampleResponse, SampleEvent: report.StatusDetail.SampleEvent, EventName: report.StatusDetail.EventName, EventType: report.StatusDetail.EventType, + ErrorType: report.StatusDetail.ErrorType, }) continue } - statusDetail := statusDetailInterface.(*types.StatusDetail) + statusDetail := statusDetailInterface statusDetail.Count += report.StatusDetail.Count + statusDetail.ViolationCount += report.StatusDetail.ViolationCount statusDetail.SampleResponse = report.StatusDetail.SampleResponse statusDetail.SampleEvent = report.StatusDetail.SampleEvent } @@ -484,7 +529,30 @@ func (r *HandleT) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) { return } - stmt, err := txn.Prepare(pq.CopyIn(ReportsTable, "workspace_id", "namespace", "instance_id", "source_definition_id", "source_category", "source_id", "destination_definition_id", "destination_id", "source_task_run_id", "source_job_id", "source_job_run_id", "in_pu", "pu", "reported_at", "status", "count", "terminal_state", "initial_state", "status_code", "sample_response", "sample_event", "event_name", "event_type")) + stmt, err := txn.Prepare(pq.CopyIn(ReportsTable, + "workspace_id", "namespace", "instance_id", + "source_definition_id", + "source_category", + "source_id", + "destination_definition_id", + "destination_id", + "source_task_run_id", + "source_job_id", + "source_job_run_id", + "transformation_id", + "transformation_version_id", + "tracking_plan_id", + "tracking_plan_version", + "in_pu", "pu", + "reported_at", + "status", + "count", "violation_count", + "terminal_state", "initial_state", + "status_code", + "sample_response", "sample_event", + "event_name", "event_type", + "error_type", + )) if err != nil { _ = txn.Rollback() panic(err) @@ -499,7 +567,29 @@ func (r *HandleT) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) { metric = transformMetricForPII(metric, getPIIColumnsToExclude()) } - _, err = stmt.Exec(workspaceID, r.namespace, r.instanceID, metric.ConnectionDetails.SourceDefinitionId, metric.ConnectionDetails.SourceCategory, metric.ConnectionDetails.SourceID, metric.ConnectionDetails.DestinationDefinitionId, metric.ConnectionDetails.DestinationID, metric.ConnectionDetails.SourceTaskRunID, metric.ConnectionDetails.SourceJobID, metric.ConnectionDetails.SourceJobRunID, metric.PUDetails.InPU, metric.PUDetails.PU, reportedAt, metric.StatusDetail.Status, metric.StatusDetail.Count, metric.PUDetails.TerminalPU, metric.PUDetails.InitialPU, metric.StatusDetail.StatusCode, metric.StatusDetail.SampleResponse, string(metric.StatusDetail.SampleEvent), metric.StatusDetail.EventName, metric.StatusDetail.EventType) + _, err = stmt.Exec( + workspaceID, r.namespace, r.instanceID, + metric.ConnectionDetails.SourceDefinitionId, + metric.ConnectionDetails.SourceCategory, + metric.ConnectionDetails.SourceID, + metric.ConnectionDetails.DestinationDefinitionId, + metric.ConnectionDetails.DestinationID, + metric.ConnectionDetails.SourceTaskRunID, + metric.ConnectionDetails.SourceJobID, + metric.ConnectionDetails.SourceJobRunID, + metric.ConnectionDetails.TransformationID, + metric.ConnectionDetails.TransformationVersionID, + metric.ConnectionDetails.TrackingPlanID, + metric.ConnectionDetails.TrackingPlanVersion, + metric.PUDetails.InPU, metric.PUDetails.PU, + reportedAt, + metric.StatusDetail.Status, + metric.StatusDetail.Count, metric.StatusDetail.ViolationCount, + metric.PUDetails.TerminalPU, metric.PUDetails.InitialPU, + metric.StatusDetail.StatusCode, + metric.StatusDetail.SampleResponse, string(metric.StatusDetail.SampleEvent), + metric.StatusDetail.EventName, metric.StatusDetail.EventType, + metric.StatusDetail.ErrorType) if err != nil { panic(err) } diff --git a/enterprise/reporting/reporting_test.go b/enterprise/reporting/reporting_test.go index 988fa10eb7..3435edfa11 100644 --- a/enterprise/reporting/reporting_test.go +++ b/enterprise/reporting/reporting_test.go @@ -8,6 +8,7 @@ import ( "github.com/golang/mock/gomock" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/rudderlabs/rudder-go-kit/logger" backendconfig "github.com/rudderlabs/rudder-server/backend-config" mock_backendconfig "github.com/rudderlabs/rudder-server/mocks/backend-config" "github.com/rudderlabs/rudder-server/utils/pubsub" @@ -72,6 +73,138 @@ var _ = Describe("Reporting", func() { assertReportMetric(expectedResponse, transformedMetric) }) }) + + Context("getAggregatedReports Tests", func() { + inputReports := []*types.ReportByStatus{ + { + InstanceDetails: types.InstanceDetails{ + WorkspaceID: "some-workspace-id", + }, + ConnectionDetails: types.ConnectionDetails{ + SourceID: "some-source-id", + DestinationID: "some-destination-id", + TransformationID: "some-transformation-id", + TrackingPlanID: "some-tracking-plan-id", + }, + PUDetails: types.PUDetails{ + InPU: "some-in-pu", + PU: "some-pu", + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: 28017690, + }, + StatusDetail: &types.StatusDetail{ + Status: "some-status", + Count: 3, + ViolationCount: 5, + StatusCode: 200, + SampleResponse: "", + SampleEvent: []byte(`{}`), + ErrorType: "", + }, + }, + { + InstanceDetails: types.InstanceDetails{ + WorkspaceID: "some-workspace-id", + }, + ConnectionDetails: types.ConnectionDetails{ + SourceID: "some-source-id", + DestinationID: "some-destination-id", + TransformationID: "some-transformation-id", + TrackingPlanID: "some-tracking-plan-id", + }, + PUDetails: types.PUDetails{ + InPU: "some-in-pu", + PU: "some-pu", + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: 28017690, + }, + StatusDetail: &types.StatusDetail{ + Status: "some-status", + Count: 2, + ViolationCount: 10, + StatusCode: 200, + SampleResponse: "", + SampleEvent: []byte(`{}`), + ErrorType: "some-error-type", + }, + }, + { + InstanceDetails: types.InstanceDetails{ + WorkspaceID: "some-workspace-id", + }, + ConnectionDetails: types.ConnectionDetails{ + SourceID: "some-source-id", + DestinationID: "some-destination-id", + TransformationID: "some-transformation-id", + TrackingPlanID: "some-tracking-plan-id", + }, + PUDetails: types.PUDetails{ + InPU: "some-in-pu", + PU: "some-pu", + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: 28017690, + }, + StatusDetail: &types.StatusDetail{ + Status: "some-status", + Count: 3, + ViolationCount: 10, + StatusCode: 200, + SampleResponse: "", + SampleEvent: []byte(`{}`), + ErrorType: "some-error-type", + }, + }, + } + + expectedResponse := []*types.Metric{ + { + InstanceDetails: types.InstanceDetails{ + WorkspaceID: "some-workspace-id", + }, + ConnectionDetails: types.ConnectionDetails{ + SourceID: "some-source-id", + DestinationID: "some-destination-id", + TransformationID: "some-transformation-id", + TrackingPlanID: "some-tracking-plan-id", + }, + PUDetails: types.PUDetails{ + InPU: "some-in-pu", + PU: "some-pu", + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: 28017690 * 60 * 1000, + }, + StatusDetails: []*types.StatusDetail{ + { + Status: "some-status", + Count: 3, + ViolationCount: 5, + StatusCode: 200, + SampleResponse: "", + SampleEvent: []byte(`{}`), + ErrorType: "", + }, + { + Status: "some-status", + Count: 5, + ViolationCount: 20, + StatusCode: 200, + SampleResponse: "", + SampleEvent: []byte(`{}`), + ErrorType: "some-error-type", + }, + }, + }, + } + + reportHandle := NewFromEnvConfig(logger.NOP) + + aggregatedMetrics := reportHandle.getAggregatedReports(inputReports) + Expect(aggregatedMetrics).To(Equal(expectedResponse)) + }) }) func assertReportMetric(expectedMetric, actualMetric types.PUReportedMetric) { diff --git a/processor/processor.go b/processor/processor.go index f28c446ef4..2ebf894ba5 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -222,6 +222,10 @@ type MetricMetadata struct { sourceDefinitionID string destinationDefinitionID string sourceCategory string + transformationID string + transformationVersionID string + trackingPlanID string + trackingPlanVersion int } type ( @@ -919,17 +923,40 @@ func (proc *Handle) recordEventDeliveryStatus(jobsByDestID map[string][]*jobsdb. } } -func (proc *Handle) getDestTransformerEvents(response transformer.ResponseT, commonMetaData *transformer.MetadataT, destination *backendconfig.DestinationT, stage string, trackingPlanEnabled, userTransformationEnabled bool) ([]transformer.TransformerEventT, []*types.PUReportedMetric, map[string]int64, map[string]MetricMetadata) { +func (proc *Handle) getDestTransformerEvents(response transformer.ResponseT, commonMetaData *transformer.MetadataT, eventsByMessageID map[string]types.SingularEventWithReceivedAt, destination *backendconfig.DestinationT, stage string, trackingPlanEnabled, userTransformationEnabled bool) ([]transformer.TransformerEventT, []*types.PUReportedMetric, map[string]int64, map[string]MetricMetadata) { successMetrics := make([]*types.PUReportedMetric, 0) connectionDetailsMap := make(map[string]*types.ConnectionDetails) - statusDetailsMap := make(map[string]*types.StatusDetail) + statusDetailsMap := make(map[string]map[string]*types.StatusDetail) successCountMap := make(map[string]int64) successCountMetadataMap := make(map[string]MetricMetadata) var eventsToTransform []transformer.TransformerEventT for i := range response.Events { // Update metrics maps userTransformedEvent := &response.Events[i] - proc.updateMetricMaps(successCountMetadataMap, successCountMap, connectionDetailsMap, statusDetailsMap, userTransformedEvent, jobsdb.Succeeded.State, func() json.RawMessage { return []byte(`{}`) }) + var messages []types.SingularEventT + if len(userTransformedEvent.Metadata.MessageIDs) > 0 { + messages = lo.Map(userTransformedEvent.Metadata.MessageIDs, func(msgID string, _ int) types.SingularEventT { return eventsByMessageID[msgID].SingularEvent }) + } else { + messages = []types.SingularEventT{eventsByMessageID[userTransformedEvent.Metadata.MessageID].SingularEvent} + } + + for _, message := range messages { + proc.updateMetricMaps(successCountMetadataMap, successCountMap, connectionDetailsMap, statusDetailsMap, userTransformedEvent, jobsdb.Succeeded.State, stage, func() json.RawMessage { + if stage != transformer.TrackingPlanValidationStage { + return []byte(`{}`) + } + if proc.transientSources.Apply(commonMetaData.SourceID) { + return []byte(`{}`) + } + + sampleEvent, err := jsonfast.Marshal(message) + if err != nil { + proc.logger.Errorf(`[Processor: getDestTransformerEvents] Failed to unmarshal first element in transformed events: %v`, err) + sampleEvent = []byte(`{}`) + } + return sampleEvent + }) + } eventMetadata := commonMetaData eventMetadata.MessageIDs = userTransformedEvent.Metadata.MessageIDs @@ -983,12 +1010,14 @@ func (proc *Handle) getDestTransformerEvents(response transformer.ResponseT, com } for k, cd := range connectionDetailsMap { - m := &types.PUReportedMetric{ - ConnectionDetails: *cd, - PUDetails: *types.CreatePUDetails(inPU, pu, false, false), - StatusDetail: statusDetailsMap[k], + for _, sd := range statusDetailsMap[k] { + m := &types.PUReportedMetric{ + ConnectionDetails: *cd, + PUDetails: *types.CreatePUDetails(inPU, pu, false, false), + StatusDetail: sd, + } + successMetrics = append(successMetrics, m) } - successMetrics = append(successMetrics, m) } } // REPORTING - END @@ -1000,9 +1029,9 @@ func (proc *Handle) updateMetricMaps( countMetadataMap map[string]MetricMetadata, countMap map[string]int64, connectionDetailsMap map[string]*types.ConnectionDetails, - statusDetailsMap map[string]*types.StatusDetail, + statusDetailsMap map[string]map[string]*types.StatusDetail, event *transformer.TransformerResponseT, - status string, + status, stage string, payload func() json.RawMessage, ) { if proc.isReportingEnabled() { @@ -1035,14 +1064,22 @@ func (proc *Handle) updateMetricMaps( sourceDefinitionID: event.Metadata.SourceDefinitionID, destinationDefinitionID: event.Metadata.DestinationDefinitionID, sourceCategory: event.Metadata.SourceCategory, + transformationID: event.Metadata.TransformationID, + transformationVersionID: event.Metadata.TransformationVersionID, + trackingPlanID: event.Metadata.TrackingPlanId, + trackingPlanVersion: event.Metadata.TrackingPlanVersion, } } } - key := fmt.Sprintf("%s:%s:%s:%s:%d:%s:%s", + key := fmt.Sprintf("%s:%s:%s:%s:%s:%s:%d:%s:%d:%s:%s", event.Metadata.SourceID, event.Metadata.DestinationID, event.Metadata.SourceJobRunID, + event.Metadata.TransformationID, + event.Metadata.TransformationVersionID, + event.Metadata.TrackingPlanId, + event.Metadata.TrackingPlanVersion, status, event.StatusCode, eventName, eventType, ) @@ -1058,34 +1095,69 @@ func (proc *Handle) updateMetricMaps( event.Metadata.SourceDefinitionID, event.Metadata.DestinationDefinitionID, event.Metadata.SourceCategory, + event.Metadata.TransformationID, + event.Metadata.TransformationVersionID, + event.Metadata.TrackingPlanId, + event.Metadata.TrackingPlanVersion, ) connectionDetailsMap[key] = cd } - sd, ok := statusDetailsMap[key] + + if _, ok := statusDetailsMap[key]; !ok { + statusDetailsMap[key] = make(map[string]*types.StatusDetail) + } + // create status details for each validation error + // single event can have multiple validation errors of same type + veCount := len(event.ValidationErrors) + if stage == transformer.TrackingPlanValidationStage && status == jobsdb.Succeeded.State { + if veCount > 0 { + status = types.SUCCEEDED_WITH_VIOLATIONS + } else { + status = types.SUCCEEDED_WITHOUT_VIOLATIONS + } + } + sdkeySet := map[string]struct{}{} + for _, ve := range event.ValidationErrors { + sdkey := fmt.Sprintf("%s:%d:%s:%s:%s", + status, event.StatusCode, eventName, eventType, ve.Type) + sdkeySet[sdkey] = struct{}{} + + sd, ok := statusDetailsMap[key][sdkey] + if !ok { + sd = types.CreateStatusDetail(status, 0, 0, event.StatusCode, event.Error, payload(), eventName, eventType, ve.Type) + statusDetailsMap[key][sdkey] = sd + } + sd.ViolationCount++ + } + for k := range sdkeySet { + statusDetailsMap[key][k].Count++ + } + + // create status details for a whole event + sdkey := fmt.Sprintf("%s:%d:%s:%s:%s", status, event.StatusCode, eventName, eventType, "") + sd, ok := statusDetailsMap[key][sdkey] if !ok { - sd = types.CreateStatusDetail(status, 0, event.StatusCode, event.Error, payload(), eventName, eventType) - statusDetailsMap[key] = sd + sd = types.CreateStatusDetail(status, 0, 0, event.StatusCode, event.Error, payload(), eventName, eventType, "") + statusDetailsMap[key][sdkey] = sd } sd.Count++ + sd.ViolationCount += int64(veCount) } } func (proc *Handle) getFailedEventJobs(response transformer.ResponseT, commonMetaData *transformer.MetadataT, eventsByMessageID map[string]types.SingularEventWithReceivedAt, stage string, transformationEnabled, trackingPlanEnabled bool) ([]*jobsdb.JobT, []*types.PUReportedMetric, map[string]int64) { failedMetrics := make([]*types.PUReportedMetric, 0) connectionDetailsMap := make(map[string]*types.ConnectionDetails) - statusDetailsMap := make(map[string]*types.StatusDetail) + statusDetailsMap := make(map[string]map[string]*types.StatusDetail) failedCountMap := make(map[string]int64) var failedEventsToStore []*jobsdb.JobT for i := range response.FailedEvents { failedEvent := &response.FailedEvents[i] var messages []types.SingularEventT if len(failedEvent.Metadata.MessageIDs) > 0 { - messageIds := failedEvent.Metadata.MessageIDs - for _, msgID := range messageIds { - messages = append(messages, eventsByMessageID[msgID].SingularEvent) - } + messages = lo.Map(failedEvent.Metadata.MessageIDs, func(msgID string, _ int) types.SingularEventT { return eventsByMessageID[msgID].SingularEvent }) } else { - messages = append(messages, eventsByMessageID[failedEvent.Metadata.MessageID].SingularEvent) + messages = []types.SingularEventT{eventsByMessageID[failedEvent.Metadata.MessageID].SingularEvent} } payload, err := jsonfast.Marshal(messages) if err != nil { @@ -1094,12 +1166,13 @@ func (proc *Handle) getFailedEventJobs(response transformer.ResponseT, commonMet } for _, message := range messages { - proc.updateMetricMaps(nil, failedCountMap, connectionDetailsMap, statusDetailsMap, failedEvent, jobsdb.Aborted.State, func() json.RawMessage { + proc.updateMetricMaps(nil, failedCountMap, connectionDetailsMap, statusDetailsMap, failedEvent, jobsdb.Aborted.State, stage, func() json.RawMessage { + if proc.transientSources.Apply(commonMetaData.SourceID) { + return []byte(`{}`) + } sampleEvent, err := jsonfast.Marshal(message) if err != nil { proc.logger.Errorf(`[Processor: getFailedEventJobs] Failed to unmarshal first element in failed events: %v`, err) - } - if err != nil || proc.transientSources.Apply(commonMetaData.SourceID) { sampleEvent = []byte(`{}`) } return sampleEvent @@ -1184,12 +1257,14 @@ func (proc *Handle) getFailedEventJobs(response transformer.ResponseT, commonMet pu = types.TRACKINGPLAN_VALIDATOR } for k, cd := range connectionDetailsMap { - m := &types.PUReportedMetric{ - ConnectionDetails: *cd, - PUDetails: *types.CreatePUDetails(inPU, pu, false, false), - StatusDetail: statusDetailsMap[k], + for _, sd := range statusDetailsMap[k] { + m := &types.PUReportedMetric{ + ConnectionDetails: *cd, + PUDetails: *types.CreatePUDetails(inPU, pu, false, false), + StatusDetail: sd, + } + failedMetrics = append(failedMetrics, m) } - failedMetrics = append(failedMetrics, m) } } // REPORTING - END @@ -1255,9 +1330,9 @@ func getDiffMetrics(inPU, pu string, inCountMetadataMap map[string]MetricMetadat if diff != 0 { metricMetadata := inCountMetadataMap[key] metric := &types.PUReportedMetric{ - ConnectionDetails: *types.CreateConnectionDetail(metricMetadata.sourceID, metricMetadata.destinationID, metricMetadata.sourceTaskRunID, metricMetadata.sourceJobID, metricMetadata.sourceJobRunID, metricMetadata.sourceDefinitionID, metricMetadata.destinationDefinitionID, metricMetadata.sourceCategory), + ConnectionDetails: *types.CreateConnectionDetail(metricMetadata.sourceID, metricMetadata.destinationID, metricMetadata.sourceTaskRunID, metricMetadata.sourceJobID, metricMetadata.sourceJobRunID, metricMetadata.sourceDefinitionID, metricMetadata.destinationDefinitionID, metricMetadata.sourceCategory, metricMetadata.transformationID, metricMetadata.transformationVersionID, metricMetadata.trackingPlanID, metricMetadata.trackingPlanVersion), PUDetails: *types.CreatePUDetails(inPU, pu, false, false), - StatusDetail: types.CreateStatusDetail(types.DiffStatus, diff, 0, "", []byte(`{}`), eventName, eventType), + StatusDetail: types.CreateStatusDetail(types.DiffStatus, diff, 0, 0, "", []byte(`{}`), eventName, eventType, ""), } diffMetrics = append(diffMetrics, metric) } @@ -1307,10 +1382,10 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob, parsedE inCountMap := make(map[string]int64) inCountMetadataMap := make(map[string]MetricMetadata) connectionDetailsMap := make(map[string]*types.ConnectionDetails) - statusDetailsMap := make(map[string]*types.StatusDetail) + statusDetailsMap := make(map[string]map[string]*types.StatusDetail) outCountMap := make(map[string]int64) // destinations enabled - destFilterStatusDetailMap := make(map[string]*types.StatusDetail) + destFilterStatusDetailMap := make(map[string]map[string]*types.StatusDetail) for idx, batchEvent := range jobList { @@ -1412,6 +1487,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob, parsedE statusDetailsMap, event, jobsdb.Succeeded.State, + types.GATEWAY, func() json.RawMessage { if payload := payloadFunc(); payload != nil { return payload @@ -1447,7 +1523,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob, parsedE groupedEventsByWriteKey[WriteKeyT(writeKey)] = append(groupedEventsByWriteKey[WriteKeyT(writeKey)], shallowEventCopy) if proc.isReportingEnabled() { - proc.updateMetricMaps(inCountMetadataMap, outCountMap, connectionDetailsMap, destFilterStatusDetailMap, event, jobsdb.Succeeded.State, func() json.RawMessage { return []byte(`{}`) }) + proc.updateMetricMaps(inCountMetadataMap, outCountMap, connectionDetailsMap, destFilterStatusDetailMap, event, jobsdb.Succeeded.State, types.DESTINATION_FILTER, func() json.RawMessage { return []byte(`{}`) }) } } } @@ -1492,18 +1568,20 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob, parsedE if proc.isReportingEnabled() { types.AssertSameKeys(connectionDetailsMap, statusDetailsMap) for k, cd := range connectionDetailsMap { - m := &types.PUReportedMetric{ - ConnectionDetails: *cd, - PUDetails: *types.CreatePUDetails("", types.GATEWAY, false, true), - StatusDetail: statusDetailsMap[k], + for _, sd := range statusDetailsMap[k] { + m := &types.PUReportedMetric{ + ConnectionDetails: *cd, + PUDetails: *types.CreatePUDetails("", types.GATEWAY, false, true), + StatusDetail: sd, + } + reportMetrics = append(reportMetrics, m) } - reportMetrics = append(reportMetrics, m) - if _, ok := destFilterStatusDetailMap[k]; ok { + for _, dsd := range destFilterStatusDetailMap[k] { destFilterMetric := &types.PUReportedMetric{ ConnectionDetails: *cd, PUDetails: *types.CreatePUDetails(types.GATEWAY, types.DESTINATION_FILTER, false, false), - StatusDetail: destFilterStatusDetailMap[k], + StatusDetail: dsd, } reportMetrics = append(reportMetrics, destFilterMetric) } @@ -1572,6 +1650,10 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob, parsedE // At the TP flow we are not having destination information, so adding it here. shallowEventCopy.Metadata.DestinationID = destination.ID shallowEventCopy.Metadata.DestinationType = destination.DestinationDefinition.Name + if len(destination.Transformations) > 0 { + shallowEventCopy.Metadata.TransformationID = destination.Transformations[0].ID + shallowEventCopy.Metadata.TransformationVersionID = destination.Transformations[0].VersionID + } filterConfig(&shallowEventCopy) metadata := shallowEventCopy.Metadata srcAndDestKey := getKeyFromSourceAndDest(metadata.SourceID, metadata.DestinationID) @@ -2002,7 +2084,20 @@ func (proc *Handle) transformSrcDest( inCountMap[key] = 0 } if _, ok := inCountMetadataMap[key]; !ok { - inCountMetadataMap[key] = MetricMetadata{sourceID: event.Metadata.SourceID, destinationID: event.Metadata.DestinationID, sourceTaskRunID: event.Metadata.SourceTaskRunID, sourceJobID: event.Metadata.SourceJobID, sourceJobRunID: event.Metadata.SourceJobRunID, sourceDefinitionID: event.Metadata.SourceDefinitionID, destinationDefinitionID: event.Metadata.DestinationDefinitionID, sourceCategory: event.Metadata.SourceCategory} + inCountMetadataMap[key] = MetricMetadata{ + sourceID: event.Metadata.SourceID, + destinationID: event.Metadata.DestinationID, + sourceTaskRunID: event.Metadata.SourceTaskRunID, + sourceJobID: event.Metadata.SourceJobID, + sourceJobRunID: event.Metadata.SourceJobRunID, + sourceDefinitionID: event.Metadata.SourceDefinitionID, + destinationDefinitionID: event.Metadata.DestinationDefinitionID, + sourceCategory: event.Metadata.SourceCategory, + transformationID: event.Metadata.TransformationID, + transformationVersionID: event.Metadata.TransformationVersionID, + trackingPlanID: event.Metadata.TrackingPlanId, + trackingPlanVersion: event.Metadata.TrackingPlanVersion, + } } inCountMap[key] = inCountMap[key] + 1 } @@ -2027,7 +2122,7 @@ func (proc *Handle) transformSrcDest( var successMetrics []*types.PUReportedMetric var successCountMap map[string]int64 var successCountMetadataMap map[string]MetricMetadata - eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, destination, transformer.UserTransformerStage, trackingPlanEnabled, transformationEnabled) + eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, transformer.UserTransformerStage, trackingPlanEnabled, transformationEnabled) failedJobs, failedMetrics, failedCountMap := proc.getFailedEventJobs(response, commonMetaData, eventsByMessageID, transformer.UserTransformerStage, transformationEnabled, trackingPlanEnabled) proc.saveFailedJobs(failedJobs) if _, ok := procErrorJobsByDestID[destID]; !ok { @@ -2094,7 +2189,7 @@ func (proc *Handle) transformSrcDest( var successMetrics []*types.PUReportedMetric var successCountMap map[string]int64 var successCountMetadataMap map[string]MetricMetadata - eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, destination, transformer.EventFilterStage, trackingPlanEnabled, transformationEnabled) + eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, transformer.EventFilterStage, trackingPlanEnabled, transformationEnabled) failedJobs, failedMetrics, failedCountMap := proc.getFailedEventJobs(response, commonMetaData, eventsByMessageID, transformer.EventFilterStage, transformationEnabled, trackingPlanEnabled) proc.saveFailedJobs(failedJobs) proc.logger.Debug("Supported messages filtering output size", len(eventsToTransform)) @@ -2177,21 +2272,23 @@ func (proc *Handle) transformSrcDest( if proc.isReportingEnabled() { successMetrics := make([]*types.PUReportedMetric, 0) connectionDetailsMap := make(map[string]*types.ConnectionDetails) - statusDetailsMap := make(map[string]*types.StatusDetail) + statusDetailsMap := make(map[string]map[string]*types.StatusDetail) successCountMap := make(map[string]int64) for i := range response.Events { // Update metrics maps - proc.updateMetricMaps(nil, successCountMap, connectionDetailsMap, statusDetailsMap, &response.Events[i], jobsdb.Succeeded.State, func() json.RawMessage { return []byte(`{}`) }) + proc.updateMetricMaps(nil, successCountMap, connectionDetailsMap, statusDetailsMap, &response.Events[i], jobsdb.Succeeded.State, types.DEST_TRANSFORMER, func() json.RawMessage { return []byte(`{}`) }) } types.AssertSameKeys(connectionDetailsMap, statusDetailsMap) for k, cd := range connectionDetailsMap { - m := &types.PUReportedMetric{ - ConnectionDetails: *cd, - PUDetails: *types.CreatePUDetails(types.EVENT_FILTER, types.DEST_TRANSFORMER, false, false), - StatusDetail: statusDetailsMap[k], + for _, sd := range statusDetailsMap[k] { + m := &types.PUReportedMetric{ + ConnectionDetails: *cd, + PUDetails: *types.CreatePUDetails(types.EVENT_FILTER, types.DEST_TRANSFORMER, false, false), + StatusDetail: sd, + } + successMetrics = append(successMetrics, m) } - successMetrics = append(successMetrics, m) } diffMetrics := getDiffMetrics(types.EVENT_FILTER, types.DEST_TRANSFORMER, inCountMetadataMap, inCountMap, successCountMap, failedCountMap) diff --git a/processor/processor_test.go b/processor/processor_test.go index 0dac24fddf..1f285b198b 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -1823,6 +1823,89 @@ var _ = Describe("Static Function Tests", func() { }) }) + Context("updateMetricMaps Tests", func() { + It("Should update metric maps", func() { + proc := NewHandle(nil) + proc.reportingEnabled = true + proc.reporting = &mockReportingTypes.MockReportingI{} + + inputEvent := &transformer.TransformerResponseT{ + Metadata: transformer.MetadataT{ + SourceID: "source-id-1", + DestinationID: "destination-id-1", + TransformationID: "transformation-id-1", + TrackingPlanId: "tracking-plan-id-1", + EventName: "event-name-1", + EventType: "event-type-1", + }, + StatusCode: 200, + Error: "", + ValidationErrors: []transformer.ValidationErrorT{ + { + Type: "type-1", + Message: "message-1", + }, + { + Type: "type-1", + Message: "message-2", + }, + { + Type: "type-2", + Message: "message-1", + }, + }, + } + expectedMetricMetadata := MetricMetadata{ + sourceID: inputEvent.Metadata.SourceID, + destinationID: inputEvent.Metadata.DestinationID, + sourceJobRunID: inputEvent.Metadata.SourceJobRunID, + sourceJobID: inputEvent.Metadata.SourceJobID, + sourceTaskRunID: inputEvent.Metadata.SourceTaskRunID, + sourceDefinitionID: inputEvent.Metadata.SourceDefinitionID, + destinationDefinitionID: inputEvent.Metadata.DestinationDefinitionID, + sourceCategory: inputEvent.Metadata.SourceCategory, + transformationID: inputEvent.Metadata.TransformationID, + transformationVersionID: inputEvent.Metadata.TransformationVersionID, + trackingPlanID: inputEvent.Metadata.TrackingPlanId, + trackingPlanVersion: inputEvent.Metadata.TrackingPlanVersion, + } + expectedConnectionDetails := &types.ConnectionDetails{ + SourceID: inputEvent.Metadata.SourceID, + DestinationID: inputEvent.Metadata.DestinationID, + SourceJobRunID: inputEvent.Metadata.SourceJobRunID, + SourceJobID: inputEvent.Metadata.SourceJobID, + SourceTaskRunID: inputEvent.Metadata.SourceTaskRunID, + SourceDefinitionId: inputEvent.Metadata.SourceDefinitionID, + DestinationDefinitionId: inputEvent.Metadata.DestinationDefinitionID, + SourceCategory: inputEvent.Metadata.SourceCategory, + TransformationID: inputEvent.Metadata.TransformationID, + TransformationVersionID: inputEvent.Metadata.TransformationVersionID, + TrackingPlanID: inputEvent.Metadata.TrackingPlanId, + TrackingPlanVersion: inputEvent.Metadata.TrackingPlanVersion, + } + connectionDetailsMap := make(map[string]*types.ConnectionDetails) + statusDetailsMap := make(map[string]map[string]*types.StatusDetail) + countMap := make(map[string]int64) + countMetadataMap := make(map[string]MetricMetadata) + // update metric maps + proc.updateMetricMaps(countMetadataMap, countMap, connectionDetailsMap, statusDetailsMap, inputEvent, jobsdb.Succeeded.State, transformer.TrackingPlanValidationStage, func() json.RawMessage { return []byte(`{}`) }) + + Expect(len(countMetadataMap)).To(Equal(1)) + Expect(len(countMap)).To(Equal(1)) + for k := range countMetadataMap { + Expect(countMetadataMap[k]).To(Equal(expectedMetricMetadata)) + Expect(countMap[k]).To(Equal(int64(1))) + } + + Expect(len(connectionDetailsMap)).To(Equal(1)) + Expect(len(statusDetailsMap)).To(Equal(1)) + for k := range connectionDetailsMap { + Expect(connectionDetailsMap[k]).To(Equal(expectedConnectionDetails)) + Expect(len(statusDetailsMap[k])).To(Equal(3)) // count of distinct error types: "type-1", "type-2", "" + } + }) + }) + Context("ConvertToTransformerResponse Tests", func() { It("Should filter out unsupported message types", func() { destinationConfig := backendconfig.DestinationT{ diff --git a/processor/trackingplan.go b/processor/trackingplan.go index 607103f64d..e42f7fe351 100644 --- a/processor/trackingplan.go +++ b/processor/trackingplan.go @@ -102,7 +102,7 @@ func (proc *Handle) validateEvents(groupedEventsByWriteKey map[WriteKeyT][]trans trackingPlanEnabledMap[SourceIDT(sourceID)] = true var successMetrics []*types.PUReportedMetric - eventsToTransform, successMetrics, _, _ := proc.getDestTransformerEvents(response, commonMetaData, destination, transformer.TrackingPlanValidationStage, true, false) // Note: Sending false for usertransformation enabled is safe because this stage is before user transformation. + eventsToTransform, successMetrics, _, _ := proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, transformer.TrackingPlanValidationStage, true, false) // Note: Sending false for usertransformation enabled is safe because this stage is before user transformation. failedJobs, failedMetrics, _ := proc.getFailedEventJobs(response, commonMetaData, eventsByMessageID, transformer.TrackingPlanValidationStage, false, true) validationStat.numValidationSuccessEvents.Count(len(eventsToTransform)) diff --git a/processor/transformer/transformer.go b/processor/transformer/transformer.go index 2f53b6b6aa..16ffdd0f70 100644 --- a/processor/transformer/transformer.go +++ b/processor/transformer/transformer.go @@ -68,6 +68,8 @@ type MetadataT struct { EventType string `json:"eventType"` SourceDefinitionID string `json:"sourceDefinitionId"` DestinationDefinitionID string `json:"destinationDefinitionId"` + TransformationID string `json:"transformationId"` + TransformationVersionID string `json:"transformationVersionId"` } type TransformerEventT struct { @@ -204,7 +206,7 @@ func (trans *HandleT) Transform(ctx context.Context, clientEvents []TransformerE return ResponseT{} } - sTags := statsTags(clientEvents[0]) + sTags := statsTags(&clientEvents[0]) batches := lo.Chunk(clientEvents, batchSize) @@ -271,7 +273,7 @@ func (*HandleT) requestTime(s stats.Tags, d time.Duration) { stats.Default.NewTaggedStat("processor.transformer_request_time", stats.TimerType, s).SendTiming(d) } -func statsTags(event TransformerEventT) stats.Tags { +func statsTags(event *TransformerEventT) stats.Tags { return stats.Tags{ "dest_type": event.Destination.DestinationDefinition.Name, "dest_id": event.Destination.ID, @@ -309,7 +311,7 @@ func (trans *HandleT) request(ctx context.Context, url string, data []Transforme // endless backoff loop, only nil error or panics inside _ = backoff.RetryNotify( func() error { - respData, statusCode = trans.doPost(ctx, rawJSON, url, statsTags(data[0])) + respData, statusCode = trans.doPost(ctx, rawJSON, url, statsTags(&data[0])) if statusCode == StatusCPDown { trans.cpDownGauge.Gauge(1) return fmt.Errorf("control plane not reachable") diff --git a/router/batchrouter/batchrouter.go b/router/batchrouter/batchrouter.go index 136e80ffcd..030a639bf4 100644 --- a/router/batchrouter/batchrouter.go +++ b/router/batchrouter/batchrouter.go @@ -1256,7 +1256,7 @@ func (brt *HandleT) setJobStatus(batchJobs *BatchJobsT, isWarehouse bool, errOcc workspaceID := job.WorkspaceId key := fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s", parameters.SourceID, parameters.DestinationID, parameters.SourceJobRunID, jobState, strconv.Itoa(errorCode), parameters.EventName, parameters.EventType) if _, ok := connectionDetailsMap[key]; !ok { - cd = types.CreateConnectionDetail(parameters.SourceID, parameters.DestinationID, parameters.SourceTaskRunID, parameters.SourceJobID, parameters.SourceJobRunID, parameters.SourceDefinitionID, parameters.DestinationDefinitionID, parameters.SourceCategory) + cd = types.CreateConnectionDetail(parameters.SourceID, parameters.DestinationID, parameters.SourceTaskRunID, parameters.SourceJobID, parameters.SourceJobRunID, parameters.SourceDefinitionID, parameters.DestinationDefinitionID, parameters.SourceCategory, "", "", "", 0) connectionDetailsMap[key] = cd transformedAtMap[key] = parameters.TransformAt } @@ -1266,7 +1266,7 @@ func (brt *HandleT) setJobStatus(batchJobs *BatchJobsT, isWarehouse bool, errOcc if brt.transientSources.Apply(parameters.SourceID) { sampleEvent = []byte(`{}`) } - sd = types.CreateStatusDetail(jobState, 0, errorCode, string(errorResp), sampleEvent, parameters.EventName, parameters.EventType) + sd = types.CreateStatusDetail(jobState, 0, 0, errorCode, string(errorResp), sampleEvent, parameters.EventName, parameters.EventType, "") statusDetailsMap[key] = sd } if status.JobState == jobsdb.Failed.State && status.AttemptNum == 1 { diff --git a/router/router.go b/router/router.go index ed8a0de731..fe8c8490a6 100644 --- a/router/router.go +++ b/router/router.go @@ -1309,7 +1309,7 @@ func (rt *HandleT) commitStatusList(responseList *[]jobResponseT) { key := fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s", parameters.SourceID, parameters.DestinationID, parameters.SourceJobRunID, resp.status.JobState, resp.status.ErrorCode, eventName, eventType) _, ok := connectionDetailsMap[key] if !ok { - cd := utilTypes.CreateConnectionDetail(parameters.SourceID, parameters.DestinationID, parameters.SourceTaskRunID, parameters.SourceJobID, parameters.SourceJobRunID, parameters.SourceDefinitionID, parameters.DestinationDefinitionID, parameters.SourceCategory) + cd := utilTypes.CreateConnectionDetail(parameters.SourceID, parameters.DestinationID, parameters.SourceTaskRunID, parameters.SourceJobID, parameters.SourceJobRunID, parameters.SourceDefinitionID, parameters.DestinationDefinitionID, parameters.SourceCategory, "", "", "", 0) connectionDetailsMap[key] = cd transformedAtMap[key] = parameters.TransformAt } @@ -1323,7 +1323,7 @@ func (rt *HandleT) commitStatusList(responseList *[]jobResponseT) { if rt.transientSources.Apply(parameters.SourceID) { sampleEvent = routerutils.EmptyPayload } - sd = utilTypes.CreateStatusDetail(resp.status.JobState, 0, errorCode, string(resp.status.ErrorResponse), sampleEvent, eventName, eventType) + sd = utilTypes.CreateStatusDetail(resp.status.JobState, 0, 0, errorCode, string(resp.status.ErrorResponse), sampleEvent, eventName, eventType, "") statusDetailsMap[key] = sd } diff --git a/sql/migrations/reports/000004_alter_reports_add_tr_tp_columns.up.sql b/sql/migrations/reports/000004_alter_reports_add_tr_tp_columns.up.sql new file mode 100644 index 0000000000..62bc62ae06 --- /dev/null +++ b/sql/migrations/reports/000004_alter_reports_add_tr_tp_columns.up.sql @@ -0,0 +1,7 @@ +ALTER TABLE reports +ADD COLUMN IF NOT EXISTS transformation_id TEXT DEFAULT '', +ADD COLUMN IF NOT EXISTS transformation_version_id TEXT DEFAULT '', +ADD COLUMN IF NOT EXISTS tracking_plan_id TEXT DEFAULT '', +ADD COLUMN IF NOT EXISTS tracking_plan_version INT DEFAULT 0, +ADD COLUMN IF NOT EXISTS error_type TEXT DEFAULT '', +ADD COLUMN IF NOT EXISTS violation_count BIGINT DEFAULT 0; \ No newline at end of file diff --git a/utils/types/reporting_types.go b/utils/types/reporting_types.go index 86d96b8532..a949a4770b 100644 --- a/utils/types/reporting_types.go +++ b/utils/types/reporting_types.go @@ -35,6 +35,12 @@ var ( WAREHOUSE = "warehouse" ) +var ( + // Tracking plan validation states + SUCCEEDED_WITHOUT_VIOLATIONS = "succeeded_without_violations" + SUCCEEDED_WITH_VIOLATIONS = "succeeded_with_violations" +) + type Client struct { Config DbHandle *sql.DB @@ -48,6 +54,8 @@ type StatusDetail struct { SampleEvent json.RawMessage `json:"sampleEvent"` EventName string `json:"eventName"` EventType string `json:"eventType"` + ErrorType string `json:"errorType"` + ViolationCount int64 `json:"violationCount"` } type ReportByStatus struct { @@ -85,6 +93,10 @@ type ConnectionDetails struct { SourceDefinitionId string `json:"sourceDefinitionId"` DestinationDefinitionId string `string:"destinationDefinitionId"` SourceCategory string `json:"sourceCategory"` + TransformationID string `json:"transformationId"` + TransformationVersionID string `json:"transformationVersionId"` + TrackingPlanID string `json:"trackingPlanId"` + TrackingPlanVersion int `json:"trackingPlanVersion"` } type PUDetails struct { InPU string `json:"inReportedBy"` @@ -99,7 +111,7 @@ type PUReportedMetric struct { StatusDetail *StatusDetail } -func CreateConnectionDetail(sid, did, strid, sjid, sjrid, sdid, ddid, sc string) *ConnectionDetails { +func CreateConnectionDetail(sid, did, strid, sjid, sjrid, sdid, ddid, sc, trid, trvid, tpid string, tpv int) *ConnectionDetails { return &ConnectionDetails{ SourceID: sid, DestinationID: did, @@ -109,18 +121,24 @@ func CreateConnectionDetail(sid, did, strid, sjid, sjrid, sdid, ddid, sc string) SourceDefinitionId: sdid, DestinationDefinitionId: ddid, SourceCategory: sc, + TransformationID: trid, + TransformationVersionID: trvid, + TrackingPlanID: tpid, + TrackingPlanVersion: tpv, } } -func CreateStatusDetail(status string, count int64, code int, resp string, event json.RawMessage, eventName, eventType string) *StatusDetail { +func CreateStatusDetail(status string, count, violationCount int64, code int, resp string, event json.RawMessage, eventName, eventType, errorType string) *StatusDetail { return &StatusDetail{ Status: status, Count: count, + ViolationCount: violationCount, StatusCode: code, SampleResponse: resp, SampleEvent: event, EventName: eventName, EventType: eventType, + ErrorType: errorType, } } @@ -133,7 +151,7 @@ func CreatePUDetails(inPU, pu string, terminalPU, initialPU bool) *PUDetails { } } -func AssertSameKeys(m1 map[string]*ConnectionDetails, m2 map[string]*StatusDetail) { +func AssertSameKeys[V1, V2 any](m1 map[string]V1, m2 map[string]V2) { if len(m1) != len(m2) { panic("maps length don't match") // TODO improve msg }