Skip to content

Commit

Permalink
feat(processor): filter unsupported messages (type & event fields) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Aug 19, 2022
1 parent bac75a6 commit e701950
Show file tree
Hide file tree
Showing 7 changed files with 636 additions and 94 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -21,3 +21,4 @@ coverage.html
*.orig
build/wait-for-go/wait-for-go
**/gomock_reflect_*/*
ginkgo.report
71 changes: 71 additions & 0 deletions processor/eventfilter/eventfilter.go
@@ -0,0 +1,71 @@
package eventfilter

import (
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
"github.com/rudderlabs/rudder-server/utils/misc"
)

// GetSupportedMessageTypes returns the supported message types for the given event, based on configuration.
// If no relevant configuration is found, returns false
func GetSupportedMessageTypes(destination *backendconfig.DestinationT) ([]string, bool) {
var supportedMessageTypes []string
if supportedTypes, ok := destination.DestinationDefinition.Config["supportedMessageTypes"]; ok {
if supportedTypeInterface, ok := supportedTypes.([]interface{}); ok {
supportedTypesArr := misc.ConvertInterfaceToStringArray(supportedTypeInterface)
for _, supportedType := range supportedTypesArr {
var skip bool
switch supportedType {
case "identify":
skip = identifyDisabled(destination)
default:
}
if !skip {
supportedMessageTypes = append(supportedMessageTypes, supportedType)
}
}
return supportedMessageTypes, true
}
}
return nil, false
}

func identifyDisabled(destination *backendconfig.DestinationT) bool {
if serverSideIdentify, flag := destination.Config["enableServerSideIdentify"]; flag {
if v, ok := serverSideIdentify.(bool); ok {
return !v
}
}
return false
}

// GetSupportedEvents returns the supported message events for the given destination, based on configuration.
// If no relevant configuration is found, returns false
func GetSupportedMessageEvents(destination *backendconfig.DestinationT) ([]string, bool) {
// "listOfConversions": [
// {
// "conversions": "Credit Card Added"
// },
// {
// "conversions": "Credit Card Removed"
// }
// ]
if supportedEventsI, ok := destination.Config["listOfConversions"]; ok {
if supportedEvents, ok := supportedEventsI.([]interface{}); ok {
var supportedMessageEvents []string
for _, supportedEvent := range supportedEvents {
if supportedEventMap, ok := supportedEvent.(map[string]interface{}); ok {
if conversions, ok := supportedEventMap["conversions"]; ok {
if supportedMessageEvent, ok := conversions.(string); ok {
supportedMessageEvents = append(supportedMessageEvents, supportedMessageEvent)
}
}
}
}
if len(supportedMessageEvents) != len(supportedEvents) {
return nil, false
}
return supportedMessageEvents, true
}
}
return nil, false
}
161 changes: 99 additions & 62 deletions processor/processor.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion processor/processorBenchmark_test.go
Expand Up @@ -12,7 +12,7 @@ import (
func Benchmark_makeCommonMetadataFromSingularEvent(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = makeCommonMetadataFromSingularEvent(
dummySingularEvent, &dummyBatchEvent, time.Now(), backendconfig.SourceT{
dummySingularEvent, &dummyBatchEvent, time.Now(), &backendconfig.SourceT{
WorkspaceID: "test",
SourceDefinition: backendconfig.SourceDefinitionT{
Name: "test_def",
Expand Down

0 comments on commit e701950

Please sign in to comment.