Queue Streamer is a Go package that transfers data between Kafka topics with exactly-once delivery guarantees. It consumes messages from Kafka brokers and produces them to the topics you specify. This document explains how to install and use Queue Streamer.
To install Queue Streamer, use Go modules:
go get github.com/violetpay-org/queue-streamer
Here is an example of how to use Queue Streamer:
package main

import (
	"sync"

	// The import path ends in "queue-streamer", so the package is aliased
	// to the qstreamer name used below.
	qstreamer "github.com/violetpay-org/queue-streamer"
)

func main() {
	wg := &sync.WaitGroup{}

	brokers := []string{"localhost:9092"}
	origin := qstreamer.Topic("origin-topic", 3) // Topic name and partition

	// Serializer that converts the consumed message into the message to be produced.
	// In this case the message is not converted, so a pass-through serializer is used.
	serializer := qstreamer.NewPassThroughSerializer()
	destination1 := qstreamer.Topic("destination-topic-1", 5) // Topic name and partition

	streamer := qstreamer.NewTopicStreamer(brokers, origin)
	cfg := qstreamer.NewStreamConfig(serializer, destination1)
	streamer.AddConfig(cfg)

	streamer.Run() // Non-blocking
	defer streamer.Stop()

	// Done() is never called, so Wait() blocks and keeps main alive
	// while the streamer runs in the background.
	wg.Add(1)
	wg.Wait()
}
- Set Topics: Use NewTopic() to set the origin and destination topics.
- Use PassThroughSerializer: Create a pass-through serializer with NewPassThroughSerializer(), which does not modify the message. If you want to convert the message, you can create a custom serializer that implements the Serializer interface.
- Set StreamConfig: Use NewStreamConfig() to configure the stream settings.
- Create and Configure TopicStreamer: Use NewTopicStreamer() to create the topic streamer and the AddConfig() method to add the stream configuration.
- Run and Stop Streamer: Call the Run() method to start the streamer and the Stop() method to stop the streamer.
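To make the idea of a custom serializer more concrete, here is a minimal, self-contained sketch. The Serializer interface declared in it is a hypothetical shape chosen for illustration (a single Serialize method on byte slices), not the package's actual definition, so check the package source for the real method set before adapting it. The upperCaseSerializer and passThrough types are likewise invented for this example.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// Serializer is a hypothetical interface used only for this sketch.
// The real Serializer interface in Queue Streamer may look different.
type Serializer interface {
	Serialize(message []byte) ([]byte, error)
}

// passThrough returns the message unchanged.
type passThrough struct{}

func (passThrough) Serialize(message []byte) ([]byte, error) {
	return message, nil
}

// upperCaseSerializer converts the message to upper case before it is produced.
type upperCaseSerializer struct{}

func (upperCaseSerializer) Serialize(message []byte) ([]byte, error) {
	if len(message) == 0 {
		return nil, errors.New("empty message")
	}
	return bytes.ToUpper(message), nil
}

func main() {
	serializers := []Serializer{passThrough{}, upperCaseSerializer{}}
	for _, s := range serializers {
		out, err := s.Serialize([]byte("hello"))
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(string(out))
	}
}
```

Running the sketch prints hello followed by HELLO, showing the difference between passing a message through and converting it.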
Contributions are welcome! You can contribute to the project by reporting bugs, requesting features, and submitting pull requests.
Queue Streamer is distributed under the MIT License. See the LICENSE file for more details.