Skip to content

Commit

Permalink
Store regex mapping as a slice and always try to handle the topic nam…
Browse files Browse the repository at this point in the history
…e as a regex first
  • Loading branch information
upils committed Feb 17, 2024
1 parent acc2278 commit 94ef19b
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 109 deletions.
3 changes: 0 additions & 3 deletions backend/pkg/config/proto_topic_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ type ProtoTopicMapping struct {
// TopicName is the name of the topic to apply these proto files. This supports regex.
TopicName string `yaml:"topicName"`

// IsRegex indicates if the topicName must be handled as a regex and match multiple topic names
IsRegex bool `yaml:"isRegex"`

// KeyProtoType is the proto's fully qualified name that shall be used for a Kafka record's key
KeyProtoType string `yaml:"keyProtoType"`

Expand Down
35 changes: 17 additions & 18 deletions backend/pkg/proto/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type Service struct {
logger *zap.Logger

strictMappingsByTopic map[string]config.ProtoTopicMapping
regexMappingsByTopic map[string]regexProtoTopicMapping
mappingsRegex []regexProtoTopicMapping
gitSvc *git.Service
fsSvc *filesystem.Service
schemaSvc *schema.Service
Expand Down Expand Up @@ -119,7 +119,7 @@ func NewService(cfg config.Proto, logger *zap.Logger, schemaSvc *schema.Service)
}
}

strictMappingsByTopic, regexMappingsByTopic, err := setMappingsByTopic(cfg.Mappings)
strictMappingsByTopic, mappingsRegex, err := setMappingsByTopic(cfg.Mappings, logger)
if err != nil {
return nil, err
}
Expand All @@ -129,7 +129,7 @@ func NewService(cfg config.Proto, logger *zap.Logger, schemaSvc *schema.Service)
logger: logger,

strictMappingsByTopic: strictMappingsByTopic,
regexMappingsByTopic: regexMappingsByTopic,
mappingsRegex: mappingsRegex,
gitSvc: gitSvc,
fsSvc: fsSvc,
schemaSvc: schemaSvc,
Expand All @@ -139,27 +139,26 @@ func NewService(cfg config.Proto, logger *zap.Logger, schemaSvc *schema.Service)
}, nil
}

