Skip to content

Commit

Permalink
feat: support for config to custom destinations (#2625)
Browse files Browse the repository at this point in the history
* feat(support for config to custom destinations): adding support for passing config to transformer for custom destinations

* feat(support for config to custom destinations): adding support for filtering via destination definition

* feat(support for config to custom destinations): updated DestinationDefinitionT and checks before accessing values

* feat(support for config to custom destinations): updated for review comments handled type checks

* feat(support for config to custom destinations): adding support for passing config to transformer for custom destinations

* feat(support for config to custom destinations): adding support for filtering via destination definition

* feat(support for config to custom destinations): updated DestinationDefinitionT and checks before accessing values

* feat(support for config to custom destinations): updated for review comments handled type checks

* feat(support for config to custom destinations): added unit test cases for added changes

* feat(support for config to custom destinations): updated type checks to consider array of interface and casting individual types

* chore: lint clean up

* feat(support for config to custom destinations): minor cleanups and adding test case with unmarshled configs

Co-authored-by: Chandra shekar Varkala <chandra@rudderlabs.com>
  • Loading branch information
utsabc and Chandra shekar Varkala committed Nov 2, 2022
1 parent 5c26d1b commit cb230b0
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 18 deletions.
3 changes: 1 addition & 2 deletions processor/integrations/integrations.go
Expand Up @@ -16,8 +16,7 @@ import (
)

var (
jsonfast = jsoniter.ConfigCompatibleWithStandardLibrary

jsonfast = jsoniter.ConfigCompatibleWithStandardLibrary
destTransformURL string
postParametersTFields []string
)
Expand Down
27 changes: 14 additions & 13 deletions processor/processor.go
Expand Up @@ -473,7 +473,6 @@ var (
destinationIDtoTypeMap map[string]string
batchDestinations []string
configSubscriberLock sync.RWMutex
customDestinations []string
pkgLogger logger.Logger
enableEventSchemasFeature bool
enableEventSchemasAPIOnly bool
Expand Down Expand Up @@ -509,7 +508,7 @@ func loadConfig() {
config.RegisterBoolConfigVariable(false, &enableEventSchemasAPIOnly, true, "EventSchemas.enableEventSchemasAPIOnly")
config.RegisterIntConfigVariable(10000, &maxEventsToProcess, true, 1, "Processor.maxLoopProcessEvents")

batchDestinations, customDestinations = misc.LoadDestinations()
batchDestinations = misc.BatchDestinations()
config.RegisterIntConfigVariable(5, &transformTimesPQLength, false, 1, "Processor.transformTimesPQLength")
// Capture event name as a tag in event level stats
config.RegisterBoolConfigVariable(false, &captureEventNameStats, true, "Processor.Stats.captureEventName")
Expand Down Expand Up @@ -1390,17 +1389,7 @@ func (proc *HandleT) processJobsForDest(subJobs subJob, parsedEventList [][]type
// At the TP flow we are not having destination information, so adding it here.
shallowEventCopy.Metadata.DestinationID = destination.ID
shallowEventCopy.Metadata.DestinationType = destination.DestinationDefinition.Name

//TODO: Test for multiple workspaces ex: hosted data plane
/* Stream destinations does not need config in transformer. As the Kafka destination config
holds the ca-certificate and it depends on user input, it may happen that they provide entire
certificate chain. So, that will make the payload huge while sending a batch of events to transformer,
it may result into payload larger than accepted by transformer. So, discarding destination config from being
sent to transformer for such destination. */
if misc.Contains(customDestinations, *destType) {
shallowEventCopy.Destination.Config = nil
}

filterConfig(&shallowEventCopy, destination)
metadata := shallowEventCopy.Metadata
srcAndDestKey := getKeyFromSourceAndDest(metadata.SourceID, metadata.DestinationID)
// We have at-least one event so marking it good
Expand Down Expand Up @@ -2589,3 +2578,15 @@ func (proc *HandleT) updateRudderSourcesStats(ctx context.Context, tx jobsdb.Sto
err := rsourcesStats.Publish(ctx, tx.SqlTx())
return err
}

func filterConfig(eventCopy *transformer.TransformerEventT, destination *backendconfig.DestinationT) {
if configsToFilterI, ok := destination.DestinationDefinition.Config["configFilters"]; ok {
if configsToFilter, ok := configsToFilterI.([]interface{}); ok {
for _, configKey := range configsToFilter {
if configKeyStr, ok := configKey.(string); ok {
eventCopy.Destination.Config[configKeyStr] = ""
}
}
}
}
}
153 changes: 153 additions & 0 deletions processor/processor_test.go
Expand Up @@ -2407,3 +2407,156 @@ var _ = Describe("TestSubJobMerger", func() {
})
})
})

