Skip to content

Commit

Permalink
fix: kafka create topic (#3102)
Browse files Browse the repository at this point in the history
* fix: kafka create topic

* chore: better errors messages
  • Loading branch information
fracasula committed Mar 15, 2023
1 parent 0c525d0 commit 10ccbf3
Showing 1 changed file with 45 additions and 3 deletions.
48 changes: 45 additions & 3 deletions services/streammanager/kafka/client/testutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package testutil
import (
"context"
"fmt"
"net"
"strconv"
"time"

"github.com/segmentio/kafka-go"
Expand Down Expand Up @@ -53,10 +55,47 @@ func (c *Client) ping(ctx context.Context) (*kafka.Conn, error) {
func (c *Client) CreateTopic(ctx context.Context, topic string, numPartitions, replicationFactor int) error {
conn, err := c.ping(ctx)
if err != nil {
return err
return fmt.Errorf("create topic: cannot ping: %w", err)
}

errors := make(chan error, 1)
var (
controllerHost string
errors = make(chan error, 1)
)
defer close(errors)
go func() { // doing it asynchronously because conn.Controller() does not honour the context
var b kafka.Broker
b, err = conn.Controller()
if err != nil {
errors <- fmt.Errorf("create topic: could not get controller: %w", err)
return
}
if b.Host == "" {
errors <- fmt.Errorf("create topic: controller connection has empty broker host")
return
}
controllerHost = net.JoinHostPort(b.Host, strconv.Itoa(b.Port))
errors <- nil
}()

select {
case <-ctx.Done():
return fmt.Errorf("create topic: %w", ctx.Err())
case err := <-errors:
if err != nil {
return err
}
}

controllerConn, err := c.dialer.DialContext(ctx, c.network, controllerHost)
if err != nil {
return fmt.Errorf("create topic: could not dial controller: %w", err)
}
defer func() {
// close asynchronously, if we block we might not respect the context
go func() { _ = controllerConn.Close() }()
}()

go func() { // doing it asynchronously because controllerConn.CreateTopics() does not honour the context
errors <- conn.CreateTopics(kafka.TopicConfig{
Topic: topic,
Expand All @@ -69,7 +108,10 @@ func (c *Client) CreateTopic(ctx context.Context, topic string, numPartitions, r
case <-ctx.Done():
return ctx.Err()
case err = <-errors:
return err
if err != nil {
return fmt.Errorf("create topic: could not create topic: %w", err)
}
return nil
}
}

Expand Down

0 comments on commit 10ccbf3

Please sign in to comment.