Skip to content

Commit

Permalink
Enable proto topic mapping matching with regex
Browse files Browse the repository at this point in the history
Signed-off-by: Paul MARS <paul.mars@intrinsec.com>
  • Loading branch information
Paul MARS committed Jan 11, 2023
1 parent 6cd4fe8 commit 249f579
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 13 deletions.
4 changes: 4 additions & 0 deletions backend/pkg/config/proto_topic_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ package config
// if the topics have been serialized without the schema registry being involved in the
// serialization.
type ProtoTopicMapping struct {
// TopicName is the name of the topic to apply these proto files. This supports regex.
TopicName string `yaml:"topicName"`

// IsRegex indicates if the topicName must be handled as a regex and match multiple topic names
IsRegex bool `yaml:"isRegex"`

// KeyProtoType is the proto's fully qualified name that shall be used for a Kafka record's key
KeyProtoType string `yaml:"keyProtoType"`

Expand Down
70 changes: 57 additions & 13 deletions backend/pkg/proto/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"regexp"
"strings"
"sync"

Expand Down Expand Up @@ -42,17 +43,23 @@ const (
RecordValue
)

type RegexProtoTopicMapping struct {
config.ProtoTopicMapping
r *regexp.Regexp
}

// Service is in charge of deserializing protobuf encoded payloads. It supports payloads that were
// encoded with involvement of the schema registry as well as plain protobuf-encoded messages.
// This service is also in charge of reading the proto source files from the configured provider.
type Service struct {
cfg config.Proto
logger *zap.Logger

mappingsByTopic map[string]config.ProtoTopicMapping
gitSvc *git.Service
fsSvc *filesystem.Service
schemaSvc *schema.Service
strictMappingsByTopic map[string]config.ProtoTopicMapping
regexMappingsByTopic map[string]RegexProtoTopicMapping
gitSvc *git.Service
fsSvc *filesystem.Service
schemaSvc *schema.Service

// fileDescriptorsBySchemaID are used to find the right schema type for messages at deserialization time. The type
// index is encoded as part of the serialized message.
Expand Down Expand Up @@ -105,19 +112,34 @@ func NewService(cfg config.Proto, logger *zap.Logger, schemaSvc *schema.Service)
}
}

mappingsByTopic := make(map[string]config.ProtoTopicMapping)
strictMappingsByTopic := make(map[string]config.ProtoTopicMapping)
regexMappingsByTopic := make(map[string]RegexProtoTopicMapping)

for _, mapping := range cfg.Mappings {
mappingsByTopic[mapping.TopicName] = mapping
if mapping.IsRegex {
r, err := regexp.Compile(mapping.TopicName)
if err != nil {
return nil, fmt.Errorf("invalid regexp as a topic name: %w", err)
}

regexMappingsByTopic[mapping.TopicName] = RegexProtoTopicMapping{
ProtoTopicMapping: mapping,
r: r,
}
continue
}
strictMappingsByTopic[mapping.TopicName] = mapping
}

