diff --git a/backend/pkg/config/proto_topic_mapping.go b/backend/pkg/config/proto_topic_mapping.go index 07120892c..8720bffea 100644 --- a/backend/pkg/config/proto_topic_mapping.go +++ b/backend/pkg/config/proto_topic_mapping.go @@ -14,6 +14,7 @@ package config // if the topics have been serialized without the schema registry being involved in the // serialization. type ProtoTopicMapping struct { + // TopicName is the name of the topic to apply these proto files. This supports regex. TopicName string `yaml:"topicName"` // KeyProtoType is the proto's fully qualified name that shall be used for a Kafka record's key diff --git a/backend/pkg/connect/service.go b/backend/pkg/connect/service.go index 7529a3898..a062b5be3 100644 --- a/backend/pkg/connect/service.go +++ b/backend/pkg/connect/service.go @@ -41,10 +41,20 @@ type ClientWithConfig struct { // NewService creates a new connect.Service. It tests the connectivity for each configured // Kafka connect cluster proactively. func NewService(cfg config.Connect, logger *zap.Logger) (*Service, error) { - logger.Info("creating Kafka connect HTTP clients and testing connectivity to all clusters") + clientsByCluster := make(map[string]*ClientWithConfig) + + if len(cfg.Clusters) == 0 { + return &Service{ + Cfg: cfg, + Logger: logger, + ClientsByCluster: clientsByCluster, + Interceptor: interceptor.NewInterceptor(), + }, nil + } // 1. Create a client for each configured Connect cluster - clientsByCluster := make(map[string]*ClientWithConfig) + logger.Info("creating Kafka connect HTTP clients and testing connectivity to all clusters") + for _, clusterCfg := range cfg.Clusters { // Create dedicated Connect HTTP Client for each cluster childLogger := logger.With( diff --git a/backend/pkg/proto/service.go b/backend/pkg/proto/service.go index 7d7ef85c1..90394e589 100644 --- a/backend/pkg/proto/service.go +++ b/backend/pkg/proto/service.go @@ -14,6 +14,7 @@ import ( "context" "encoding/binary" "fmt" + "regexp" "strings" "sync" "time" @@ -47,6 +48,11 @@ const ( RecordValue ) +type regexProtoTopicMapping struct { + config.ProtoTopicMapping + r *regexp.Regexp +} + // Service is in charge of deserializing protobuf encoded payloads. It supports payloads that were // encoded with involvement of the schema registry as well as plain protobuf-encoded messages. // This service is also in charge of reading the proto source files from the configured provider. @@ -54,10 +60,11 @@ type Service struct { cfg config.Proto logger *zap.Logger - mappingsByTopic map[string]config.ProtoTopicMapping - gitSvc *git.Service - fsSvc *filesystem.Service - schemaSvc *schema.Service + strictMappingsByTopic map[string]config.ProtoTopicMapping + mappingsRegex []regexProtoTopicMapping + gitSvc *git.Service + fsSvc *filesystem.Service + schemaSvc *schema.Service // fileDescriptorsBySchemaID are used to find the right schema type for messages at deserialization time. The type // index is encoded as part of the serialized message. @@ -112,25 +119,48 @@ func NewService(cfg config.Proto, logger *zap.Logger, schemaSvc *schema.Service) } } - mappingsByTopic := make(map[string]config.ProtoTopicMapping) - for _, mapping := range cfg.Mappings { - mappingsByTopic[mapping.TopicName] = mapping + strictMappingsByTopic, mappingsRegex, err := setMappingsByTopic(cfg.Mappings, logger) + if err != nil { + return nil, err } return &Service{ cfg: cfg, logger: logger, - mappingsByTopic: mappingsByTopic, - gitSvc: gitSvc, - fsSvc: fsSvc, - schemaSvc: schemaSvc, + strictMappingsByTopic: strictMappingsByTopic, + mappingsRegex: mappingsRegex, + gitSvc: gitSvc, + fsSvc: fsSvc, + schemaSvc: schemaSvc, // registry has to be created afterwards registry: nil, }, nil } +func setMappingsByTopic(mappings []config.ProtoTopicMapping, logger *zap.Logger) (strictMappingsByTopic map[string]config.ProtoTopicMapping, mappingsRegex []regexProtoTopicMapping, err error) { + strictMappingsByTopic = make(map[string]config.ProtoTopicMapping) + mappingsRegex = make([]regexProtoTopicMapping, 0) + + for _, mapping := range mappings { + r, err := regexp.Compile(mapping.TopicName) + if err != nil { + strictMappingsByTopic[mapping.TopicName] = mapping + logger.Warn("topic name handled as a strict match", zap.String("topic_name", mapping.TopicName)) + continue + } + + mappingsRegex = append(mappingsRegex, regexProtoTopicMapping{ + ProtoTopicMapping: mapping, + r: r, + }) + continue + } + + return strictMappingsByTopic, mappingsRegex, err +} + // Start polling the prototypes from the configured provider (e.g. filesystem or Git) and sync these // into our in-memory prototype registry. func (s *Service) Start() error { @@ -308,12 +338,35 @@ func (s *Service) IsProtobufSchemaRegistryEnabled() bool { return s.cfg.SchemaRegistry.Enabled } +func (s *Service) getMatchingMapping(topicName string) (mapping config.ProtoTopicMapping, err error) { + mapping, strictExists := s.strictMappingsByTopic[topicName] + if strictExists { + return mapping, nil + } + + var match bool + for _, rMapping := range s.mappingsRegex { + match = rMapping.r.MatchString(topicName) + if match { + mapping = rMapping.ProtoTopicMapping + s.strictMappingsByTopic[topicName] = mapping + break + } + } + + if !match { + return mapping, fmt.Errorf("no prototype found for the given topic. Check your configured protobuf mappings") + } + + return mapping, nil +} + // GetMessageDescriptor tries to find the apr func (s *Service) GetMessageDescriptor(topicName string, property RecordPropertyType) (*desc.MessageDescriptor, error) { // 1. Otherwise check if the user has configured a mapping to a local proto type for this topic and record type - mapping, exists := s.mappingsByTopic[topicName] - if !exists { - return nil, fmt.Errorf("no prototype found for the given topic '%s'. Check your configured protobuf mappings", topicName) + mapping, err := s.getMatchingMapping(topicName) + if err != nil { + return nil, err } var protoTypeURL string diff --git a/backend/pkg/proto/service_test.go b/backend/pkg/proto/service_test.go index 2f21a974f..dcce84c2a 100644 --- a/backend/pkg/proto/service_test.go +++ b/backend/pkg/proto/service_test.go @@ -12,9 +12,13 @@ package proto import ( "bytes" "encoding/binary" + "reflect" + "regexp" "testing" "github.com/stretchr/testify/assert" + + "github.com/redpanda-data/console/backend/pkg/config" ) func Test_decodeConfluentBinaryWrapper(t *testing.T) { @@ -35,3 +39,84 @@ func Test_decodeConfluentBinaryWrapper(t *testing.T) { _, err := svc.decodeConfluentBinaryWrapper(buf.Bytes()) assert.Error(t, err) } + +var ( + strictTopicMapping01 = config.ProtoTopicMapping{ + TopicName: "strictTopicMapping01", + } + + strictTopicMapping02 = config.ProtoTopicMapping{ + TopicName: "stritctTopicMapping02", + } + + regexTopicMapping01 = config.ProtoTopicMapping{ + TopicName: "TopicName", + } +) + +func TestService_getMatchingMapping(t *testing.T) { + type fields struct { + strictMappingsByTopic map[string]config.ProtoTopicMapping + regexMappingsByTopic []regexProtoTopicMapping + } + type args struct { + topicName string + } + tests := []struct { + name string + fields fields + args args + wantMapping config.ProtoTopicMapping + wantErr bool + }{ + { + name: "get a strict mapping", + fields: fields{ + strictMappingsByTopic: map[string]config.ProtoTopicMapping{ + strictTopicMapping01.TopicName: strictTopicMapping01, + strictTopicMapping02.TopicName: strictTopicMapping02, + }, + }, + args: args{ + topicName: strictTopicMapping01.TopicName, + }, + wantMapping: strictTopicMapping01, + }, + { + name: "get a regex mapping", + fields: fields{ + strictMappingsByTopic: map[string]config.ProtoTopicMapping{ + strictTopicMapping01.TopicName: strictTopicMapping01, + strictTopicMapping02.TopicName: strictTopicMapping02, + regexTopicMapping01.TopicName: regexTopicMapping01, + }, + regexMappingsByTopic: []regexProtoTopicMapping{ + { + ProtoTopicMapping: regexTopicMapping01, + r: regexp.MustCompile(regexTopicMapping01.TopicName), + }, + }, + }, + args: args{ + topicName: "aTopicNameThatMustMatchRegex", + }, + wantMapping: regexTopicMapping01, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &Service{ + strictMappingsByTopic: tt.fields.strictMappingsByTopic, + mappingsRegex: tt.fields.regexMappingsByTopic, + } + gotMapping, err := s.getMatchingMapping(tt.args.topicName) + if (err != nil) != tt.wantErr { + t.Errorf("Service.getMatchingMapping() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(gotMapping, tt.wantMapping) { + t.Errorf("Service.getMatchingMapping() = %v, want %v", gotMapping, tt.wantMapping) + } + }) + } +}