func setMappingsByTopic(mappings []config.ProtoTopicMapping) (strictMappingsByTopic map[string]config.ProtoTopicMapping, regexMappingsByTopic map[string]regexProtoTopicMapping, err error) {
func setMappingsByTopic(mappings []config.ProtoTopicMapping, logger *zap.Logger) (strictMappingsByTopic map[string]config.ProtoTopicMapping, mappingsRegex []regexProtoTopicMapping, err error) {
strictMappingsByTopic = make(map[string]config.ProtoTopicMapping)
regexMappingsByTopic = make(map[string]regexProtoTopicMapping)
mappingsRegex = make([]regexProtoTopicMapping, 0)

for _, mapping := range mappings {
if mapping.IsRegex {
r, err := regexp.Compile(mapping.TopicName)
if err != nil {
return nil, nil, fmt.Errorf("invalid regexp as a topic name: %w", err)
}

regexMappingsByTopic[mapping.TopicName] = regexProtoTopicMapping{
ProtoTopicMapping: mapping,
r: r,
}
r, err := regexp.Compile(mapping.TopicName)
if err != nil {
strictMappingsByTopic[mapping.TopicName] = mapping
logger.Warn("topic name handled as a strict match", zap.String("topic_name", mapping.TopicName))
continue
}
strictMappingsByTopic[mapping.TopicName] = mapping

mappingsRegex = append(mappingsRegex, regexProtoTopicMapping{
ProtoTopicMapping: mapping,
r: r,
})
continue
}

return strictMappingsByTopic, regexMappingsByTopic, err
return strictMappingsByTopic, mappingsRegex, err
}

// Start polling the prototypes from the configured provider (e.g. filesystem or Git) and sync these
Expand Down Expand Up @@ -346,7 +345,7 @@ func (s *Service) getMatchingMapping(topicName string) (mapping config.ProtoTopi
}

var match bool
for _, rMapping := range s.regexMappingsByTopic {
for _, rMapping := range s.mappingsRegex {
match = rMapping.r.MatchString(topicName)
if match {
mapping = rMapping.ProtoTopicMapping
Expand Down
92 changes: 4 additions & 88 deletions backend/pkg/proto/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package proto
import (
"bytes"
"encoding/binary"
"fmt"
"reflect"
"regexp"
"testing"
Expand Down Expand Up @@ -44,36 +43,21 @@ func Test_decodeConfluentBinaryWrapper(t *testing.T) {
var (
strictTopicMapping01 = config.ProtoTopicMapping{
TopicName: "strictTopicMapping01",
IsRegex: false,
}

strictTopicMapping02 = config.ProtoTopicMapping{
TopicName: "stritctTopicMapping02",
IsRegex: false,
}

regexTopicMapping01 = config.ProtoTopicMapping{
TopicName: "TopicName",
IsRegex: true,
}
)

func genNTopicMappings(n int, baseName string, isRegex bool) []config.ProtoTopicMapping {
m := make([]config.ProtoTopicMapping, 0)
for i := 0; i < n; i++ {
m = append(m, config.ProtoTopicMapping{
TopicName: fmt.Sprintf("%v%v", baseName, i),
IsRegex: isRegex,
})
}

return m
}

func TestService_getMatchingMapping(t *testing.T) {
type fields struct {
strictMappingsByTopic map[string]config.ProtoTopicMapping
regexMappingsByTopic map[string]regexProtoTopicMapping
regexMappingsByTopic []regexProtoTopicMapping
}
type args struct {
topicName string
Expand Down Expand Up @@ -106,8 +90,8 @@ func TestService_getMatchingMapping(t *testing.T) {
strictTopicMapping02.TopicName: strictTopicMapping02,
regexTopicMapping01.TopicName: regexTopicMapping01,
},
regexMappingsByTopic: map[string]regexProtoTopicMapping{
regexTopicMapping01.TopicName: {
regexMappingsByTopic: []regexProtoTopicMapping{
{
ProtoTopicMapping: regexTopicMapping01,
r: regexp.MustCompile(regexTopicMapping01.TopicName),
},
Expand All @@ -123,7 +107,7 @@ func TestService_getMatchingMapping(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
s := &Service{
strictMappingsByTopic: tt.fields.strictMappingsByTopic,
regexMappingsByTopic: tt.fields.regexMappingsByTopic,
mappingsRegex: tt.fields.regexMappingsByTopic,
}
gotMapping, err := s.getMatchingMapping(tt.args.topicName)
if (err != nil) != tt.wantErr {
Expand All @@ -136,71 +120,3 @@ func TestService_getMatchingMapping(t *testing.T) {
})
}
}

func BenchmarkService_getMatchingMapping(b *testing.B) {
benchs := []struct {
name string
baseName string
topicCount int
iter int
ratio float32 // must be between 0 and 1
}{
{
name: "Only strict mappings",
baseName: "strictMapping",
topicCount: 100,
iter: 100,
ratio: 0.0,
},
{
name: "10% regex mappings",
baseName: "regexMapping",
topicCount: 100,
iter: 100,
ratio: 0.1,
},
{
name: "50% regex mappings",
baseName: "regexMapping",
topicCount: 100,
iter: 100,
ratio: 0.5,
},
{
name: "90% regex mappings",
baseName: "regexMapping",
topicCount: 100,
iter: 100,
ratio: 1.0,
},
}
for _, bench := range benchs {
b.Run(bench.name, func(b *testing.B) {
strictTopicMappings := genNTopicMappings(int(float32(bench.topicCount)*(1.0-bench.ratio)), "strictMapping", false)
regexTopicMappings := genNTopicMappings(int(float32(bench.topicCount)*bench.ratio), "regexMapping", true)

strictMappingsByTopic, regexMappingsByTopic, err := setMappingsByTopic(append(strictTopicMappings, regexTopicMappings...))
if err != nil {
b.Error(err)
}

topicNames := make([]string, 0)

for i := 0; i < bench.topicCount; i++ {
topicNames = append(topicNames, fmt.Sprintf("%v%v", bench.baseName, i))
}

for i := 0; i < b.N; i++ {
s := &Service{
strictMappingsByTopic: strictMappingsByTopic,
regexMappingsByTopic: regexMappingsByTopic,
}
for n := 0; n < bench.iter; n++ {
for _, topicName := range topicNames {
_, _ = s.getMatchingMapping(topicName)
}
}
}
})
}
}

0 comments on commit 94ef19b

Please sign in to comment.