Merge pull request linkedin#4 from songyiyang/master
Updates from linkedin master and fixes for when the worker is not running
jasonparekh committed Dec 18, 2015
2 parents 109a12e + 7b9181a commit d515bba
Showing 7 changed files with 337 additions and 29 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -7,6 +7,7 @@ Burrow is a monitoring companion for [Apache Kafka](http://kafka.apache.org) tha
* Multiple Kafka Cluster support
* Automatically monitors all consumers using Kafka-committed offsets
* Configurable support for Zookeeper-committed offsets
* Configurable support for Storm-committed offsets
* HTTP endpoint for consumer group status, as well as broker and consumer information
* Configurable emailer for sending alerts for specific groups
* Configurable HTTP client for sending alerts to another system for all groups
88 changes: 72 additions & 16 deletions config.go
@@ -40,13 +40,18 @@ type BurrowConfig struct {
LockPath string `gcfg:"lock-path"`
}
Kafka map[string]*struct {
Brokers []string `gcfg:"broker"`
BrokerPort int `gcfg:"broker-port"`
Zookeepers []string `gcfg:"zookeeper"`
ZookeeperPort int `gcfg:"zookeeper-port"`
ZookeeperPath string `gcfg:"zookeeper-path"`
OffsetsTopic string `gcfg:"offsets-topic"`
ZKOffsets bool `gcfg:"zookeeper-offsets"`
Brokers []string `gcfg:"broker"`
BrokerPort int `gcfg:"broker-port"`
Zookeepers []string `gcfg:"zookeeper"`
ZookeeperPort int `gcfg:"zookeeper-port"`
ZookeeperPath string `gcfg:"zookeeper-path"`
OffsetsTopic string `gcfg:"offsets-topic"`
ZKOffsets bool `gcfg:"zookeeper-offsets"`
}
Storm map[string]*struct {
Zookeepers []string `gcfg:"zookeeper"`
ZookeeperPort int `gcfg:"zookeeper-port"`
ZookeeperPath string `gcfg:"zookeeper-path"`
}
Tickers struct {
BrokerOffsets int `gcfg:"broker-offsets"`
@@ -57,6 +62,8 @@ type BurrowConfig struct {
ExpireGroup int64 `gcfg:"expire-group"`
ZKCheck int64 `gcfg:"zookeeper-interval"`
ZKGroupRefresh int64 `gcfg:"zk-group-refresh"`
StormCheck int64 `gcfg:"storm-interval"`
StormGroupRefresh int64 `gcfg:"storm-group-refresh"`
}
Httpserver struct {
Enable bool `gcfg:"server"`
@@ -202,6 +209,30 @@ func ValidateConfig(app *ApplicationContext) error {
}
}

// Storm Clusters
if len(app.Config.Storm) > 0 {
for cluster, cfg := range app.Config.Storm {
if cfg.ZookeeperPort == 0 {
cfg.ZookeeperPort = 2181
}
if len(cfg.Zookeepers) == 0 {
errs = append(errs, fmt.Sprintf("No Zookeeper hosts specified for cluster %s", cluster))
} else {
hostlistError := checkHostlist(cfg.Zookeepers, cfg.ZookeeperPort, "Zookeeper")
if hostlistError != "" {
errs = append(errs, hostlistError)
}
}
if cfg.ZookeeperPath == "" {
errs = append(errs, fmt.Sprintf("Zookeeper path is not specified for cluster %s", cluster))
} else {
if !validateZookeeperPath(cfg.ZookeeperPath) {
errs = append(errs, fmt.Sprintf("Zookeeper path is not valid for cluster %s", cluster))
}
}
}
}

// Tickers
if app.Config.Tickers.BrokerOffsets == 0 {
app.Config.Tickers.BrokerOffsets = 60
@@ -217,12 +248,18 @@ func ValidateConfig(app *ApplicationContext) error {
if app.Config.Lagcheck.ZKCheck == 0 {
app.Config.Lagcheck.ZKCheck = 60
}
if app.Config.Lagcheck.StormCheck == 0 {
app.Config.Lagcheck.StormCheck = 60
}
if app.Config.Lagcheck.MinDistance == 0 {
app.Config.Lagcheck.MinDistance = 1
}
if app.Config.Lagcheck.ZKGroupRefresh == 0 {
app.Config.Lagcheck.ZKGroupRefresh = 300
}
if app.Config.Lagcheck.StormGroupRefresh == 0 {
app.Config.Lagcheck.StormGroupRefresh = 300
}

// HTTP Server
if app.Config.Httpserver.Enable {
@@ -400,20 +437,39 @@ func validateUrl(rawUrl string) bool {
func checkHostlist(hosts []string, defaultPort int, appName string) string {
for i, host := range hosts {
hostparts := strings.Split(host, ":")
if !validateHostname(hostparts[0]) {
return fmt.Sprintf("One or more %s hostnames are invalid", appName)
}
hostport := defaultPort
hostname := hostparts[0]

if len(hostparts) == 2 {
hostport, err := strconv.Atoi(hostparts[1])
if (err == nil) && (hostport > 0) {
hosts[i] = fmt.Sprintf("%s:%v", hostparts[0], hostport)
} else {
// Must be a hostname or IPv4 address with a port
var err error
hostport, err = strconv.Atoi(hostparts[1])
if (err != nil) || (hostport == 0) {
return fmt.Sprintf("One or more %s hostnames have invalid port components", appName)
}
} else {
hosts[i] = fmt.Sprintf("%s:%v", hostparts[0], defaultPort)
}

if len(hostparts) > 2 {
// Must be an IPv6 address
// Try without popping off the last segment as a port number first
if validateIP(host) {
hostname = host
} else {
// The full host didn't validate as an IP, so let's pull off the last piece as a port number and try again
hostname = strings.Join(hostparts[:len(hostparts)-1], ":")

hostport, err := strconv.Atoi(hostparts[len(hostparts)-1])
if (err != nil) || (hostport == 0) {
return fmt.Sprintf("One or more %s hostnames have invalid port components", appName)
}
}
}

if !validateHostname(hostname) {
return fmt.Sprintf("One or more %s hostnames are invalid", appName)
}

hosts[i] = fmt.Sprintf("[%s]:%v", hostname, hostport)
}

return ""
7 changes: 7 additions & 0 deletions config/burrow.cfg
@@ -35,6 +35,13 @@ offsets-topic=__consumer_offsets
; (ysong) This has been changed to a boolean value, so we need to set offset to true
zookeeper-offsets=true

[storm "local"]
zookeeper=zkhost01.example.com
zookeeper=zkhost02.example.com
zookeeper=zkhost03.example.com
zookeeper-port=2181
zookeeper-path=/kafka-cluster/stormconsumers

[tickers]
broker-offsets=60
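
The [storm "local"] section above maps onto the new Storm map added to BurrowConfig in config.go; gcfg keys each quoted subsection name into that map, and the repeated zookeeper lines accumulate into the string slice. A rough standalone sketch of reading such a section back, assuming the gopkg.in/gcfg.v1 package (the import path and the inline config string are assumptions, not taken from this commit):

// Rough standalone sketch, not repo code: how a [storm "local"] section lands
// in a Storm map like the one added to BurrowConfig.
package main

import (
	"fmt"

	"gopkg.in/gcfg.v1"
)

type config struct {
	Storm map[string]*struct {
		Zookeepers    []string `gcfg:"zookeeper"`
		ZookeeperPort int      `gcfg:"zookeeper-port"`
		ZookeeperPath string   `gcfg:"zookeeper-path"`
	}
}

func main() {
	raw := `
[storm "local"]
zookeeper=zkhost01.example.com
zookeeper=zkhost02.example.com
zookeeper=zkhost03.example.com
zookeeper-port=2181
zookeeper-path=/kafka-cluster/stormconsumers
`
	var cfg config
	if err := gcfg.ReadStringInto(&cfg, raw); err != nil {
		fmt.Println("cannot parse config:", err)
		return
	}
	local := cfg.Storm["local"]
	fmt.Println(local.Zookeepers, local.ZookeeperPort, local.ZookeeperPath)
	// Prints the three zkhost entries, 2181 and /kafka-cluster/stormconsumers.
}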

11 changes: 7 additions & 4 deletions http_notifier.go
@@ -14,6 +14,8 @@ import (
"bytes"
log "github.com/cihub/seelog"
"github.com/pborman/uuid"
"io"
"io/ioutil"
"net"
"net/http"
"os"
@@ -77,12 +79,11 @@ func NewHttpNotifier(app *ApplicationContext) (*HttpNotifier, error) {
groupIds: make(map[string]map[string]Event),
resultsChannel: make(chan *ConsumerGroupStatus),
httpClient: &http.Client{
Timeout: time.Duration(app.Config.Httpnotifier.Timeout) * time.Second,
Transport: &http.Transport{
Dial: (&net.Dialer{
Timeout: time.Duration(app.Config.Httpnotifier.Timeout) * time.Second,
KeepAlive: time.Duration(app.Config.Httpnotifier.Keepalive) * time.Second,
}).Dial,
TLSHandshakeTimeout: time.Duration(app.Config.Httpnotifier.Timeout) * time.Second,
},
},
}, nil
@@ -151,7 +152,8 @@ func (notifier *HttpNotifier) handleEvaluationResponse(result *ConsumerGroupStat
log.Errorf("Failed to send POST (Id %s): %v", notifier.groupIds[result.Cluster][result.Group].Id, err)
return
}
defer resp.Body.Close()
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()

if (resp.StatusCode >= 200) && (resp.StatusCode <= 299) {
log.Infof("Sent POST for group %s in cluster %s at severity %v (Id %s)", result.Group,
Expand Down Expand Up @@ -190,7 +192,8 @@ func (notifier *HttpNotifier) handleEvaluationResponse(result *ConsumerGroupStat
log.Errorf("Failed to send DELETE: %v", err)
return
}
defer resp.Body.Close()
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()

if (resp.StatusCode >= 200) && (resp.StatusCode <= 299) {
log.Infof("Sent DELETE for group %s in cluster %s (Id %s)", result.Group, result.Cluster,
33 changes: 26 additions & 7 deletions main.go
@@ -27,14 +27,19 @@ type KafkaCluster struct {
Zookeeper *ZookeeperClient
}

type StormCluster struct {
Storm *StormClient
}

type ApplicationContext struct {
Config *BurrowConfig
Storage *OffsetStorage
Clusters map[string]*KafkaCluster
Server *HttpServer
Emailer *Emailer
HttpNotifier *HttpNotifier
NotifierLock *zk.Lock
Config *BurrowConfig
Storage *OffsetStorage
Clusters map[string]*KafkaCluster
Storms map[string]*StormCluster
Server *HttpServer
Emailer *Emailer
HttpNotifier *HttpNotifier
NotifierLock *zk.Lock
}

func loadNotifiers(app *ApplicationContext) error {
@@ -176,6 +181,20 @@ func burrowMain() int {
appContext.Clusters[cluster] = &KafkaCluster{Client: client, Zookeeper: zkconn}
}

// Start Storm Clients for each storm cluster
appContext.Storms = make(map[string]*StormCluster, len(appContext.Config.Storm))
for cluster, _ := range appContext.Config.Storm {
log.Infof("Starting Storm client for cluster %s", cluster)
stormClient, err := NewStormClient(appContext, cluster)
if err != nil {
log.Criticalf("Cannot start Storm client for cluster %s: %v", cluster, err)
return 1
}
defer stormClient.Stop()

appContext.Storms[cluster] = &StormCluster{Storm: stormClient}
}

// Set up the Zookeeper lock for notification
appContext.NotifierLock = zk.NewLock(zkconn, appContext.Config.Zookeeper.LockPath, zk.WorldACL(zk.PermAll))
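
One detail in the Storm client loop above (it mirrors the existing Kafka client startup): defer stormClient.Stop() inside the loop does not run at the end of each iteration. Every deferred Stop runs only when burrowMain returns, in reverse order, so all Storm clients stay alive for the life of the process and are shut down together. A standalone illustration of that defer behaviour (names and output are illustrative, not repo code):

// Standalone illustration of defer inside a loop; not repo code.
package main

import "fmt"

func main() {
	for _, cluster := range []string{"stormA", "stormB"} {
		fmt.Println("starting client for", cluster)
		// The argument is captured now, but the call runs at function exit (LIFO).
		defer fmt.Println("stopping client for", cluster)
	}
	fmt.Println("all clients running")
	// Output order: starting stormA, starting stormB, all clients running,
	// stopping stormB, stopping stormA.
}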

4 changes: 2 additions & 2 deletions offsets_store.go
@@ -246,7 +246,7 @@ func (storage *OffsetStorage) addConsumerOffset(offset *PartitionOffset) {

// Get broker partition count and offset for this topic and partition first
storage.offsets[offset.Cluster].brokerLock.RLock()
if topic, ok := storage.offsets[offset.Cluster].broker[offset.Topic]; !ok || (ok && ((int32(len(topic)) <= offset.Partition) || (topic[offset.Partition] == nil))) {
if topic, ok := storage.offsets[offset.Cluster].broker[offset.Topic]; !ok || (ok && ((int32(len(topic)) < offset.Partition) || (topic[offset.Partition] == nil))) {
// If we don't have the partition or offset from the broker side yet, ignore the consumer offset for now
storage.offsets[offset.Cluster].brokerLock.RUnlock()
return
@@ -274,7 +274,7 @@ func (storage *OffsetStorage) addConsumerOffset(offset *PartitionOffset) {
} else {
// Prevent old offset commits, and new commits that are too fast (less than the min-distance config)
previousTimestamp := storage.offsets[offset.Cluster].consumer[offset.Group][offset.Topic][offset.Partition].Prev().Value.(*ConsumerOffset).Timestamp
if offset.Timestamp-previousTimestamp < (storage.app.Config.Lagcheck.MinDistance * 1000) {
if offset.Timestamp-previousTimestamp < (storage.app.Config.Lagcheck.MinDistance * 1000) && offset.Timestamp-previousTimestamp != 0{
storage.offsets[offset.Cluster].consumerLock.Unlock()
return
}
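
The second offsets_store.go change above relaxes the min-distance gate: a consumer offset commit is now ignored only when it lands closer than min-distance seconds to the previous commit and the two timestamps actually differ, so a repeated commit carrying an identical timestamp is no longer dropped. A simplified standalone sketch of that check with made-up sample values (not the repo code):

// Simplified, standalone version of the gate in addConsumerOffset; not repo code.
package main

import "fmt"

// shouldDrop reports whether a commit would be ignored: closer than
// minDistanceSec seconds to the previous commit, and not an exact
// timestamp repeat.
func shouldDrop(curTs, prevTs, minDistanceSec int64) bool {
	delta := curTs - prevTs
	return delta < minDistanceSec*1000 && delta != 0
}

func main() {
	fmt.Println(shouldDrop(1500, 1000, 1)) // true: only 500 ms after the previous commit
	fmt.Println(shouldDrop(1000, 1000, 1)) // false: identical timestamp is now kept
	fmt.Println(shouldDrop(3000, 1000, 1)) // false: far enough apart
}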