Collection of middleware, wrappers and other tools for substrate.
Is a message source wrapper that allows the user to acknowledge messages in any order and it will ensure messages are sent to the actual message source in the same order they are consumed.
Is an async message source wrapper that allows the user to utilise a handler pattern for interacting with an async message source. It removes the need to manually handle the message and acknowledgement channels.
See for example usage.
Provides wrappers for both message source and message sink that add prometheus metrics labeled with topic and status (either success or error).
Is a message source wrapper that wraps any number of sources. It consumes messages from all of them and passes them on to the user. It ensures that the acknowledgements are passed to the correct source.
Is a message flushing wrapper which blocks until all produced messages have been acked by the user. In the scenario that the user performs an action only after a message has been produced, the flushing wrapper provides a guarantee that such an action is only performed on a successful sink.
import (
sink, err := kafka.NewAsyncMessageSink(kafka.AsyncMessageSinkConfig{
Brokers: []string{"localhost:9092"},
Topic: "example-topic",
Version: "2.0.1",
if err != nil {
sink = instrumented.NewAsyncMessageSink(sink, prometheus.CounterOpts{
Name: "messages_produced_total",
Help: "The total count of messages produced",
}, "topic")
flushSink := flush.NewAsyncMessageSink(context.Background(), sink)
defer func() {
// Flush will block until all messages produced using `PublishMessage` have been acked
// by the AckFunc, if provided.
err := flushSink.Flush()
if err != nil {
err = flushSink.Close()
if err != nil {
go func() {
err = flushSink.Run()
if err != nil {
messages := []string{
"message one",
"message two",
var wg sync.WaitGroup
for _, msg := range messages {
go func(msg string) {
defer wg.Done()
// This context is used to control the publishing of the message. This ctx
// could, and probably should, be different to the lifecyle context passed
// into the constructor.
err := flushSink.PublishMessage(context.Background(), []byte(msg))
if err != nil {
sink, err := kafka.NewAsyncMessageSink(kafka.AsyncMessageSinkConfig{
Brokers: []string{"localhost:9092"},
Topic: "example-topic",
Version: "2.0.1",
if err != nil {
sink = instrumented.NewAsyncMessageSink(sink, prometheus.CounterOpts{
Name: "messages_produced_total",
Help: "The total count of messages produced",
}, "topic")
ackFn := flush.WithAckFunc(func(msg substrate.Message) error {
return nil
flushSink := flush.NewAsyncMessageSink(context.Background(), sink,
ackFn, flush.WithMsgBufferSize(50), flush.WithAckBufferSize(100))
defer func() {
// Flush will block until all messages produced using `PublishMessage` have been acked
// by the AckFunc, if provided.
err := flushSink.Flush()
if err != nil {
err = flushSink.Close()
if err != nil {
go func() {
// Run blocks until the sink is closed or an error occurs. If the AckFn retruns an error
// it will be returned by `Run`.
err = flushSink.Run()
if err != nil {
messages := []string{
"message one",
"message two",
var wg sync.WaitGroup
for _, msg := range messages {
go func(msg string) {
defer wg.Done()
// This context is used to control the publishing of the message. This ctx
// could, and probably should, be different to the lifecyle context passed
// into the constructor.
err := flushSink.PublishMessage(context.Background(), []byte(msg))
if err != nil {
Provides a simple implementation of the substrate.Message
Provides a mock message source that can be used in testing as is done in this repo.