This repository has been archived by the owner on Oct 1, 2021. It is now read-only.
forked from aristanetworks/goarista
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
68 lines (61 loc) · 2.01 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// Copyright (C) 2016 Arista Networks, Inc.
// Use of this source code is governed by the Apache License 2.0
// that can be found in the COPYING file.
// This tool is a client for the OpenConfig gRPC service of network devices: it
// subscribes to updates and publishes the notifications it receives to Kafka.
package main
import (
"flag"
"fmt"
"strings"
"sync"
"github.com/Shopify/sarama"
"github.com/aristanetworks/glog"
"github.com/aristanetworks/goarista/kafka"
"github.com/aristanetworks/goarista/kafka/openconfig"
"github.com/aristanetworks/goarista/kafka/producer"
pb "github.com/aristanetworks/goarista/openconfig"
"github.com/aristanetworks/goarista/openconfig/client"
)
// keysFlag holds the comma-separated Kafka message keys, one per gRPC address.
// When left empty, main falls back to using the gRPC addresses as the keys.
var keysFlag = flag.String("kafkakeys", "",
	"Keys for kafka messages (comma-separated, default: the value of -addrs)")
// newProducer creates a Kafka client for the given broker addresses and wraps
// it in a producer that publishes to topic, keying every message with key and
// encoding it with openconfig.ElasticsearchMessageEncoder.
//
// It returns a non-nil error if either the client or the producer cannot be
// created. In the latter case the freshly created client is closed so that it
// is not leaked.
func newProducer(addresses []string, topic, key string) (producer.Producer, error) {
	// Named kafkaClient rather than client to avoid shadowing the imported
	// openconfig client package.
	kafkaClient, err := kafka.NewClient(addresses)
	if err != nil {
		return nil, fmt.Errorf("Failed to create Kafka client: %s", err)
	}
	encodedKey := sarama.StringEncoder(key)
	p, err := producer.New(topic, nil, kafkaClient, encodedKey,
		openconfig.ElasticsearchMessageEncoder)
	if err != nil {
		// Best-effort cleanup: the producer error is the one worth reporting,
		// so a failure to close the client is deliberately ignored.
		_ = kafkaClient.Close()
		return nil, fmt.Errorf("Failed to create Kafka producer: %s", err)
	}
	return p, nil
}
// main parses the OpenConfig client flags, pairs every gRPC address with a
// Kafka key, and for each pair starts a Kafka producer together with an
// OpenConfig client whose notifications are written to that producer. It then
// waits on the WaitGroup that is handed to every client.
func main() {
	username, password, subscriptions, grpcAddrs, opts := client.ParseFlags()

	// Without explicit keys, each gRPC address doubles as its own Kafka key.
	if *keysFlag == "" {
		*keysFlag = strings.Join(grpcAddrs, ",")
	}
	kafkaKeys := strings.Split(*keysFlag, ",")
	if len(kafkaKeys) != len(grpcAddrs) {
		glog.Fatal("Please provide the same number of addresses and Kafka keys")
	}

	brokers := strings.Split(*kafka.Addresses, ",")
	var wg sync.WaitGroup
	for idx, addr := range grpcAddrs {
		prod, err := newProducer(brokers, *kafka.Topic, kafkaKeys[idx])
		if err != nil {
			glog.Fatal(err)
		}
		glog.Infof("Initialized Kafka producer for %s", addr)

		// One WaitGroup slot per client; presumably client.Run releases it
		// when it finishes (not visible from this file).
		wg.Add(1)
		go prod.Run()
		go client.Run(func(notif *pb.SubscribeResponse) {
			prod.Write(notif)
		}, &wg, username, password, addr, subscriptions, opts)
	}
	wg.Wait()
}