Skip to content

An aggregated records producer for Amazon Kinesis

License

Notifications You must be signed in to change notification settings

mitooos/kinesis-producer

 
 

Repository files navigation

Amazon kinesis producer Build status License GoDoc

A KPL-like batch producer for Amazon Kinesis built on top of the official Go AWS SDK V2 and using the same aggregation format that KPL use.

Useful links

Example

package main

import (
	"log"
	"time"
	"context"

	"github.com/aws/aws-sdk-go-v2/service/kinesis"
	"github.com/aws/aws-sdk-go-v2/config"
	producer "github.com/mitooos/kinesis-producer"
)


func main() {
	cfg, err := config.LoadDefaultConfig(context.TODO(),
		config.WithRegion("us-west-2"),
	)
	if err != nil {
	// handle error
		log.Fatal(err)
	}

	client := kinesis.NewFromConfig(cfg)

	pr := producer.New(&producer.Config{
		StreamName:   "test",
		BacklogCount: 2000,
		Client:       client,
	})

	pr.Start()

	// Handle failures
	go func() {
		for r := range pr.NotifyFailures() {
			// r contains `Data`, `PartitionKey` and `Error()`
			log.Printf("detected put failure, %v", r)
		}
	}()

	go func() {
		for i := 0; i < 5000; i++ {
			err := pr.Put([]byte("foo"), "bar")
			if err != nil {
				log.Printf("error producing, %v", err)
			}
		}
	}()

	time.Sleep(3 * time.Second)
	pr.Stop()
}

Specifying logger implementation

producer.Config takes an optional logging.Logger implementation.

Using a custom logger
customLogger := &CustomLogger{}

&producer.Config{
  StreamName:   "test",
  BacklogCount: 2000,
  Client:       client,
  Logger:       customLogger,
}

Using logrus

import (
	"github.com/sirupsen/logrus"
	producer "github.com/a8m/kinesis-producer"
	"github.com/a8m/kinesis-producer/loggers"
)

log := logrus.New()

&producer.Config{
  StreamName:   "test",
  BacklogCount: 2000,
  Client:       client,
  Logger:       loggers.Logrus(log),
}

kinesis-producer ships with three logger implementations.

  • producer.Standard used the standard library logger
  • loggers.Logrus uses logrus logger
  • loggers.Zap uses zap logger

License

MIT