var _ = Describe("TestConfigFilter", func() {
Context("testing config filter", func() {
It("success-full test", func() {
intgConfig := backendconfig.DestinationT{
ID: "1",
Name: "test",
Config: map[string]interface{}{
"config_key": "config_value",
"long_config1": "long_config1_value..................................",
"long_config2": map[string]interface{}{"hello": "world"},
},
DestinationDefinition: backendconfig.DestinationDefinitionT{
Config: map[string]interface{}{
"configFilters": []interface{}{"long_config1", "long_config2"},
},
},
Enabled: true,
IsProcessorEnabled: true,
}
expectedEvent := transformer.TransformerEventT{
Message: types.SingularEventT{
"MessageID": "messageId-1",
},
Destination: backendconfig.DestinationT{
ID: "1",
Name: "test",
Config: map[string]interface{}{
"config_key": "config_value",
"long_config1": "",
"long_config2": "",
},
DestinationDefinition: backendconfig.DestinationDefinitionT{
Config: map[string]interface{}{
"configFilters": []interface{}{"long_config1", "long_config2"},
},
},
Enabled: true,
IsProcessorEnabled: true,
},
}
event := transformer.TransformerEventT{
Message: types.SingularEventT{
"MessageID": "messageId-1",
},
Destination: intgConfig,
}
filterConfig(&event, &intgConfig)
Expect(event).To(Equal(expectedEvent))
})

It("success-full test with marshalling", func() {
var intgConfig backendconfig.DestinationT
var destDef backendconfig.DestinationDefinitionT
intgConfigStr := `{
"id": "1",
"name": "test",
"config": {
"config_key":"config_value",
"long_config1": "long_config1_value..................................",
"long_config2": {"hello": "world"}
},
"enabled": true,
"isProcessorEnabled": true
}`
destDefStr := `{
"config": {
"configFilters": ["long_config1", "long_config2"]
}
}`
json.Unmarshal([]byte(intgConfigStr), &intgConfig)
json.Unmarshal([]byte(destDefStr), &destDef)
intgConfig.DestinationDefinition = destDef
expectedEvent := transformer.TransformerEventT{
Message: types.SingularEventT{
"MessageID": "messageId-1",
},
Destination: backendconfig.DestinationT{
ID: "1",
Name: "test",
Config: map[string]interface{}{
"config_key": "config_value",
"long_config1": "",
"long_config2": "",
},
DestinationDefinition: backendconfig.DestinationDefinitionT{
Config: map[string]interface{}{
"configFilters": []interface{}{"long_config1", "long_config2"},
},
},
Enabled: true,
IsProcessorEnabled: true,
},
}
event := transformer.TransformerEventT{
Message: types.SingularEventT{
"MessageID": "messageId-1",
},
Destination: intgConfig,
}
filterConfig(&event, &intgConfig)
Expect(event).To(Equal(expectedEvent))
})

It("failure test", func() {
intgConfig := backendconfig.DestinationT{
ID: "1",
Name: "test",
Config: map[string]interface{}{
"config_key": "config_value",
"long_config1": "long_config1_value..................................",
"long_config2": map[string]interface{}{"hello": "world"},
},
DestinationDefinition: backendconfig.DestinationDefinitionT{
Config: map[string]interface{}{
"configFilters": nil,
},
},
Enabled: true,
IsProcessorEnabled: true,
}
expectedEvent := transformer.TransformerEventT{
Message: types.SingularEventT{
"MessageID": "messageId-1",
},
Destination: backendconfig.DestinationT{
ID: "1",
Name: "test",
Config: map[string]interface{}{
"config_key": "config_value",
"long_config1": "long_config1_value..................................",
"long_config2": map[string]interface{}{"hello": "world"},
},
DestinationDefinition: backendconfig.DestinationDefinitionT{
Config: map[string]interface{}{
"configFilters": nil,
},
},
Enabled: true,
IsProcessorEnabled: true,
},
}
event := transformer.TransformerEventT{
Message: types.SingularEventT{
"MessageID": "messageId-1",
},
Destination: intgConfig,
}
filterConfig(&event, &intgConfig)
Expect(event).To(Equal(expectedEvent))
})
})
})
5 changes: 2 additions & 3 deletions utils/misc/misc.go
Expand Up @@ -120,10 +120,9 @@ func Init() {
reservedFolderPaths = GetReservedFolderPaths()
}

func LoadDestinations() ([]string, []string) {
func BatchDestinations() []string {
batchDestinations := []string{"S3", "GCS", "MINIO", "RS", "BQ", "AZURE_BLOB", "SNOWFLAKE", "POSTGRES", "CLICKHOUSE", "DIGITAL_OCEAN_SPACES", "MSSQL", "AZURE_SYNAPSE", "S3_DATALAKE", "MARKETO_BULK_UPLOAD", "GCS_DATALAKE", "AZURE_DATALAKE", "DELTALAKE"}
customDestinations := []string{"KAFKA", "KINESIS", "AZURE_EVENT_HUB", "CONFLUENT_CLOUD"}
return batchDestinations, customDestinations
return batchDestinations
}

func getErrorStore() (ErrorStoreT, error) {
Expand Down

0 comments on commit cb230b0

Please sign in to comment.