diff --git a/backend/pkg/config/proto_topic_mapping.go b/backend/pkg/config/proto_topic_mapping.go index 07120892c..60ae3c7ce 100644 --- a/backend/pkg/config/proto_topic_mapping.go +++ b/backend/pkg/config/proto_topic_mapping.go @@ -14,8 +14,12 @@ package config // if the topics have been serialized without the schema registry being involved in the // serialization. type ProtoTopicMapping struct { + // TopicName is the name of the topic to apply these proto files. This supports regex. TopicName string `yaml:"topicName"` + // IsRegex indicates if the topicName must be handled as a regex and match multiple topic names + IsRegex bool `yaml:"isRegex"` + // KeyProtoType is the proto's fully qualified name that shall be used for a Kafka record's key KeyProtoType string `yaml:"keyProtoType"` diff --git a/backend/pkg/proto/service.go b/backend/pkg/proto/service.go index 0ea6f1ca3..7e2eb39ad 100644 --- a/backend/pkg/proto/service.go +++ b/backend/pkg/proto/service.go @@ -13,6 +13,7 @@ import ( "bytes" "encoding/binary" "fmt" + "regexp" "strings" "sync" @@ -42,6 +43,11 @@ const ( RecordValue ) +type RegexProtoTopicMapping struct { + config.ProtoTopicMapping + r *regexp.Regexp +} + // Service is in charge of deserializing protobuf encoded payloads. It supports payloads that were // encoded with involvement of the schema registry as well as plain protobuf-encoded messages. // This service is also in charge of reading the proto source files from the configured provider. @@ -49,10 +55,11 @@ type Service struct { cfg config.Proto logger *zap.Logger - mappingsByTopic map[string]config.ProtoTopicMapping - gitSvc *git.Service - fsSvc *filesystem.Service - schemaSvc *schema.Service + strictMappingsByTopic map[string]config.ProtoTopicMapping + regexMappingsByTopic map[string]RegexProtoTopicMapping + gitSvc *git.Service + fsSvc *filesystem.Service + schemaSvc *schema.Service // fileDescriptorsBySchemaID are used to find the right schema type for messages at deserialization time. The type // index is encoded as part of the serialized message. @@ -105,19 +112,34 @@ func NewService(cfg config.Proto, logger *zap.Logger, schemaSvc *schema.Service) } } - mappingsByTopic := make(map[string]config.ProtoTopicMapping) + strictMappingsByTopic := make(map[string]config.ProtoTopicMapping) + regexMappingsByTopic := make(map[string]RegexProtoTopicMapping) + for _, mapping := range cfg.Mappings { - mappingsByTopic[mapping.TopicName] = mapping + if mapping.IsRegex { + r, err := regexp.Compile(mapping.TopicName) + if err != nil { + return nil, fmt.Errorf("invalid regexp as a topic name: %w", err) + } + + regexMappingsByTopic[mapping.TopicName] = RegexProtoTopicMapping{ + ProtoTopicMapping: mapping, + r: r, + } + continue + } + strictMappingsByTopic[mapping.TopicName] = mapping } return &Service{ cfg: cfg, logger: logger, - mappingsByTopic: mappingsByTopic, - gitSvc: gitSvc, - fsSvc: fsSvc, - schemaSvc: schemaSvc, + strictMappingsByTopic: strictMappingsByTopic, + regexMappingsByTopic: regexMappingsByTopic, + gitSvc: gitSvc, + fsSvc: fsSvc, + schemaSvc: schemaSvc, // registry has to be created afterwards registry: nil, @@ -251,12 +273,34 @@ func (s *Service) getMessageDescriptorFromConfluentMessage(wrapper *confluentEnv return msgType, wrapper.ProtoPayload, nil } +func (s *Service) getMatchingMapping(topicName string) (mapping config.ProtoTopicMapping, err error) { + mapping, strictExists := s.strictMappingsByTopic[topicName] + if strictExists { + return mapping, nil + } + + var match bool + for _, rMapping := range s.regexMappingsByTopic { + match = rMapping.r.MatchString(topicName) + if match { + mapping = rMapping.ProtoTopicMapping + break + } + } + + if !match { + return mapping, fmt.Errorf("no prototype found for the given topic. Check your configured protobuf mappings") + } + + return mapping, nil +} + // getMessageDescriptor tries to find the apr func (s *Service) getMessageDescriptor(topicName string, property RecordPropertyType) (*desc.MessageDescriptor, error) { // 1. Otherwise check if the user has configured a mapping to a local proto type for this topic and record type - mapping, exists := s.mappingsByTopic[topicName] - if !exists { - return nil, fmt.Errorf("no prototype found for the given topic. Check your configured protobuf mappings") + mapping, err := s.getMatchingMapping(topicName) + if err != nil { + return nil, err } protoTypeURL := "" diff --git a/backend/pkg/proto/service_test.go b/backend/pkg/proto/service_test.go index 2f21a974f..ed3961e0c 100644 --- a/backend/pkg/proto/service_test.go +++ b/backend/pkg/proto/service_test.go @@ -12,8 +12,11 @@ package proto import ( "bytes" "encoding/binary" + "reflect" + "regexp" "testing" + "github.com/redpanda-data/console/backend/pkg/config" "github.com/stretchr/testify/assert" ) @@ -35,3 +38,92 @@ func Test_decodeConfluentBinaryWrapper(t *testing.T) { _, err := svc.decodeConfluentBinaryWrapper(buf.Bytes()) assert.Error(t, err) } + +var ( + strictTopicMapping01 = config.ProtoTopicMapping{ + TopicName: "strictTopicMapping01", + IsRegex: false, + } + + strictTopicMapping02 = config.ProtoTopicMapping{ + TopicName: "stritctTopicMapping02", + IsRegex: false, + } + + strictTopicMapping03 = config.ProtoTopicMapping{ + TopicName: "strictTopicMapping03", + IsRegex: false, + } + + regexTopicMapping01 = config.ProtoTopicMapping{ + TopicName: "TopicName", + IsRegex: true, + } +) + +func TestService_getMatchingMapping(t *testing.T) { + type fields struct { + strictMappingsByTopic map[string]config.ProtoTopicMapping + regexMappingsByTopic map[string]RegexProtoTopicMapping + } + type args struct { + topicName string + } + tests := []struct { + name string + fields fields + args args + wantMapping config.ProtoTopicMapping + wantErr bool + }{ + { + name: "get a strict mapping", + fields: fields{ + strictMappingsByTopic: map[string]config.ProtoTopicMapping{ + strictTopicMapping01.TopicName: strictTopicMapping01, + strictTopicMapping02.TopicName: strictTopicMapping02, + }, + }, + args: args{ + topicName: strictTopicMapping01.TopicName, + }, + wantMapping: strictTopicMapping01, + }, + { + name: "get a regex mapping", + fields: fields{ + strictMappingsByTopic: map[string]config.ProtoTopicMapping{ + strictTopicMapping01.TopicName: strictTopicMapping01, + strictTopicMapping02.TopicName: strictTopicMapping02, + regexTopicMapping01.TopicName: regexTopicMapping01, + }, + regexMappingsByTopic: map[string]RegexProtoTopicMapping{ + regexTopicMapping01.TopicName: RegexProtoTopicMapping{ + ProtoTopicMapping: regexTopicMapping01, + r: regexp.MustCompile(regexTopicMapping01.TopicName), + }, + }, + }, + args: args{ + topicName: "aTopicNameThatMustMatchRegex", + }, + wantMapping: regexTopicMapping01, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &Service{ + strictMappingsByTopic: tt.fields.strictMappingsByTopic, + regexMappingsByTopic: tt.fields.regexMappingsByTopic, + } + gotMapping, err := s.getMatchingMapping(tt.args.topicName) + if (err != nil) != tt.wantErr { + t.Errorf("Service.getMatchingMapping() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(gotMapping, tt.wantMapping) { + t.Errorf("Service.getMatchingMapping() = %v, want %v", gotMapping, tt.wantMapping) + } + }) + } +}