Skip to content

Commit

Permalink
feat: support initialisation of producer without topic for kafka, azu…
Browse files Browse the repository at this point in the history
…re event hub and confluent cloud (#2569)

Co-authored-by: Krishna Chaitanya <chaithu.kitti@gmail.com>
  • Loading branch information
utsabc and krishna2020 committed Nov 7, 2022
1 parent ef95eba commit 0312c55
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 60 deletions.
29 changes: 18 additions & 11 deletions services/streammanager/kafka/client/client_test.go
Expand Up @@ -126,7 +126,7 @@ func TestProducerBatchConsumerGroup(t *testing.T) {
producerConf.Logger = &testLogger{t}
producerConf.ErrorLogger = producerConf.Logger
}
p, err := c.NewProducer(t.Name(), producerConf)
p, err := c.NewProducer(producerConf)
require.NoError(t, err)
publishMessages(ctx, t, p, noOfMessages)
messagesWaitGroup.Add(noOfMessages)
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestConsumer_Partition(t *testing.T) {
producerConf.Logger = &testLogger{t}
producerConf.ErrorLogger = producerConf.Logger
}
p, err := c.NewProducer(t.Name(), producerConf)
p, err := c.NewProducer(producerConf)
require.NoError(t, err)
publishMessages(ctx, t, p, noOfMessages)
messagesWaitGroup.Add(noOfMessages)
Expand Down Expand Up @@ -403,7 +403,7 @@ func TestWithSASL(t *testing.T) {
producerConf.Logger = &testLogger{t}
producerConf.ErrorLogger = producerConf.Logger
}
p, err := c.NewProducer("some-topic", producerConf)
p, err := c.NewProducer(producerConf)
require.NoError(t, err)
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand All @@ -418,6 +418,7 @@ func TestWithSASL(t *testing.T) {
err := p.Publish(context.Background(), Message{
Key: []byte("hello"),
Value: []byte("ciao"),
Topic: "some-topic",
})
if err != nil {
t.Logf("Publish error: %v", err)
Expand Down Expand Up @@ -520,7 +521,7 @@ func TestProducer_Timeout(t *testing.T) {
producerConf.Logger = &testLogger{t}
producerConf.ErrorLogger = producerConf.Logger
}
p, err := c.NewProducer(t.Name(), producerConf)
p, err := c.NewProducer(producerConf)
require.NoError(t, err)
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand All @@ -534,6 +535,7 @@ func TestProducer_Timeout(t *testing.T) {
err = p.Publish(pubCtx, Message{
Key: []byte("hello"),
Value: []byte("world"),
Topic: t.Name(),
})
pubCancel()
require.NoError(t, err)
Expand All @@ -542,6 +544,7 @@ func TestProducer_Timeout(t *testing.T) {
err = p.Publish(pubCtx, Message{
Key: []byte("hello"),
Value: []byte("world"),
Topic: t.Name(),
})
defer pubCancel()
require.Error(t, err)
Expand Down Expand Up @@ -588,7 +591,7 @@ func TestIsProducerErrTemporary(t *testing.T) {
producerConf.Logger = &testLogger{t}
producerConf.ErrorLogger = producerConf.Logger
}
p, err := c.NewProducer("non-existent-topic", producerConf)
p, err := c.NewProducer(producerConf)
require.NoError(t, err)
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand All @@ -604,9 +607,11 @@ func TestIsProducerErrTemporary(t *testing.T) {
err = p.Publish(pubCtx, Message{
Key: []byte("key-01"),
Value: []byte("value-01"),
Topic: "non-existent-topic",
}, Message{
Key: []byte("key-02"),
Value: []byte("value-02"),
Topic: "non-existent-topic",
})
require.Truef(t, IsProducerErrTemporary(err), "Expected temporary error, got %v instead", err)
pubCancel()
Expand Down Expand Up @@ -645,13 +650,13 @@ func TestConfluentAzureCloud(t *testing.T) {
producerConf.ErrorLogger = producerConf.Logger
}
p, err := c.NewProducer(
t.Name(), // the topic needs to be created beforehand via the ConfluentCloud admin panel
producerConf,
)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
err = p.Publish(ctx, Message{Key: []byte("key-01"), Value: []byte("value-01")})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// the topic needs to be created beforehand via the ConfluentCloud admin panel
err = p.Publish(ctx, Message{Key: []byte("key-01"), Value: []byte("value-01"), Topic: t.Name()})
cancel()
require.NoError(t, err)

Expand Down Expand Up @@ -689,11 +694,12 @@ func TestAzureEventHubsCloud(t *testing.T) {
producerConf.Logger = &testLogger{t}
producerConf.ErrorLogger = producerConf.Logger
}
p, err := c.NewProducer(azureEventHubName, producerConf)
p, err := c.NewProducer(producerConf)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
err = p.Publish(ctx, Message{Key: []byte("key-01"), Value: []byte("value-01")})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = p.Publish(ctx, Message{Key: []byte("key-01"), Value: []byte("value-01"), Topic: azureEventHubName})

cancel()
require.NoError(t, err)

Expand Down Expand Up @@ -722,6 +728,7 @@ func publishMessages(ctx context.Context, t *testing.T, p *Producer, noOfMessage
messages[i] = Message{
Key: []byte(fmt.Sprintf("key-%d", i)),
Value: []byte(fmt.Sprintf("value-%d", i)),
Topic: t.Name(),
}
}

Expand Down
29 changes: 18 additions & 11 deletions services/streammanager/kafka/client/producer.go
Expand Up @@ -36,7 +36,7 @@ type Producer struct {
}

// NewProducer instantiates a new producer. To use it asynchronously just do "go p.Publish(ctx, msgs)".
func (c *Client) NewProducer(topic string, producerConf ProducerConfig) (p *Producer, err error) { // skipcq: CRT-P0003
func (c *Client) NewProducer(producerConf ProducerConfig) (p *Producer, err error) { // skipcq: CRT-P0003
producerConf.defaults()

dialer := &net.Dialer{
Expand Down Expand Up @@ -68,7 +68,6 @@ func (c *Client) NewProducer(topic string, producerConf ProducerConfig) (p *Prod
config: producerConf,
writer: &kafka.Writer{
Addr: kafka.TCP(c.addresses...),
Topic: topic,
Balancer: &kafka.ReferenceHash{},
BatchTimeout: time.Nanosecond,
WriteTimeout: producerConf.WriteTimeout,
Expand Down Expand Up @@ -109,17 +108,12 @@ func (p *Producer) Close(ctx context.Context) error {
func (p *Producer) Publish(ctx context.Context, msgs ...Message) error {
messages := make([]kafka.Message, len(msgs))
for i := range msgs {
var headers []kafka.Header
if l := len(msgs[i].Headers); l > 0 {
headers = make([]kafka.Header, l)
for k := range msgs[i].Headers {
headers[k] = kafka.Header{
Key: msgs[i].Headers[k].Key,
Value: msgs[i].Headers[k].Value,
}
}
if msgs[i].Topic == "" {
return fmt.Errorf("no topic provided for message %d", i)
}
headers := headers(msgs[i])
messages[i] = kafka.Message{
Topic: msgs[i].Topic,
Key: msgs[i].Key,
Value: msgs[i].Value,
Time: msgs[i].Timestamp,
Expand All @@ -132,6 +126,19 @@ func (p *Producer) Publish(ctx context.Context, msgs ...Message) error {

// tempError is a package-level variable of an anonymous interface type whose
// only method is Temporary() bool.
// NOTE(review): its consumers are outside this view — presumably it serves as
// an errors.As target in isErrTemporary; confirm before changing.
var tempError interface{ Temporary() bool }

// headers converts the headers attached to msg into their kafka.Header
// equivalents, copying each Key/Value pair in the original order.
// A nil slice is returned when msg carries no headers.
func headers(msg Message) (headers []kafka.Header) {
	if len(msg.Headers) == 0 {
		return nil
	}

	headers = make([]kafka.Header, 0, len(msg.Headers))
	for _, h := range msg.Headers {
		headers = append(headers, kafka.Header{Key: h.Key, Value: h.Value})
	}
	return headers
}

func isErrTemporary(err error) bool {
isTransientNetworkError := errors.Is(err, io.ErrUnexpectedEOF) ||
errors.Is(err, syscall.ECONNREFUSED) ||
Expand Down
37 changes: 25 additions & 12 deletions services/streammanager/kafka/kafkamanager.go
Expand Up @@ -312,7 +312,7 @@ func NewProducer(destination *backendconfig.DestinationT, o common.Opts) (*Produ
return nil, fmt.Errorf("could not ping: %w", err)
}

p, err := c.NewProducer(destConfig.Topic, client.ProducerConfig{
p, err := c.NewProducer(client.ProducerConfig{
ReadTimeout: kafkaReadTimeout,
WriteTimeout: kafkaWriteTimeout,
})
Expand Down Expand Up @@ -363,7 +363,7 @@ func NewProducerForAzureEventHubs(destination *backendconfig.DestinationT, o com
return nil, fmt.Errorf("[Azure Event Hubs] Cannot connect: %w", err)
}

p, err := c.NewProducer(destConfig.Topic, client.ProducerConfig{
p, err := c.NewProducer(client.ProducerConfig{
ReadTimeout: kafkaReadTimeout,
WriteTimeout: kafkaWriteTimeout,
})
Expand Down Expand Up @@ -415,7 +415,7 @@ func NewProducerForConfluentCloud(destination *backendconfig.DestinationT, o com
return nil, fmt.Errorf("[Confluent Cloud] Cannot connect: %w", err)
}

p, err := c.NewProducer(destConfig.Topic, client.ProducerConfig{
p, err := c.NewProducer(client.ProducerConfig{
ReadTimeout: kafkaReadTimeout,
WriteTimeout: kafkaWriteTimeout,
})
Expand Down Expand Up @@ -450,14 +450,19 @@ func serializeAvroMessage(value []byte, codec goavro.Codec) ([]byte, error) {
return binary, nil
}

func prepareBatchOfMessages(topic string, batch []map[string]interface{}, timestamp time.Time, p producerManager) (
func prepareBatchOfMessages(batch []map[string]interface{}, timestamp time.Time, p producerManager, defaultTopic string) (
[]client.Message, error,
) {
start := now()
defer func() { kafkaStats.prepareBatchTime.SendTiming(since(start)) }()

var messages []client.Message

for i, data := range batch {
topic, ok := data["topic"].(string)
if !ok || topic == "" {
topic = defaultTopic
}

message, ok := data["message"]
if !ok {
kafkaStats.missingMessage.Increment()
Expand Down Expand Up @@ -533,7 +538,6 @@ func (p *ProducerManager) Produce(jsonData json.RawMessage, destConfig interface
// return 400 if producer is invalid
return 400, "Could not create producer", "Could not create producer"
}

start := now()
defer func() { kafkaStats.produceTime.SendTiming(since(start)) }()

Expand All @@ -556,18 +560,19 @@ func (p *ProducerManager) Produce(jsonData json.RawMessage, destConfig interface
if kafkaBatchingEnabled {
return sendBatchedMessage(ctx, jsonData, p, conf.Topic)
}

return sendMessage(ctx, jsonData, p, conf.Topic)
}

func sendBatchedMessage(ctx context.Context, jsonData json.RawMessage, p producerManager, topic string) (int, string, string) {
func sendBatchedMessage(ctx context.Context, jsonData json.RawMessage, p producerManager, defaultTopic string) (int, string, string) {
var batch []map[string]interface{}
err := json.Unmarshal(jsonData, &batch)
if err != nil {
return 400, "Failure", "Error while unmarshalling json data: " + err.Error()
}

timestamp := time.Now()
batchOfMessages, err := prepareBatchOfMessages(topic, batch, timestamp, p)
batchOfMessages, err := prepareBatchOfMessages(batch, timestamp, p, defaultTopic)
if err != nil {
return 400, "Failure", "Error while preparing batched message: " + err.Error()
}
Expand All @@ -581,7 +586,7 @@ func sendBatchedMessage(ctx context.Context, jsonData json.RawMessage, p produce
return 200, returnMessage, returnMessage
}

func sendMessage(ctx context.Context, jsonData json.RawMessage, p producerManager, topic string) (int, string, string) {
func sendMessage(ctx context.Context, jsonData json.RawMessage, p producerManager, defaultTopic string) (int, string, string) {
parsedJSON := gjson.ParseBytes(jsonData)
messageValue := parsedJSON.Get("message").Value()
if messageValue == nil {
Expand All @@ -594,11 +599,11 @@ func sendMessage(ctx context.Context, jsonData json.RawMessage, p producerManage
}

timestamp := time.Now()
userID, _ := parsedJSON.Get("userId").Value().(string)
userID := parsedJSON.Get("userId").String()
codecs := p.getCodecs()
if len(codecs) > 0 {
schemaId, _ := parsedJSON.Get("schemaId").Value().(string)
messageId, _ := parsedJSON.Get("message.messageId").Value().(string)
schemaId := parsedJSON.Get("schemaId").String()
messageId := parsedJSON.Get("message.messageId").String()
if schemaId == "" {
return makeErrorResponse(fmt.Errorf("schemaId is not available for event with messageId: %s", messageId))
}
Expand All @@ -611,7 +616,15 @@ func sendMessage(ctx context.Context, jsonData json.RawMessage, p producerManage
return makeErrorResponse(fmt.Errorf("unable to serialize event with messageId: %s, with error %s", messageId, err))
}
}

topic := parsedJSON.Get("topic").String()

if topic == "" {
topic = defaultTopic
}

message := prepareMessage(topic, userID, value, timestamp)

if err = publish(ctx, p, message); err != nil {
return makeErrorResponse(fmt.Errorf("could not publish to %q: %w", topic, err))
}
Expand Down

0 comments on commit 0312c55

Please sign in to comment.