rdkafka + Go + (macOS?) receive failed #1828

Closed
aviramha opened this issue Aug 22, 2023 · 0 comments · Fixed by #1829
Labels
bug Something isn't working

Comments

@aviramha
Member

Bug Description

When connecting to Kafka from a Go application using confluent-kafka-go, we see the following errors:

%3|1692687932.300|FAIL|rdkafka#consumer-1| [thrd:kafka-service:9092/bootstrap]: kafka-service:9092/bootstrap: Receive failed: Undefined error: 0 (after 0ms in state UP, 5 identical error(s) suppressed)
Consumer error: kafka-service:9092/bootstrap: Receive failed: Undefined error: 0 (after 0ms in state UP, 5 identical error(s) suppressed) (<nil>)
Consumer error: 1/1 brokers are down (<nil>)

Steps to Reproduce

Clone https://github.com/metalbear-co/playground
Change the following file to use confluent-kafka-go:

ip-visit-consumer/main.go

package main

import (
	"context"
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/gin-gonic/gin"
	"github.com/spf13/viper"
	"net/http"
	"time"
)

var ctx = context.Background()

// Config
// Struct that holds the local service port and the Kafka address, topic, and consumer group
type Config struct {
	Port               int16
	KafkaAddress       string
	KafkaTopic         string
	KafkaConsumerGroup string
}

type IpMessage struct {
	Ip string `json:"ip"`
}

func loadConfig() Config {
	viper.BindEnv("port")
	viper.BindEnv("kafkaaddress")
	viper.BindEnv("kafkatopic")
	viper.BindEnv("kafkaconsumergroup")

	config := Config{}
	config.Port = int16(viper.GetInt("port"))
	config.KafkaAddress = viper.GetString("kafkaaddress")
	config.KafkaTopic = viper.GetString("kafkatopic")
	config.KafkaConsumerGroup = viper.GetString("kafkaconsumergroup")

	return config
}

func StartKafkaReader(address, topic, group string) {

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": address,
		"group.id":          group,
		"auto.offset.reset": "earliest",
	})

	if err != nil {
		panic(err)
	}

	// Fail fast if the subscription itself is rejected.
	if err := c.SubscribeTopics([]string{topic}, nil); err != nil {
		panic(err)
	}

	// A signal handler or similar could be used to set this to false to break the loop.
	run := true

	for run {
		msg, err := c.ReadMessage(time.Second)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else if kerr, ok := err.(kafka.Error); !ok || !kerr.IsTimeout() {
			// The client will automatically try to recover from all errors.
			// Timeout is not considered an error because it is raised by
			// ReadMessage in absence of messages.
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}

	c.Close()

}

func main() {
	config := loadConfig()

	go StartKafkaReader(config.KafkaAddress, config.KafkaTopic, config.KafkaConsumerGroup)

	router := gin.Default()
	router.GET("/health", func(ctx *gin.Context) { ctx.Status(http.StatusOK) })
	fmt.Print("loaded")
	router.Run("0.0.0.0:" + fmt.Sprint(config.Port))
}

Add the following dependency to go.mod:

	github.com/confluentinc/confluent-kafka-go/v2 v2.2.0

Then run go mod tidy.
Debug the app using GoLand.

Backtrace

No response

Relevant Logs

No response

Your operating system and version

macOS (Apple M1)

Local process

go binary

Local process version

No response

Additional Info

No response

@aviramha aviramha added the bug Something isn't working label Aug 22, 2023