Skip to content

Commit

Permalink
chore: send source name in metadata to transformer (#4443)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Mar 4, 2024
1 parent fe95e15 commit 3088a10
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 29 deletions.
6 changes: 6 additions & 0 deletions processor/processor.go
Expand Up @@ -201,6 +201,7 @@ type DestStatT struct {

type ParametersT struct {
SourceID string `json:"source_id"`
SourceName string `json:"source_name"`
DestinationID string `json:"destination_id"`
ReceivedAt string `json:"received_at"`
TransformAt string `json:"transform_at"`
Expand Down Expand Up @@ -909,6 +910,7 @@ func enhanceWithTimeFields(event *transformer.TransformerEvent, singularEventMap
func makeCommonMetadataFromSingularEvent(singularEvent types.SingularEventT, batchEvent *jobsdb.JobT, receivedAt time.Time, source *backendconfig.SourceT, eventParams types.EventParams) *transformer.Metadata {
commonMetadata := transformer.Metadata{}
commonMetadata.SourceID = source.ID
commonMetadata.SourceName = source.Name
commonMetadata.WorkspaceID = source.WorkspaceID
commonMetadata.Namespace = config.GetKubeNamespace()
commonMetadata.InstanceID = misc.GetInstanceID()
Expand Down Expand Up @@ -940,6 +942,7 @@ func enhanceWithMetadata(commonMetadata *transformer.Metadata, event *transforme
metadata.SourceType = commonMetadata.SourceType
metadata.SourceCategory = commonMetadata.SourceCategory
metadata.SourceID = commonMetadata.SourceID
metadata.SourceName = commonMetadata.SourceName
metadata.WorkspaceID = commonMetadata.WorkspaceID
metadata.Namespace = commonMetadata.Namespace
metadata.InstanceID = commonMetadata.InstanceID
Expand Down Expand Up @@ -2385,11 +2388,13 @@ func (proc *Handle) transformSrcDest(
defer proc.stats.pipeProcessing(partition).Since(time.Now())

sourceID, destID := getSourceAndDestIDsFromKey(srcAndDestKey)
sourceName := eventList[0].Metadata.SourceName
destination := &eventList[0].Destination
workspaceID := eventList[0].Metadata.WorkspaceID
destType := destination.DestinationDefinition.Name
commonMetaData := &transformer.Metadata{
SourceID: sourceID,
SourceName: sourceName,
SourceType: eventList[0].Metadata.SourceType,
SourceCategory: eventList[0].Metadata.SourceCategory,
WorkspaceID: workspaceID,
Expand Down Expand Up @@ -2717,6 +2722,7 @@ func (proc *Handle) transformSrcDest(

params := ParametersT{
SourceID: sourceID,
SourceName: sourceName,
DestinationID: destID,
ReceivedAt: receivedAt,
TransformAt: transformAt,
Expand Down
94 changes: 65 additions & 29 deletions processor/processor_test.go
Expand Up @@ -114,34 +114,55 @@ func (c *testContext) Finish() {
}

const (
WriteKeyEnabled = "enabled-write-key"
WriteKeyEnabledNoUT = "enabled-write-key-no-ut"
WriteKeyEnabledNoUT2 = "enabled-write-key-no-ut2"
WriteKeyEnabledOnlyUT = "enabled-write-key-only-ut"
SourceIDEnabled = "enabled-source"
SourceIDTransient = "transient-source"
WriteKeyTransient = "transient-write-key"
SourceIDEnabledNoUT = "enabled-source-no-ut"
SourceIDEnabledOnlyUT = "enabled-source-only-ut"
SourceIDEnabledNoUT2 = "enabled-source-no-ut2"
SourceIDDisabled = "disabled-source"
DestinationIDEnabledA = "enabled-destination-a" // test destination router
DestinationIDEnabledB = "enabled-destination-b" // test destination batch router
DestinationIDEnabledC = "enabled-destination-c"
DestinationIDDisabled = "disabled-destination"

SourceIDOneTrustConsent = "source-id-oneTrust-consent"
SourceIDGCM = "source-id-gcm"
WriteKeyOneTrustConsent = "write-key-oneTrust-consent"
WriteKeyGCM = "write-key-gcm"

SourceIDKetchConsent = "source-id-ketch-consent"
WriteKeyKetchConsent = "write-key-ketch-consent"
WriteKeyEnabled = "enabled-write-key"
WriteKeyEnabledNoUT = "enabled-write-key-no-ut"
WriteKeyEnabledNoUT2 = "enabled-write-key-no-ut2"
WriteKeyEnabledOnlyUT = "enabled-write-key-only-ut"
SourceIDEnabled = "enabled-source"
SourceIDEnabledName = "SourceIDEnabled"
SourceIDTransient = "transient-source"
SourceIDTransientName = "SourceIDTransient"
WriteKeyTransient = "transient-write-key"
SourceIDEnabledNoUT = "enabled-source-no-ut"
SourceIDEnabledNoUTName = "SourceIDEnabledNoUT"
SourceIDEnabledOnlyUT = "enabled-source-only-ut"
SourceIDEnabledOnlyUTName = "SourceIDEnabledOnlyUT"
SourceIDEnabledNoUT2 = "enabled-source-no-ut2"
SourceIDEnabledNoUT2Name = "SourceIDEnabledNoUT2"
SourceIDDisabled = "disabled-source"
SourceIDDisabledName = "SourceIDDisabled"
DestinationIDEnabledA = "enabled-destination-a" // test destination router
DestinationIDEnabledB = "enabled-destination-b" // test destination batch router
DestinationIDEnabledC = "enabled-destination-c"
DestinationIDDisabled = "disabled-destination"

SourceIDOneTrustConsent = "source-id-oneTrust-consent"
SourceIDOneTrustConsentName = "SourceIDOneTrustConsent"
SourceIDGCM = "source-id-gcm"
SourceIDGCMName = "SourceIDGCM"
WriteKeyOneTrustConsent = "write-key-oneTrust-consent"
WriteKeyGCM = "write-key-gcm"

SourceIDKetchConsent = "source-id-ketch-consent"
SourceIDKetchConsentName = "SourceIDKetchConsent"
WriteKeyKetchConsent = "write-key-ketch-consent"
)

var (
gatewayCustomVal = []string{"GW"}
emptyJobsList []*jobsdb.JobT

sourceIDToName = map[string]string{
SourceIDEnabled: SourceIDEnabledName,
SourceIDEnabledNoUT: SourceIDEnabledNoUTName,
SourceIDEnabledOnlyUT: SourceIDEnabledOnlyUTName,
SourceIDEnabledNoUT2: SourceIDEnabledNoUT2Name,
SourceIDDisabled: SourceIDDisabledName,
SourceIDOneTrustConsent: SourceIDOneTrustConsentName,
SourceIDGCM: SourceIDGCMName,
SourceIDKetchConsent: SourceIDKetchConsentName,
SourceIDTransient: SourceIDTransientName,
}
)

// SetDisableDedupFeature overrides SetDisableDedupFeature configuration and returns previous value
Expand All @@ -163,11 +184,13 @@ var sampleBackendConfig = backendconfig.ConfigT{
Sources: []backendconfig.SourceT{
{
ID: SourceIDDisabled,
Name: SourceIDDisabledName,
WriteKey: WriteKeyEnabled,
Enabled: false,
},
{
ID: SourceIDEnabled,
Name: SourceIDEnabledName,
WriteKey: WriteKeyEnabled,
Enabled: true,
Destinations: []backendconfig.DestinationT{
Expand Down Expand Up @@ -229,6 +252,7 @@ var sampleBackendConfig = backendconfig.ConfigT{
},
{
ID: SourceIDEnabledNoUT,
Name: SourceIDEnabledNoUTName,
WriteKey: WriteKeyEnabledNoUT,
Enabled: true,
SourceDefinition: backendconfig.SourceDefinitionT{
Expand Down Expand Up @@ -264,6 +288,7 @@ var sampleBackendConfig = backendconfig.ConfigT{
},
{
ID: SourceIDEnabledOnlyUT,
Name: SourceIDEnabledOnlyUTName,
WriteKey: WriteKeyEnabledOnlyUT,
Enabled: true,
Destinations: []backendconfig.DestinationT{
Expand All @@ -288,6 +313,7 @@ var sampleBackendConfig = backendconfig.ConfigT{
},
{
ID: SourceIDEnabledNoUT2,
Name: SourceIDEnabledNoUT2Name,
WriteKey: WriteKeyEnabledNoUT2,
Enabled: true,
Destinations: []backendconfig.DestinationT{
Expand Down Expand Up @@ -325,6 +351,7 @@ var sampleBackendConfig = backendconfig.ConfigT{
},
{
ID: SourceIDOneTrustConsent,
Name: SourceIDOneTrustConsentName,
WriteKey: WriteKeyOneTrustConsent,
WorkspaceID: sampleWorkspaceID,
Enabled: true,
Expand Down Expand Up @@ -424,6 +451,7 @@ var sampleBackendConfig = backendconfig.ConfigT{
},
{
ID: SourceIDGCM,
Name: SourceIDGCMName,
WriteKey: WriteKeyGCM,
WorkspaceID: sampleWorkspaceID,
Enabled: true,
Expand Down Expand Up @@ -662,6 +690,7 @@ var sampleBackendConfig = backendconfig.ConfigT{
},
{
ID: SourceIDKetchConsent,
Name: SourceIDKetchConsentName,
WriteKey: WriteKeyKetchConsent,
WorkspaceID: sampleWorkspaceID,
Enabled: true,
Expand Down Expand Up @@ -759,6 +788,7 @@ var sampleBackendConfig = backendconfig.ConfigT{
},
{
ID: SourceIDTransient,
Name: SourceIDTransientName,
WriteKey: WriteKeyTransient,
Enabled: true,
Transient: true,
Expand Down Expand Up @@ -1693,8 +1723,9 @@ var _ = Describe("Processor", Ordered, func() {
Expect(job.ExpireAt).To(BeTemporally("~", time.Now(), 200*time.Millisecond))
Expect(string(job.EventPayload)).To(Equal(fmt.Sprintf(`{"int-value":%d,"string-value":%q}`, i, destination)))
Expect(len(job.LastJobStatus.JobState)).To(Equal(0))
require.JSONEq(GinkgoT(), `{
require.JSONEq(GinkgoT(), fmt.Sprintf(`{
"source_id":"source-from-transformer",
"source_name": "%s",
"destination_id":"destination-from-transformer",
"received_at":"",
"transform_at":"processor",
Expand All @@ -1711,7 +1742,7 @@ var _ = Describe("Processor", Ordered, func() {
"record_id":null,
"workspaceId":"",
"traceparent":""
}`, string(job.Parameters))
}`, sourceIDToName[SourceIDEnabledNoUT]), string(job.Parameters))
}
// One Store call is expected for all events
c.mockRouterJobsDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).Times(1).Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) {
Expand Down Expand Up @@ -1951,8 +1982,9 @@ var _ = Describe("Processor", Ordered, func() {
// Expect(job.CustomVal).To(Equal("destination-definition-name-a"))
Expect(string(job.EventPayload)).To(Equal(fmt.Sprintf(`{"int-value":%d,"string-value":%q}`, i, destination)))
Expect(len(job.LastJobStatus.JobState)).To(Equal(0))
require.JSONEq(GinkgoT(), `{
require.JSONEq(GinkgoT(), fmt.Sprintf(`{
"source_id": "source-from-transformer",
"source_name": "%s",
"destination_id": "destination-from-transformer",
"received_at": "",
"transform_at": "processor",
Expand All @@ -1969,7 +2001,7 @@ var _ = Describe("Processor", Ordered, func() {
"record_id": null,
"workspaceId": "",
"traceparent": ""
}`, string(job.Parameters))
}`, sourceIDToName[SourceIDEnabledOnlyUT]), string(job.Parameters))
}

c.mockBatchRouterJobsDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).Times(1).Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) {
Expand Down Expand Up @@ -4528,6 +4560,7 @@ func assertDestinationTransform(
Expect(event.Metadata.JobID).To(Equal(messages[messageID].jobid))
Expect(event.Metadata.MessageID).To(Equal(messageID))
Expect(event.Metadata.SourceID).To(Equal(sourceId)) // ???
Expect(event.Metadata.SourceName).To(Equal(sourceIDToName[sourceId]))
rawEvent, err := json.Marshal(event)
Expect(err).ToNot(HaveOccurred())
recordID := gjson.GetBytes(rawEvent, "message.recordId").Value()
Expand All @@ -4547,6 +4580,7 @@ func assertDestinationTransform(
Expect(event.Metadata.JobID).To(Equal(int64(0)))
Expect(event.Metadata.MessageID).To(Equal(""))
Expect(event.Metadata.SourceID).To(Equal(sourceId)) // ???
Expect(event.Metadata.SourceName).To(Equal(sourceIDToName[sourceId]))
}

// Expect timestamp fields
Expand Down Expand Up @@ -4591,7 +4625,8 @@ func assertDestinationTransform(
"string-value": fmt.Sprintf("value-%s", destinationID),
},
Metadata: transformer.Metadata{
SourceID: "source-from-transformer", // transformer should replay source id
SourceID: "source-from-transformer", // transformer should replay source id
SourceName: "source-from-transformer-name",
DestinationID: "destination-from-transformer", // transformer should replay destination id
},
},
Expand All @@ -4601,7 +4636,8 @@ func assertDestinationTransform(
"string-value": fmt.Sprintf("value-%s", destinationID),
},
Metadata: transformer.Metadata{
SourceID: "source-from-transformer", // transformer should replay source id
SourceID: "source-from-transformer", // transformer should replay source id
SourceName: "source-from-transformer-name",
DestinationID: "destination-from-transformer", // transformer should replay destination id
},
},
Expand Down
1 change: 1 addition & 0 deletions processor/trackingplan.go
Expand Up @@ -146,6 +146,7 @@ func makeCommonMetadataFromTransformerEvent(transformerEvent *transformer.Transf
metadata := transformerEvent.Metadata
commonMetaData := transformer.Metadata{
SourceID: metadata.SourceID,
SourceName: metadata.SourceName,
SourceType: metadata.SourceType,
SourceCategory: metadata.SourceCategory,
WorkspaceID: metadata.WorkspaceID,
Expand Down
1 change: 1 addition & 0 deletions processor/transformer/transformer.go
Expand Up @@ -50,6 +50,7 @@ var json = jsoniter.ConfigCompatibleWithStandardLibrary

type Metadata struct {
SourceID string `json:"sourceId"`
SourceName string `json:"sourceName"`
WorkspaceID string `json:"workspaceId"`
Namespace string `json:"namespace"`
InstanceID string `json:"instanceId"`
Expand Down

0 comments on commit 3088a10

Please sign in to comment.