return &Service{
cfg: cfg,
logger: logger,

mappingsByTopic: mappingsByTopic,
gitSvc: gitSvc,
fsSvc: fsSvc,
schemaSvc: schemaSvc,
strictMappingsByTopic: strictMappingsByTopic,
regexMappingsByTopic: regexMappingsByTopic,
gitSvc: gitSvc,
fsSvc: fsSvc,
schemaSvc: schemaSvc,

// registry has to be created afterwards
registry: nil,
Expand Down Expand Up @@ -251,12 +273,34 @@ func (s *Service) getMessageDescriptorFromConfluentMessage(wrapper *confluentEnv
return msgType, wrapper.ProtoPayload, nil
}

func (s *Service) getMatchingMapping(topicName string) (mapping config.ProtoTopicMapping, err error) {
mapping, strictExists := s.strictMappingsByTopic[topicName]
if strictExists {
return mapping, nil
}

var match bool
for _, rMapping := range s.regexMappingsByTopic {
match = rMapping.r.MatchString(topicName)
if match {
mapping = rMapping.ProtoTopicMapping
break
}
}

if !match {
return mapping, fmt.Errorf("no prototype found for the given topic. Check your configured protobuf mappings")
}

return mapping, nil
}

// getMessageDescriptor tries to find the apr
func (s *Service) getMessageDescriptor(topicName string, property RecordPropertyType) (*desc.MessageDescriptor, error) {
// 1. Otherwise check if the user has configured a mapping to a local proto type for this topic and record type
mapping, exists := s.mappingsByTopic[topicName]
if !exists {
return nil, fmt.Errorf("no prototype found for the given topic. Check your configured protobuf mappings")
mapping, err := s.getMatchingMapping(topicName)
if err != nil {
return nil, err
}

protoTypeURL := ""
Expand Down
92 changes: 92 additions & 0 deletions backend/pkg/proto/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ package proto
import (
"bytes"
"encoding/binary"
"reflect"
"regexp"
"testing"

"github.com/redpanda-data/console/backend/pkg/config"
"github.com/stretchr/testify/assert"
)

Expand All @@ -35,3 +38,92 @@ func Test_decodeConfluentBinaryWrapper(t *testing.T) {
_, err := svc.decodeConfluentBinaryWrapper(buf.Bytes())
assert.Error(t, err)
}

var (
strictTopicMapping01 = config.ProtoTopicMapping{
TopicName: "strictTopicMapping01",
IsRegex: false,
}

strictTopicMapping02 = config.ProtoTopicMapping{
TopicName: "stritctTopicMapping02",
IsRegex: false,
}

strictTopicMapping03 = config.ProtoTopicMapping{
TopicName: "strictTopicMapping03",
IsRegex: false,
}

regexTopicMapping01 = config.ProtoTopicMapping{
TopicName: "TopicName",
IsRegex: true,
}
)

func TestService_getMatchingMapping(t *testing.T) {
type fields struct {
strictMappingsByTopic map[string]config.ProtoTopicMapping
regexMappingsByTopic map[string]RegexProtoTopicMapping
}
type args struct {
topicName string
}
tests := []struct {
name string
fields fields
args args
wantMapping config.ProtoTopicMapping
wantErr bool
}{
{
name: "get a strict mapping",
fields: fields{
strictMappingsByTopic: map[string]config.ProtoTopicMapping{
strictTopicMapping01.TopicName: strictTopicMapping01,
strictTopicMapping02.TopicName: strictTopicMapping02,
},
},
args: args{
topicName: strictTopicMapping01.TopicName,
},
wantMapping: strictTopicMapping01,
},
{
name: "get a regex mapping",
fields: fields{
strictMappingsByTopic: map[string]config.ProtoTopicMapping{
strictTopicMapping01.TopicName: strictTopicMapping01,
strictTopicMapping02.TopicName: strictTopicMapping02,
regexTopicMapping01.TopicName: regexTopicMapping01,
},
regexMappingsByTopic: map[string]RegexProtoTopicMapping{
regexTopicMapping01.TopicName: RegexProtoTopicMapping{
ProtoTopicMapping: regexTopicMapping01,
r: regexp.MustCompile(regexTopicMapping01.TopicName),
},
},
},
args: args{
topicName: "aTopicNameThatMustMatchRegex",
},
wantMapping: regexTopicMapping01,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &Service{
strictMappingsByTopic: tt.fields.strictMappingsByTopic,
regexMappingsByTopic: tt.fields.regexMappingsByTopic,
}
gotMapping, err := s.getMatchingMapping(tt.args.topicName)
if (err != nil) != tt.wantErr {
t.Errorf("Service.getMatchingMapping() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotMapping, tt.wantMapping) {
t.Errorf("Service.getMatchingMapping() = %v, want %v", gotMapping, tt.wantMapping)
}
})
}
}

0 comments on commit 249f579

Please sign in to comment.