Goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go. Goka aims to reduce the complexity of building highly scalable and highly available microservices.

Goka extends the concept of Kafka consumer groups by binding a state table to them and persisting that table in Kafka. Goka provides sane defaults and a pluggable architecture.

Features

  • Message Input and Output

    Goka handles all the message input and output for you. You only have to provide one or more callback functions that handle messages from any of the Kafka topics you are interested in. You only ever have to deal with deserialized messages.

  • Scaling

    Goka automatically distributes the processing and state across multiple instances of a service. This enables effortless scaling when the load increases.

  • Fault Tolerance

    In case of a failure, Goka redistributes the failed instance's workload and state across the remaining healthy instances. All state is safely stored in Kafka, and messages are delivered with at-least-once semantics.

  • Built-in Monitoring and Introspection

    Goka provides a web interface for monitoring performance and querying values in the state.

  • Modularity

    Goka fosters a pluggable architecture that enables you to replace, for example, the storage layer or the Kafka communication layer.
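Codecs are what let callbacks see only deserialized messages, and they are one of the pluggable pieces. We assume here that a Goka codec is any type with `Encode(value interface{}) ([]byte, error)` and `Decode(data []byte) (interface{}, error)` methods (check the `Codec` definition in your Goka version). The sketch below is a hypothetical JSON codec for an illustrative `User` type; it uses only the standard library, so it compiles without Goka, and would be passed where the Get Started example passes `new(codec.String)`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// User is a hypothetical message type used only for illustration.
type User struct {
	Name   string `json:"name"`
	Clicks int    `json:"clicks"`
}

// UserCodec is a hypothetical codec. Its method set is assumed to match
// Goka's Codec interface, so it could be handed to goka.Input or
// goka.Persist in place of codec.String.
type UserCodec struct{}

// Encode serializes a User or *User into JSON bytes.
func (c *UserCodec) Encode(value interface{}) ([]byte, error) {
	switch v := value.(type) {
	case *User:
		return json.Marshal(v)
	case User:
		return json.Marshal(v)
	default:
		return nil, fmt.Errorf("UserCodec: cannot encode %T", value)
	}
}

// Decode deserializes JSON bytes into a *User.
func (c *UserCodec) Decode(data []byte) (interface{}, error) {
	var u User
	if err := json.Unmarshal(data, &u); err != nil {
		return nil, fmt.Errorf("UserCodec: %v", err)
	}
	return &u, nil
}

func main() {
	c := new(UserCodec)
	raw, err := c.Encode(&User{Name: "alice", Clicks: 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))

	v, err := c.Decode(raw)
	if err != nil {
		panic(err)
	}
	u := v.(*User)
	fmt.Println(u.Name, u.Clicks)
}
```

Returning a pointer from `Decode` is a design choice of this sketch; whatever type a codec returns is the type the callback must assert on.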

Documentation

This README provides a brief, high-level overview of the ideas behind Goka. A more detailed introduction of the project can be found in this blog post.

Package API documentation is available at GoDoc and the Wiki provides several tips for configuring, extending, and deploying Goka applications.

Installation

You can install Goka by running the following command:

$ go get -u github.com/lovoo/goka

Concepts

Goka relies on Kafka for message passing, fault-tolerant state storage and workload partitioning.

  • Emitters deliver key-value messages into Kafka. As an example, an emitter could be a database handler emitting the state changes into Kafka for other interested applications to consume.

  • A processor is a set of callback functions that consume emitted messages and perform state transformations upon their delivery. Processor groups are formed of one or more instances of a processor. Goka distributes the partitions of the input topics across all processor instances in a processor group, which enables effortless scaling and fault tolerance. If a processor instance fails, its partitions and state are reassigned to the remaining healthy members of the processor group. Processors can also emit further messages into Kafka.

  • A group table is the state of a processor group. It is a partitioned key-value table stored in Kafka that belongs to a single processor group. If a processor instance fails, the remaining instances take over the group table partitions of the failed instance, recovering them from Kafka.

  • Views are local caches of a complete group table. Views provide read-only access to the group tables and can be used to provide external services, for example through a gRPC interface.

  • Local storage keeps a local copy of the group table partitions to speed up recovery and reduce memory utilization. By default, the local storage uses LevelDB, but in-memory map and Redis-based storage are also available.
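The partitioning idea behind processor groups can be illustrated with a toy, Kafka-free simulation: each key is hashed to a partition, partitions are spread over the live processor instances, and when an instance fails its partitions move to the survivors (which, in Goka, would then recover the matching group-table partitions from Kafka). The FNV-1a hash, the round-robin assignment, and all names here are assumptions for illustration; Kafka's partitioner and consumer-group rebalancing differ in detail.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to a partition using FNV-1a. Illustrative only:
// the actual Kafka client partitioner may use a different hash.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash writes never return an error
	return int(h.Sum32() % uint32(numPartitions))
}

// assign distributes partitions round-robin over the live instances.
// Real consumer groups agree on this through Kafka's rebalance protocol.
func assign(numPartitions int, instances []string) map[int]string {
	owners := make(map[int]string, numPartitions)
	for p := 0; p < numPartitions; p++ {
		owners[p] = instances[p%len(instances)]
	}
	return owners
}

func main() {
	const numPartitions = 4

	owners := assign(numPartitions, []string{"proc-a", "proc-b"})
	for _, key := range []string{"some-key", "other-key"} {
		p := partitionFor(key, numPartitions)
		fmt.Printf("%s -> partition %d -> %s\n", key, p, owners[p])
	}

	// proc-b fails: all partitions are reassigned to the remaining instance.
	owners = assign(numPartitions, []string{"proc-a"})
	fmt.Println(owners)
}
```

Because a key always hashes to the same partition, all messages for one key reach the same instance, which is what lets that instance own the key's state in the group table.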

Get Started

An example Goka application could look like the following. An emitter emits a single message with key "some-key" and value "some-value" into the "example-stream" topic. A processor processes the "example-stream" topic, counting the number of messages delivered for "some-key". The counter is persisted in the "example-group-table" topic. To start dockerized Zookeeper and Kafka instances locally, run make start with the Makefile in the examples folder.

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

var (
	brokers             = []string{"localhost:9092"}
	topic   goka.Stream = "example-stream"
	group   goka.Group  = "example-group"
)

// emits a single message and leaves
func runEmitter() {
	emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
	if err != nil {
		log.Fatalf("error creating emitter: %v", err)
	}
	defer emitter.Finish()
	err = emitter.EmitSync("some-key", "some-value")
	if err != nil {
		log.Fatalf("error emitting message: %v", err)
	}
	fmt.Println("message emitted")
}

// process messages until ctrl-c is pressed
func runProcessor() {
	// process callback is invoked for each message delivered from
	// "example-stream" topic.
	cb := func(ctx goka.Context, msg interface{}) {
		var counter int64
		// ctx.Value() returns the value stored in the group table for the
		// message's key, or nil if there is none yet.
		if val := ctx.Value(); val != nil {
			counter = val.(int64)
		}
		counter++
		// SetValue stores the incremented counter in the group table for the
		// message's key.
		ctx.SetValue(counter)
		log.Printf("key = %s, counter = %v, msg = %v", ctx.Key(), counter, msg)
	}

	// Define a new processor group. The group defines all inputs, outputs, and
	// serialization formats. The group-table topic is "example-group-table".
	g := goka.DefineGroup(group,
		goka.Input(topic, new(codec.String), cb),
		goka.Persist(new(codec.Int64)),
	)

	p, err := goka.NewProcessor(brokers, g)
	if err != nil {
		log.Fatalf("error creating processor: %v", err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan bool)
	go func() {
		defer close(done)
		// use a goroutine-local err instead of writing to the outer variable
		if err := p.Run(ctx); err != nil {
			log.Fatalf("error running processor: %v", err)
		}
	}()

	wait := make(chan os.Signal, 1)
	signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
	<-wait   // wait for SIGINT/SIGTERM
	cancel() // gracefully stop processor
	<-done
}

func main() {
	runEmitter()   // emits one message and stops
	runProcessor() // press ctrl-c to stop
}
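To reason about the callback without a running Kafka, the sketch below reruns its logic against a plain map. The `fakeContext` type is hypothetical: it mimics only the `Key`, `Value`, and `SetValue` methods the callback uses and is not Goka's API or its recommended way of testing processors. Two messages for "some-key" and one for "other-key" should leave counters of 2 and 1.

```go
package main

import "fmt"

// fakeContext is a hypothetical stand-in for the subset of goka.Context
// used by the example callback. It reads and writes a plain map instead
// of the Kafka-backed group table.
type fakeContext struct {
	key   string
	table map[string]interface{}
}

func (c *fakeContext) Key() string { return c.key }

// Value returns nil when the key has no entry, like an empty table row.
func (c *fakeContext) Value() interface{} { return c.table[c.key] }

func (c *fakeContext) SetValue(v interface{}) { c.table[c.key] = v }

// count mirrors the body of the example's process callback.
func count(ctx *fakeContext, msg interface{}) {
	var counter int64
	if val := ctx.Value(); val != nil {
		counter = val.(int64)
	}
	counter++
	ctx.SetValue(counter)
}

func main() {
	table := make(map[string]interface{})
	for _, k := range []string{"some-key", "some-key", "other-key"} {
		count(&fakeContext{key: k, table: table}, "some-value")
	}
	fmt.Println(table["some-key"], table["other-key"]) // prints "2 1"
}
```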

Note that table topics have to be configured in Kafka with log compaction. For details, check the Wiki.

How to contribute

Contributions are always welcome. Please fork the repo, create a pull request against master, and make sure the tests pass. See the GitHub Flow for details.