qntfy/frinesis

frinesis

Travis Build Status Coverage Status MIT licensed GitHub release Go Report Card GoDoc

An AWS Kinesis implementation of a Frizzle Sink.

In addition to the AWS Kinesis SDK for Go, Frinesis uses a modified version of sendgridlabs/go-kinesis/batchproducer (under separate MIT license).

Frizzle is a magic message (Msg) bus designed for parallel processing with many goroutines.

  • Receive() messages from a configured Source
  • Do your processing, possibly Send() each Msg on to one or more Sink destinations
  • Ack() (or Fail()) the Msg to notify the Source that processing completed

Prereqs / Build instructions

Go mod

As of Go 1.11, frinesis uses go mod for dependency management.

Running the tests

Frinesis has integration tests which require a Kinesis endpoint to test against. The tests read that endpoint from the KINESIS_ENDPOINT environment variable. We test with a localstack instance (docker-compose.yml provided), but other tools such as kinesalite should also work.

$ docker-compose up -d
# takes a few seconds to initialize; can use a tool like wait-for-it.sh in scripting
$ export KINESIS_ENDPOINT=localhost:4568
$ go test -v --cover ./...

Configuration

Frinesis Sinks are configured using Viper.

func InitSink(config *viper.Viper) (*Sink, error)

InitSinkWithLogger(config *viper.Viper, logger *zap.Logger)

We typically initialize Viper through environment variables, but the client can configure it however it likes as long as it passes in a Viper object with the relevant values set. The application may use a prefix before the variable names below.

| Variable | Required | Description | Default |
|---|---|---|---|
| AWS_REGION_NAME | required | region being used e.g. us-east-1 | |
| KINESIS_ENDPOINT | optional | if using a custom endpoint e.g. for local testing. Defaults to AWS standard internal and retrieving credentials from IAM if not set. http:// prefixed if no scheme set | |
| KINESIS_FLUSH_TIMEOUT | sink (optional) | how long to wait for Kinesis Sink to flush remaining messages on close (use duration) | 30s |

Async Error Handling

Since records are sent in batches, Kinesis may report errors asynchronously. These errors can be retrieved from the channel returned by the Sink.Events() method. Frinesis currently returns only errors (no other event types), so every Event received implements the error interface in addition to the String() method required by frizzle.

Contributing

Contributions welcome! Take a look at open issues.