// Command main runs a goka processor that consumes lookupSourceTopic and, for
// every message, checks whether the message key is present in lookupTable.
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
	"golang.org/x/sync/errgroup"
)

var (
	// lookupGroup is the processor group passed to goka.DefineGroup.
	lookupGroup goka.Group = "mygroup"
	// lookupSourceTopic is the input stream whose messages trigger a lookup.
	lookupSourceTopic goka.Stream = "input_stream"
	// lookupTable is the table queried with each incoming message key.
	lookupTable goka.Table = "t_lookup"
	// kafkaBrokers is the broker list handed to the processor in runLookup.
	kafkaBrokers = []string{"192.168.99.100:19092"}
)

// perfromJoin is the processor callback for lookupSourceTopic. It looks up the
// key of the current message in lookupTable and logs whether a value was
// found. The message payload itself is not used.
func perfromJoin(ctx goka.Context, msg interface{}) {
	tw := ctx.Lookup(lookupTable, ctx.Key())
	if tw == nil {
		log.Println("No hits found")
		return
	}

	// The table is registered with a string codec in Run, but use the
	// comma-ok form so an unexpected value type is reported rather than
	// panicking inside the callback.
	s, ok := tw.(string)
	if !ok {
		log.Printf("Found Hit with unexpected value type %T", tw)
		return
	}
	log.Println("Found Hit " + s)
}

// Run returns a function, suitable for errgroup.Group.Go, that defines the
// lookup processor group, creates a processor against brokers and runs it with
// ctx. The returned function yields the error from goka.NewProcessor or,
// otherwise, whatever the processor's Run returns.
func Run(ctx context.Context, brokers []string) func() error {
	return func() error {
		g := goka.DefineGroup(lookupGroup,
			goka.Input(lookupSourceTopic, new(codec.String), perfromJoin),
			goka.Lookup(lookupTable, new(codec.String)),
		)

		p, err := goka.NewProcessor(brokers, g)
		if err != nil {
			return err
		}
		return p.Run(ctx)
	}
}

// runLookup starts the processor in an errgroup and blocks until either
// SIGINT/SIGTERM is received or the group context is done. It then cancels the
// context, waits for the processor goroutine and logs any error it returned.
func runLookup() {
	ctx, cancel := context.WithCancel(context.Background())
	// Release the context on every exit path; calling cancel twice is safe.
	defer cancel()

	grp, ctx := errgroup.WithContext(ctx)
	grp.Go(Run(ctx, kafkaBrokers))

	// Wait for SIGINT/SIGTERM
	waiter := make(chan os.Signal, 1)
	signal.Notify(waiter, syscall.SIGINT, syscall.SIGTERM)
	// Unregister the channel once we are done waiting on it.
	defer signal.Stop(waiter)

	select {
	case <-waiter:
	case <-ctx.Done():
	}

	// Cancel before Wait so the processor is told to stop.
	cancel()
	if err := grp.Wait(); err != nil {
		log.Println(err)
	}
	log.Println("done")
}

// main runs the lookup processor until it stops or a termination signal
// arrives.
func main() {
	runLookup()
}