Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support regex in topic protobuf mappings #584

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/pkg/config/proto_topic_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package config
// if the topics have been serialized without the schema registry being involved in the
// serialization.
type ProtoTopicMapping struct {
// TopicName is the name of the topic to apply these proto files. This supports regex.
TopicName string `yaml:"topicName"`

// KeyProtoType is the proto's fully qualified name that shall be used for a Kafka record's key
Expand Down
14 changes: 12 additions & 2 deletions backend/pkg/connect/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,20 @@ type ClientWithConfig struct {
// NewService creates a new connect.Service. It tests the connectivity for each configured
// Kafka connect cluster proactively.
func NewService(cfg config.Connect, logger *zap.Logger) (*Service, error) {
logger.Info("creating Kafka connect HTTP clients and testing connectivity to all clusters")
clientsByCluster := make(map[string]*ClientWithConfig)

if len(cfg.Clusters) == 0 {
return &Service{
Cfg: cfg,
Logger: logger,
ClientsByCluster: clientsByCluster,
Interceptor: interceptor.NewInterceptor(),
}, nil
}

// 1. Create a client for each configured Connect cluster
clientsByCluster := make(map[string]*ClientWithConfig)
logger.Info("creating Kafka connect HTTP clients and testing connectivity to all clusters")

for _, clusterCfg := range cfg.Clusters {
Comment on lines -44 to 58
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this PR, I assume something went wrong when syncing your fork? Please fix this

// Create dedicated Connect HTTP Client for each cluster
childLogger := logger.With(
Expand Down
81 changes: 67 additions & 14 deletions backend/pkg/proto/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"encoding/binary"
"fmt"
"regexp"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -47,17 +48,23 @@ const (
RecordValue
)

// regexProtoTopicMapping couples a configured ProtoTopicMapping with a compiled
// regular expression that is matched against topic names at lookup time.
type regexProtoTopicMapping struct {
// The mapping is embedded, so its fields are accessible directly on this type.
config.ProtoTopicMapping
// r is the compiled regular expression belonging to this mapping.
r *regexp.Regexp
}

// Service is in charge of deserializing protobuf encoded payloads. It supports payloads that were
// encoded with involvement of the schema registry as well as plain protobuf-encoded messages.
// This service is also in charge of reading the proto source files from the configured provider.
type Service struct {
cfg config.Proto
logger *zap.Logger

mappingsByTopic map[string]config.ProtoTopicMapping
gitSvc *git.Service
fsSvc *filesystem.Service
schemaSvc *schema.Service
strictMappingsByTopic map[string]config.ProtoTopicMapping
mappingsRegex []regexProtoTopicMapping
gitSvc *git.Service
fsSvc *filesystem.Service
schemaSvc *schema.Service

// fileDescriptorsBySchemaID are used to find the right schema type for messages at deserialization time. The type
// index is encoded as part of the serialized message.
Expand Down Expand Up @@ -112,25 +119,48 @@ func NewService(cfg config.Proto, logger *zap.Logger, schemaSvc *schema.Service)
}
}

mappingsByTopic := make(map[string]config.ProtoTopicMapping)
for _, mapping := range cfg.Mappings {
mappingsByTopic[mapping.TopicName] = mapping
strictMappingsByTopic, mappingsRegex, err := setMappingsByTopic(cfg.Mappings, logger)
if err != nil {
return nil, err
}

return &Service{
cfg: cfg,
logger: logger,

mappingsByTopic: mappingsByTopic,
gitSvc: gitSvc,
fsSvc: fsSvc,
schemaSvc: schemaSvc,
strictMappingsByTopic: strictMappingsByTopic,
mappingsRegex: mappingsRegex,
gitSvc: gitSvc,
fsSvc: fsSvc,
schemaSvc: schemaSvc,

// registry has to be created afterwards
registry: nil,
}, nil
}

func setMappingsByTopic(mappings []config.ProtoTopicMapping, logger *zap.Logger) (strictMappingsByTopic map[string]config.ProtoTopicMapping, mappingsRegex []regexProtoTopicMapping, err error) {
strictMappingsByTopic = make(map[string]config.ProtoTopicMapping)
mappingsRegex = make([]regexProtoTopicMapping, 0)

for _, mapping := range mappings {
r, err := regexp.Compile(mapping.TopicName)
if err != nil {
strictMappingsByTopic[mapping.TopicName] = mapping
logger.Warn("topic name handled as a strict match", zap.String("topic_name", mapping.TopicName))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As of today, everyone uses "strict matches", so this would print one warning message for every single configured topic name. I think we need to change this. I agree that we want to be explicit about what's being used, since the regexp compilation can fail.

I suggested something in my comment to overcome this issue

continue
}

mappingsRegex = append(mappingsRegex, regexProtoTopicMapping{
ProtoTopicMapping: mapping,
r: r,
})
continue
}

return strictMappingsByTopic, mappingsRegex, err
}

// Start polling the prototypes from the configured provider (e.g. filesystem or Git) and sync these
// into our in-memory prototype registry.
func (s *Service) Start() error {
Expand Down Expand Up @@ -308,12 +338,35 @@ func (s *Service) IsProtobufSchemaRegistryEnabled() bool {
return s.cfg.SchemaRegistry.Enabled
}

func (s *Service) getMatchingMapping(topicName string) (mapping config.ProtoTopicMapping, err error) {
mapping, strictExists := s.strictMappingsByTopic[topicName]
if strictExists {
return mapping, nil
}

var match bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please validate the complexity with a simple test.
It would be convenient to extract the loop that ranges over s.regexMappingsByTopic into its own method.
Let's assume that this method is named findMappingByRegex(topicName string) (mapping config.ProtoTopicMapping, err error).

Test case

Call getMatchingMapping three times for each topic. Verify that findMappingByRegex is called at most once per topic (see the expected counts below).

Mappings:


      - topicName: "tb_core.0"
        isRegex: false
        valueProtoType: transport.ToCoreMsg
      - topicName: "tb_core.notifications.*"
        isRegex: true
        valueProtoType: transport.ToCoreNotificationMsg

Inputs (strict, regex, none-mapping):
tb_core.0
tb_core.notifications.tb-core-0
tb_rule_engine.main.9

Expected call counts per input — getMatchingMapping (times), findMappingByRegex (times):
3, 0 strict
3, 1 regex
3, 1 no match

for _, rMapping := range s.mappingsRegex {
match = rMapping.r.MatchString(topicName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a performance degradation for non-matching topics.
Every time a non-matching topic is looked up, the algorithm iterates over all known regexes and tries to match each of them.

The total number of regex searches should be bounded by the number of unique topic names, i.e. O(N).

I suggest also saving the nil (no-match) result in the strict mapping s.strictMappingsByTopic[topicName] and checking for it at the beginning of this function, so that the regex loop is not run again for the same topic.

if match {
mapping = rMapping.ProtoTopicMapping
s.strictMappingsByTopic[topicName] = mapping
break
}
}

if !match {
return mapping, fmt.Errorf("no prototype found for the given topic. Check your configured protobuf mappings")
}

return mapping, nil
}

// GetMessageDescriptor tries to find the appropriate message descriptor for the given topic and record property.
func (s *Service) GetMessageDescriptor(topicName string, property RecordPropertyType) (*desc.MessageDescriptor, error) {
// 1. Otherwise check if the user has configured a mapping to a local proto type for this topic and record type
mapping, exists := s.mappingsByTopic[topicName]
if !exists {
return nil, fmt.Errorf("no prototype found for the given topic '%s'. Check your configured protobuf mappings", topicName)
mapping, err := s.getMatchingMapping(topicName)
if err != nil {
return nil, err
}

var protoTypeURL string
Expand Down
85 changes: 85 additions & 0 deletions backend/pkg/proto/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ package proto
import (
"bytes"
"encoding/binary"
"reflect"
"regexp"
"testing"

"github.com/stretchr/testify/assert"

"github.com/redpanda-data/console/backend/pkg/config"
)

func Test_decodeConfluentBinaryWrapper(t *testing.T) {
Expand All @@ -35,3 +39,84 @@ func Test_decodeConfluentBinaryWrapper(t *testing.T) {
_, err := svc.decodeConfluentBinaryWrapper(buf.Bytes())
assert.Error(t, err)
}

// Topic mapping fixtures used by TestService_getMatchingMapping.
var (
// strictTopicMapping01 is registered in the strict (exact-name) map and looked up by its own name.
strictTopicMapping01 = config.ProtoTopicMapping{
TopicName: "strictTopicMapping01",
}

// strictTopicMapping02 is a second strict entry that is never the lookup target.
strictTopicMapping02 = config.ProtoTopicMapping{
TopicName: "stritctTopicMapping02",
}

// regexTopicMapping01 has its TopicName compiled as a regular expression by the test;
// the test expects it to match a topic name that merely contains "TopicName".
regexTopicMapping01 = config.ProtoTopicMapping{
TopicName: "TopicName",
}
)

// TestService_getMatchingMapping checks the three outcomes of Service.getMatchingMapping:
// a topic resolved through the strict (exact-name) map, a topic resolved through a regex
// mapping, and a topic for which no mapping applies (which must yield an error).
func TestService_getMatchingMapping(t *testing.T) {
	type fields struct {
		strictMappingsByTopic map[string]config.ProtoTopicMapping
		regexMappingsByTopic  []regexProtoTopicMapping
	}
	type args struct {
		topicName string
	}
	tests := []struct {
		name        string
		fields      fields
		args        args
		wantMapping config.ProtoTopicMapping
		wantErr     bool
	}{
		{
			name: "get a strict mapping",
			fields: fields{
				strictMappingsByTopic: map[string]config.ProtoTopicMapping{
					strictTopicMapping01.TopicName: strictTopicMapping01,
					strictTopicMapping02.TopicName: strictTopicMapping02,
				},
			},
			args: args{
				topicName: strictTopicMapping01.TopicName,
			},
			wantMapping: strictTopicMapping01,
		},
		{
			name: "get a regex mapping",
			fields: fields{
				strictMappingsByTopic: map[string]config.ProtoTopicMapping{
					strictTopicMapping01.TopicName: strictTopicMapping01,
					strictTopicMapping02.TopicName: strictTopicMapping02,
					regexTopicMapping01.TopicName:  regexTopicMapping01,
				},
				regexMappingsByTopic: []regexProtoTopicMapping{
					{
						ProtoTopicMapping: regexTopicMapping01,
						r:                 regexp.MustCompile(regexTopicMapping01.TopicName),
					},
				},
			},
			args: args{
				topicName: "aTopicNameThatMustMatchRegex",
			},
			wantMapping: regexTopicMapping01,
		},
		{
			// Neither the strict map nor the regex list applies, so the lookup must
			// fail and hand back the zero-value mapping.
			name: "no mapping matches",
			fields: fields{
				strictMappingsByTopic: map[string]config.ProtoTopicMapping{
					strictTopicMapping01.TopicName: strictTopicMapping01,
				},
				regexMappingsByTopic: []regexProtoTopicMapping{
					{
						ProtoTopicMapping: regexTopicMapping01,
						r:                 regexp.MustCompile(regexTopicMapping01.TopicName),
					},
				},
			},
			args: args{
				topicName: "unrelated-topic",
			},
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Only the two lookup structures are populated; getMatchingMapping
			// does not touch any other Service field.
			s := &Service{
				strictMappingsByTopic: tt.fields.strictMappingsByTopic,
				mappingsRegex:         tt.fields.regexMappingsByTopic,
			}
			gotMapping, err := s.getMatchingMapping(tt.args.topicName)
			if (err != nil) != tt.wantErr {
				t.Errorf("Service.getMatchingMapping() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if !reflect.DeepEqual(gotMapping, tt.wantMapping) {
				t.Errorf("Service.getMatchingMapping() = %v, want %v", gotMapping, tt.wantMapping)
			}
		})
	}
}
Loading