From 3088a10f7ee89a485a9ea895a07cfc3c133f0716 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" <82795818+Sidddddarth@users.noreply.github.com> Date: Mon, 4 Mar 2024 16:06:42 +0530 Subject: [PATCH] chore: send source name in metadata to transformer (#4443) --- processor/processor.go | 6 ++ processor/processor_test.go | 94 +++++++++++++++++++--------- processor/trackingplan.go | 1 + processor/transformer/transformer.go | 1 + 4 files changed, 73 insertions(+), 29 deletions(-) diff --git a/processor/processor.go b/processor/processor.go index 723f061a25..20b37f043a 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -201,6 +201,7 @@ type DestStatT struct { type ParametersT struct { SourceID string `json:"source_id"` + SourceName string `json:"source_name"` DestinationID string `json:"destination_id"` ReceivedAt string `json:"received_at"` TransformAt string `json:"transform_at"` @@ -909,6 +910,7 @@ func enhanceWithTimeFields(event *transformer.TransformerEvent, singularEventMap func makeCommonMetadataFromSingularEvent(singularEvent types.SingularEventT, batchEvent *jobsdb.JobT, receivedAt time.Time, source *backendconfig.SourceT, eventParams types.EventParams) *transformer.Metadata { commonMetadata := transformer.Metadata{} commonMetadata.SourceID = source.ID + commonMetadata.SourceName = source.Name commonMetadata.WorkspaceID = source.WorkspaceID commonMetadata.Namespace = config.GetKubeNamespace() commonMetadata.InstanceID = misc.GetInstanceID() @@ -940,6 +942,7 @@ func enhanceWithMetadata(commonMetadata *transformer.Metadata, event *transforme metadata.SourceType = commonMetadata.SourceType metadata.SourceCategory = commonMetadata.SourceCategory metadata.SourceID = commonMetadata.SourceID + metadata.SourceName = commonMetadata.SourceName metadata.WorkspaceID = commonMetadata.WorkspaceID metadata.Namespace = commonMetadata.Namespace metadata.InstanceID = commonMetadata.InstanceID @@ -2385,11 +2388,13 @@ func (proc *Handle) transformSrcDest( defer proc.stats.pipeProcessing(partition).Since(time.Now()) sourceID, destID := getSourceAndDestIDsFromKey(srcAndDestKey) + sourceName := eventList[0].Metadata.SourceName destination := &eventList[0].Destination workspaceID := eventList[0].Metadata.WorkspaceID destType := destination.DestinationDefinition.Name commonMetaData := &transformer.Metadata{ SourceID: sourceID, + SourceName: sourceName, SourceType: eventList[0].Metadata.SourceType, SourceCategory: eventList[0].Metadata.SourceCategory, WorkspaceID: workspaceID, @@ -2717,6 +2722,7 @@ func (proc *Handle) transformSrcDest( params := ParametersT{ SourceID: sourceID, + SourceName: sourceName, DestinationID: destID, ReceivedAt: receivedAt, TransformAt: transformAt, diff --git a/processor/processor_test.go b/processor/processor_test.go index 85e4691b97..00e0457356 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -114,34 +114,55 @@ func (c *testContext) Finish() { } const ( - WriteKeyEnabled = "enabled-write-key" - WriteKeyEnabledNoUT = "enabled-write-key-no-ut" - WriteKeyEnabledNoUT2 = "enabled-write-key-no-ut2" - WriteKeyEnabledOnlyUT = "enabled-write-key-only-ut" - SourceIDEnabled = "enabled-source" - SourceIDTransient = "transient-source" - WriteKeyTransient = "transient-write-key" - SourceIDEnabledNoUT = "enabled-source-no-ut" - SourceIDEnabledOnlyUT = "enabled-source-only-ut" - SourceIDEnabledNoUT2 = "enabled-source-no-ut2" - SourceIDDisabled = "disabled-source" - DestinationIDEnabledA = "enabled-destination-a" // test destination router - DestinationIDEnabledB = "enabled-destination-b" // test destination batch router - DestinationIDEnabledC = "enabled-destination-c" - DestinationIDDisabled = "disabled-destination" - - SourceIDOneTrustConsent = "source-id-oneTrust-consent" - SourceIDGCM = "source-id-gcm" - WriteKeyOneTrustConsent = "write-key-oneTrust-consent" - WriteKeyGCM = "write-key-gcm" - - SourceIDKetchConsent = "source-id-ketch-consent" - WriteKeyKetchConsent = "write-key-ketch-consent" + WriteKeyEnabled = "enabled-write-key" + WriteKeyEnabledNoUT = "enabled-write-key-no-ut" + WriteKeyEnabledNoUT2 = "enabled-write-key-no-ut2" + WriteKeyEnabledOnlyUT = "enabled-write-key-only-ut" + SourceIDEnabled = "enabled-source" + SourceIDEnabledName = "SourceIDEnabled" + SourceIDTransient = "transient-source" + SourceIDTransientName = "SourceIDTransient" + WriteKeyTransient = "transient-write-key" + SourceIDEnabledNoUT = "enabled-source-no-ut" + SourceIDEnabledNoUTName = "SourceIDEnabledNoUT" + SourceIDEnabledOnlyUT = "enabled-source-only-ut" + SourceIDEnabledOnlyUTName = "SourceIDEnabledOnlyUT" + SourceIDEnabledNoUT2 = "enabled-source-no-ut2" + SourceIDEnabledNoUT2Name = "SourceIDEnabledNoUT2" + SourceIDDisabled = "disabled-source" + SourceIDDisabledName = "SourceIDDisabled" + DestinationIDEnabledA = "enabled-destination-a" // test destination router + DestinationIDEnabledB = "enabled-destination-b" // test destination batch router + DestinationIDEnabledC = "enabled-destination-c" + DestinationIDDisabled = "disabled-destination" + + SourceIDOneTrustConsent = "source-id-oneTrust-consent" + SourceIDOneTrustConsentName = "SourceIDOneTrustConsent" + SourceIDGCM = "source-id-gcm" + SourceIDGCMName = "SourceIDGCM" + WriteKeyOneTrustConsent = "write-key-oneTrust-consent" + WriteKeyGCM = "write-key-gcm" + + SourceIDKetchConsent = "source-id-ketch-consent" + SourceIDKetchConsentName = "SourceIDKetchConsent" + WriteKeyKetchConsent = "write-key-ketch-consent" ) var ( gatewayCustomVal = []string{"GW"} emptyJobsList []*jobsdb.JobT + + sourceIDToName = map[string]string{ + SourceIDEnabled: SourceIDEnabledName, + SourceIDEnabledNoUT: SourceIDEnabledNoUTName, + SourceIDEnabledOnlyUT: SourceIDEnabledOnlyUTName, + SourceIDEnabledNoUT2: SourceIDEnabledNoUT2Name, + SourceIDDisabled: SourceIDDisabledName, + SourceIDOneTrustConsent: SourceIDOneTrustConsentName, + SourceIDGCM: SourceIDGCMName, + SourceIDKetchConsent: SourceIDKetchConsentName, + SourceIDTransient: SourceIDTransientName, + } ) // SetDisableDedupFeature overrides SetDisableDedupFeature configuration and returns previous value @@ -163,11 +184,13 @@ var sampleBackendConfig = backendconfig.ConfigT{ Sources: []backendconfig.SourceT{ { ID: SourceIDDisabled, + Name: SourceIDDisabledName, WriteKey: WriteKeyEnabled, Enabled: false, }, { ID: SourceIDEnabled, + Name: SourceIDEnabledName, WriteKey: WriteKeyEnabled, Enabled: true, Destinations: []backendconfig.DestinationT{ @@ -229,6 +252,7 @@ var sampleBackendConfig = backendconfig.ConfigT{ }, { ID: SourceIDEnabledNoUT, + Name: SourceIDEnabledNoUTName, WriteKey: WriteKeyEnabledNoUT, Enabled: true, SourceDefinition: backendconfig.SourceDefinitionT{ @@ -264,6 +288,7 @@ var sampleBackendConfig = backendconfig.ConfigT{ }, { ID: SourceIDEnabledOnlyUT, + Name: SourceIDEnabledOnlyUTName, WriteKey: WriteKeyEnabledOnlyUT, Enabled: true, Destinations: []backendconfig.DestinationT{ @@ -288,6 +313,7 @@ var sampleBackendConfig = backendconfig.ConfigT{ }, { ID: SourceIDEnabledNoUT2, + Name: SourceIDEnabledNoUT2Name, WriteKey: WriteKeyEnabledNoUT2, Enabled: true, Destinations: []backendconfig.DestinationT{ @@ -325,6 +351,7 @@ var sampleBackendConfig = backendconfig.ConfigT{ }, { ID: SourceIDOneTrustConsent, + Name: SourceIDOneTrustConsentName, WriteKey: WriteKeyOneTrustConsent, WorkspaceID: sampleWorkspaceID, Enabled: true, @@ -424,6 +451,7 @@ var sampleBackendConfig = backendconfig.ConfigT{ }, { ID: SourceIDGCM, + Name: SourceIDGCMName, WriteKey: WriteKeyGCM, WorkspaceID: sampleWorkspaceID, Enabled: true, @@ -662,6 +690,7 @@ var sampleBackendConfig = backendconfig.ConfigT{ }, { ID: SourceIDKetchConsent, + Name: SourceIDKetchConsentName, WriteKey: WriteKeyKetchConsent, WorkspaceID: sampleWorkspaceID, Enabled: true, @@ -759,6 +788,7 @@ var sampleBackendConfig = backendconfig.ConfigT{ }, { ID: SourceIDTransient, + Name: SourceIDTransientName, WriteKey: WriteKeyTransient, Enabled: true, Transient: true, @@ -1693,8 +1723,9 @@ var _ = Describe("Processor", Ordered, func() { Expect(job.ExpireAt).To(BeTemporally("~", time.Now(), 200*time.Millisecond)) Expect(string(job.EventPayload)).To(Equal(fmt.Sprintf(`{"int-value":%d,"string-value":%q}`, i, destination))) Expect(len(job.LastJobStatus.JobState)).To(Equal(0)) - require.JSONEq(GinkgoT(), `{ + require.JSONEq(GinkgoT(), fmt.Sprintf(`{ "source_id":"source-from-transformer", + "source_name": "%s", "destination_id":"destination-from-transformer", "received_at":"", "transform_at":"processor", @@ -1711,7 +1742,7 @@ var _ = Describe("Processor", Ordered, func() { "record_id":null, "workspaceId":"", "traceparent":"" - }`, string(job.Parameters)) + }`, sourceIDToName[SourceIDEnabledNoUT]), string(job.Parameters)) } // One Store call is expected for all events c.mockRouterJobsDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).Times(1).Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) { @@ -1951,8 +1982,9 @@ var _ = Describe("Processor", Ordered, func() { // Expect(job.CustomVal).To(Equal("destination-definition-name-a")) Expect(string(job.EventPayload)).To(Equal(fmt.Sprintf(`{"int-value":%d,"string-value":%q}`, i, destination))) Expect(len(job.LastJobStatus.JobState)).To(Equal(0)) - require.JSONEq(GinkgoT(), `{ + require.JSONEq(GinkgoT(), fmt.Sprintf(`{ "source_id": "source-from-transformer", + "source_name": "%s", "destination_id": "destination-from-transformer", "received_at": "", "transform_at": "processor", @@ -1969,7 +2001,7 @@ var _ = Describe("Processor", Ordered, func() { "record_id": null, "workspaceId": "", "traceparent": "" - }`, string(job.Parameters)) + }`, sourceIDToName[SourceIDEnabledOnlyUT]), string(job.Parameters)) } c.mockBatchRouterJobsDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).Times(1).Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) { @@ -4528,6 +4560,7 @@ func assertDestinationTransform( Expect(event.Metadata.JobID).To(Equal(messages[messageID].jobid)) Expect(event.Metadata.MessageID).To(Equal(messageID)) Expect(event.Metadata.SourceID).To(Equal(sourceId)) // ??? + Expect(event.Metadata.SourceName).To(Equal(sourceIDToName[sourceId])) rawEvent, err := json.Marshal(event) Expect(err).ToNot(HaveOccurred()) recordID := gjson.GetBytes(rawEvent, "message.recordId").Value() @@ -4547,6 +4580,7 @@ func assertDestinationTransform( Expect(event.Metadata.JobID).To(Equal(int64(0))) Expect(event.Metadata.MessageID).To(Equal("")) Expect(event.Metadata.SourceID).To(Equal(sourceId)) // ??? + Expect(event.Metadata.SourceName).To(Equal(sourceIDToName[sourceId])) } // Expect timestamp fields @@ -4591,7 +4625,8 @@ func assertDestinationTransform( "string-value": fmt.Sprintf("value-%s", destinationID), }, Metadata: transformer.Metadata{ - SourceID: "source-from-transformer", // transformer should replay source id + SourceID: "source-from-transformer", // transformer should replay source id + SourceName: "source-from-transformer-name", DestinationID: "destination-from-transformer", // transformer should replay destination id }, }, @@ -4601,7 +4636,8 @@ func assertDestinationTransform( "string-value": fmt.Sprintf("value-%s", destinationID), }, Metadata: transformer.Metadata{ - SourceID: "source-from-transformer", // transformer should replay source id + SourceID: "source-from-transformer", // transformer should replay source id + SourceName: "source-from-transformer-name", DestinationID: "destination-from-transformer", // transformer should replay destination id }, }, diff --git a/processor/trackingplan.go b/processor/trackingplan.go index 31022b5655..3c778c35b7 100644 --- a/processor/trackingplan.go +++ b/processor/trackingplan.go @@ -146,6 +146,7 @@ func makeCommonMetadataFromTransformerEvent(transformerEvent *transformer.Transf metadata := transformerEvent.Metadata commonMetaData := transformer.Metadata{ SourceID: metadata.SourceID, + SourceName: metadata.SourceName, SourceType: metadata.SourceType, SourceCategory: metadata.SourceCategory, WorkspaceID: metadata.WorkspaceID, diff --git a/processor/transformer/transformer.go b/processor/transformer/transformer.go index cd07fd44fd..5972e2f763 100644 --- a/processor/transformer/transformer.go +++ b/processor/transformer/transformer.go @@ -50,6 +50,7 @@ var json = jsoniter.ConfigCompatibleWithStandardLibrary type Metadata struct { SourceID string `json:"sourceId"` + SourceName string `json:"sourceName"` WorkspaceID string `json:"workspaceId"` Namespace string `json:"namespace"` InstanceID string `json:"instanceId"`