Skip to content

Commit

Permalink
Fix ZK watch problem (#328)
Browse files Browse the repository at this point in the history
* Fix ZK watch problem

In the kafka_zk consumer module, the watches for the ZK tree were getting set but not fired correctly. Changing the receive from channel fixed this, but it exposed a race condition with the offsets znode not appearing immediately.

* Fix concurrent map issue #315
  • Loading branch information
toddpalino committed Jan 19, 2018
1 parent 846d785 commit e47ec4c
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 27 deletions.
126 changes: 100 additions & 26 deletions core/internal/consumer/kafka_zk_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

type topicList struct {
topics map[string]*partitionCount
lock *sync.Mutex
lock *sync.RWMutex
}
type partitionCount struct {
count int32
Expand All @@ -53,7 +53,7 @@ type KafkaZkClient struct {
zk protocol.ZookeeperClient
areWatchesSet bool
running *sync.WaitGroup
groupLock *sync.Mutex
groupLock *sync.RWMutex
groupList map[string]*topicList
groupWhitelist *regexp.Regexp
groupBlacklist *regexp.Regexp
Expand All @@ -69,7 +69,7 @@ func (module *KafkaZkClient) Configure(name string, configRoot string) {

module.name = name
module.running = &sync.WaitGroup{}
module.groupLock = &sync.Mutex{}
module.groupLock = &sync.RWMutex{}
module.groupList = make(map[string]*topicList)
module.connectFunc = helpers.ZookeeperConnect

Expand Down Expand Up @@ -181,14 +181,51 @@ func (module *KafkaZkClient) acceptConsumerGroup(group string) bool {
return true
}

// drainEventChannel is meant to be run as a goroutine. It blocks until a single event is received from the watch
// channel (or the channel is closed), discards it, and then exits. It's here so that when we set a watch that we
// don't care about (from an ExistsW on a node that already exists), we can drain it properly.
func drainEventChannel(eventChan <-chan zk.Event) {
<-eventChan
}

// waitForNodeToExist checks whether the znode at zkPath exists and, if it does not, blocks until the watch set by
// ExistsW delivers an event. It returns true when the node already exists or when the watch fires with any event
// other than zk.EventNotWatching. It returns false when the ExistsW call itself fails or when the watch is
// invalidated. All messages are written at debug level to the supplied Logger.
func (module *KafkaZkClient) waitForNodeToExist(zkPath string, Logger *zap.Logger) bool {
	exists, _, watchChan, err := module.zk.ExistsW(zkPath)
	switch {
	case err != nil:
		// A missing node is reported through the boolean, not the error, so this is a genuine failure
		Logger.Debug("failed to check existence of znode",
			zap.String("path", zkPath),
			zap.String("error", err.Error()),
		)
		return false
	case exists:
		// Nothing to wait for. ExistsW still handed us a watch channel, so consume its eventual event in the
		// background instead of abandoning it
		go drainEventChannel(watchChan)
		return true
	}

	// The node is not there yet: block until the watch delivers an event
	Logger.Debug("waiting for node to exist", zap.String("path", zkPath))
	if fired := <-watchChan; fired.Type == zk.EventNotWatching {
		// The watch was invalidated rather than triggered, so give up
		Logger.Debug("exists watch invalidated",
			zap.String("path", zkPath),
		)
		return false
	}
	return true
}

func (module *KafkaZkClient) watchGroupList(eventChan <-chan zk.Event) {
defer module.running.Done()

event, isOpen := <-eventChan
if (!isOpen) || (event.Type == zk.EventNotWatching) {
event := <-eventChan
if event.Type == zk.EventNotWatching {
// We're done here
module.Log.Debug("group list watch invalidated")
return
}
module.Log.Debug("group list watch fired", zap.Int("event_type", int(event.Type)))
module.running.Add(1)
go module.resetGroupListWatchAndAdd(event.Type != zk.EventNodeChildrenChanged)
}
Expand Down Expand Up @@ -222,13 +259,13 @@ func (module *KafkaZkClient) resetGroupListWatchAndAdd(resetOnly bool) {
if module.groupList[group] == nil {
module.groupList[group] = &topicList{
topics: make(map[string]*partitionCount),
lock: &sync.Mutex{},
lock: &sync.RWMutex{},
}
module.Log.Debug("add group",
zap.String("group", group),
)
module.running.Add(1)
module.resetTopicListWatchAndAdd(group, false)
go module.resetTopicListWatchAndAdd(group, false)
}
}
}
Expand All @@ -237,33 +274,47 @@ func (module *KafkaZkClient) resetGroupListWatchAndAdd(resetOnly bool) {
func (module *KafkaZkClient) watchTopicList(group string, eventChan <-chan zk.Event) {
defer module.running.Done()

event, isOpen := <-eventChan
if (!isOpen) || (event.Type == zk.EventNotWatching) {
event := <-eventChan
if event.Type == zk.EventNotWatching {
// We're done here
module.Log.Debug("topic list watch invalidated", zap.String("group", group))
return
}
module.Log.Debug("topic list watch fired",
zap.String("group", group),
zap.Int("event_type", int(event.Type)),
)
module.running.Add(1)
go module.resetTopicListWatchAndAdd(group, event.Type != zk.EventNodeChildrenChanged)
}

func (module *KafkaZkClient) resetTopicListWatchAndAdd(group string, resetOnly bool) {
defer module.running.Done()

// Wait for the offsets znode for this group to exist. We need to do this because the previous child watch
// fires on /consumers/(group) existing, but here we try to read /consumers/(group)/offsets (which might not exist
// yet)
zkPath := module.zookeeperPath + "/" + group + "/offsets"
Logger := module.Log.With(zap.String("group", group))
if !module.waitForNodeToExist(zkPath, Logger) {
// There was an error checking node existence, so we can't continue
return
}

// Get the current group topic list and reset our watch
groupTopics, _, topicListEventChan, err := module.zk.ChildrenW(module.zookeeperPath + "/" + group + "/offsets")
groupTopics, _, topicListEventChan, err := module.zk.ChildrenW(zkPath)
if err != nil {
// Can't read the offsets path. usually this just means that this isn't an active ZK consumer
module.Log.Debug("failed to read topic list",
zap.String("group", group),
zap.String("error", err.Error()),
)
Logger.Debug("failed to read topic list", zap.String("error", err.Error()))
return
}
module.running.Add(1)
go module.watchTopicList(group, topicListEventChan)

if !resetOnly {
// Check for any new topics and create the watches for them
module.groupLock.RLock()
defer module.groupLock.RUnlock()

module.groupList[group].lock.Lock()
defer module.groupList[group].lock.Unlock()
for _, topic := range groupTopics {
Expand All @@ -272,12 +323,9 @@ func (module *KafkaZkClient) resetTopicListWatchAndAdd(group string, resetOnly b
count: 0,
lock: &sync.Mutex{},
}
module.Log.Debug("add topic",
zap.String("group", group),
zap.String("topic", topic),
)
Logger.Debug("add topic", zap.String("topic", topic))
module.running.Add(1)
module.resetPartitionListWatchAndAdd(group, topic, false)
go module.resetPartitionListWatchAndAdd(group, topic, false)
}
}
}
Expand All @@ -286,11 +334,20 @@ func (module *KafkaZkClient) resetTopicListWatchAndAdd(group string, resetOnly b
func (module *KafkaZkClient) watchPartitionList(group string, topic string, eventChan <-chan zk.Event) {
defer module.running.Done()

event, isOpen := <-eventChan
if (!isOpen) || (event.Type == zk.EventNotWatching) {
event := <-eventChan
if event.Type == zk.EventNotWatching {
// We're done here
module.Log.Debug("partition list watch invalidated",
zap.String("group", group),
zap.String("topic", topic),
)
return
}
module.Log.Debug("partition list watch fired",
zap.String("group", group),
zap.String("topic", topic),
zap.Int("event_type", int(event.Type)),
)
module.running.Add(1)
go module.resetPartitionListWatchAndAdd(group, topic, event.Type != zk.EventNodeChildrenChanged)
}
Expand All @@ -301,7 +358,7 @@ func (module *KafkaZkClient) resetPartitionListWatchAndAdd(group string, topic s
// Get the current topic partition list and reset our watch
topicPartitions, _, partitionListEventChan, err := module.zk.ChildrenW(module.zookeeperPath + "/" + group + "/offsets/" + topic)
if err != nil {
// Can't read the consumers path. Bail for now
// Can't read the partition list path. Bail for now
module.Log.Warn("failed to read partitions",
zap.String("group", group),
zap.String("topic", topic),
Expand All @@ -314,6 +371,12 @@ func (module *KafkaZkClient) resetPartitionListWatchAndAdd(group string, topic s

if !resetOnly {
// Check for any new partitions and create the watches for them
module.groupLock.RLock()
defer module.groupLock.RUnlock()

module.groupList[group].lock.RLock()
defer module.groupList[group].lock.RUnlock()

module.groupList[group].topics[topic].lock.Lock()
defer module.groupList[group].topics[topic].lock.Unlock()
if int32(len(topicPartitions)) >= module.groupList[group].topics[topic].count {
Expand All @@ -334,11 +397,22 @@ func (module *KafkaZkClient) resetPartitionListWatchAndAdd(group string, topic s
func (module *KafkaZkClient) watchOffset(group string, topic string, partition int32, eventChan <-chan zk.Event) {
defer module.running.Done()

event, isOpen := <-eventChan
if (!isOpen) || (event.Type == zk.EventNotWatching) {
event := <-eventChan
if event.Type == zk.EventNotWatching {
// We're done here
module.Log.Debug("offset watch invalidated",
zap.String("group", group),
zap.String("topic", topic),
zap.Int32("partition", partition),
)
return
}
module.Log.Debug("offset watch fired",
zap.String("group", group),
zap.String("topic", topic),
zap.Int32("partition", partition),
zap.Int("event_type", int(event.Type)),
)
module.running.Add(1)
go module.resetOffsetWatchAndSend(group, topic, partition, event.Type != zk.EventNodeDataChanged)
}
Expand All @@ -349,7 +423,7 @@ func (module *KafkaZkClient) resetOffsetWatchAndSend(group string, topic string,
// Get the current offset and reset our watch
offsetString, offsetStat, offsetEventChan, err := module.zk.GetW(module.zookeeperPath + "/" + group + "/offsets/" + topic + "/" + strconv.FormatInt(int64(partition), 10))
if err != nil {
// Can't read the partition ofset path. Bail for now
// Can't read the partition offset path. Bail for now
module.Log.Warn("failed to read offset",
zap.String("group", group),
zap.String("topic", topic),
Expand Down
18 changes: 17 additions & 1 deletion core/internal/consumer/kafka_zk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestKafkaZkClient_Start(t *testing.T) {
watchEventChan := make(chan zk.Event)
mockZookeeper.On("ChildrenW", module.zookeeperPath).Return([]string{}, &zk.Stat{}, func() <-chan zk.Event { return watchEventChan }(), nil)
mockZookeeper.On("Close").Return().Run(func(args mock.Arguments) {
watchEventChan <- zk.Event{Type: zk.EventNotWatching}
close(watchEventChan)
})

Expand Down Expand Up @@ -102,11 +103,13 @@ func TestKafkaZkClient_watchGroupList(t *testing.T) {

offsetStat := &zk.Stat{Mtime: 894859}
newGroupChan := make(chan zk.Event)
topicExistsChan := make(chan zk.Event)
newTopicChan := make(chan zk.Event)
newPartitionChan := make(chan zk.Event)
newOffsetChan := make(chan zk.Event)
mockZookeeper.On("ChildrenW", "/consumers").Return([]string{"testgroup"}, offsetStat, func() <-chan zk.Event { return newGroupChan }(), nil)
mockZookeeper.On("ChildrenW", "/consumers/testgroup/offsets").Return([]string{"testtopic"}, offsetStat, func() <-chan zk.Event { return newTopicChan }(), nil)
mockZookeeper.On("ExistsW", "/consumers/testgroup/offsets").Return(true, offsetStat, func() <-chan zk.Event { return topicExistsChan }(), nil)
mockZookeeper.On("ChildrenW", "/consumers/testgroup/offsets/testtopic").Return([]string{"0"}, offsetStat, func() <-chan zk.Event { return newPartitionChan }(), nil)
mockZookeeper.On("GetW", "/consumers/testgroup/offsets/testtopic/0").Return([]byte("81234"), offsetStat, func() <-chan zk.Event { return newOffsetChan }(), nil)

Expand Down Expand Up @@ -140,6 +143,11 @@ func TestKafkaZkClient_watchGroupList(t *testing.T) {
State: zk.StateConnected,
Path: "/consumers/testgroup/offsets/shouldntgetcalled",
}
topicExistsChan <- zk.Event{
Type: zk.EventNotWatching,
State: zk.StateConnected,
Path: "/consumers/testgroup/offsets",
}
newPartitionChan <- zk.Event{
Type: zk.EventNotWatching,
State: zk.StateConnected,
Expand Down Expand Up @@ -213,12 +221,20 @@ func TestKafkaZkClient_resetPartitionListWatchAndAdd_BadPath(t *testing.T) {

func TestKafkaZkClient_resetTopicListWatchAndAdd_BadPath(t *testing.T) {
mockZookeeper := helpers.MockZookeeperClient{}
mockZookeeper.On("ChildrenW", "/consumers/testgroup/offsets").Return([]string{}, (*zk.Stat)(nil), (<-chan zk.Event)(nil), errors.New("badpath"))
topicExistsChan := make(chan zk.Event)
mockZookeeper.On("ExistsW", "/consumers/testgroup/offsets").Return(false, (*zk.Stat)(nil), func() <-chan zk.Event { return topicExistsChan }(), nil)

module := fixtureKafkaZkModule()
module.Configure("test", "consumer.test")
module.zk = &mockZookeeper

go func() {
topicExistsChan <- zk.Event{
Type: zk.EventNotWatching,
State: zk.StateConnected,
Path: "/consumers/testgroup/offsets",
}
}()
module.running.Add(1)
module.resetTopicListWatchAndAdd("testgroup", false)
mockZookeeper.AssertExpectations(t)
Expand Down
13 changes: 13 additions & 0 deletions core/internal/helpers/zookeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ func (z *BurrowZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.E
return z.client.GetW(path)
}

// ExistsW returns a boolean stating whether or not the specified path exists. This method also sets a watch on the
// node (an exists watch if the node does not currently exist, or a data watch otherwise), providing an event channel
// that will receive a message when the watch fires. The call is passed straight through to the underlying client's
// ExistsW, and its four return values (exists, stat, event channel, error) are returned unchanged.
func (z *BurrowZookeeperClient) ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error) {
return z.client.ExistsW(path)
}

// Create makes a new ZNode at the specified path with the contents set to the data byte-slice. Flags can be provided
// to specify that this is an ephemeral or sequence node, and an ACL must be provided. If no ACL is desired, specify
// zk.WorldACL(zk.PermAll)
Expand Down Expand Up @@ -108,6 +115,12 @@ func (m *MockZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Eve
return args.Get(0).([]byte), args.Get(1).(*zk.Stat), args.Get(2).(<-chan zk.Event), args.Error(3)
}

// ExistsW mocks protocol.ZookeeperClient.ExistsW. The call is registered with the mock via Called(path), and the
// four values returned for it are unpacked positionally: a bool, a *zk.Stat, a receive-only zk.Event channel, and an
// error. The stat and channel values are extracted with unchecked type assertions, so they must be supplied with
// exactly those types (use typed nils such as (*zk.Stat)(nil) when no value is wanted).
func (m *MockZookeeperClient) ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error) {
	ret := m.Called(path)

	exists := ret.Bool(0)
	stat := ret.Get(1).(*zk.Stat)
	events := ret.Get(2).(<-chan zk.Event)
	err := ret.Error(3)

	return exists, stat, events, err
}

// Create mocks protocol.ZookeeperClient.Create
func (m *MockZookeeperClient) Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error) {
args := m.Called(path, data, flags, acl)
Expand Down
5 changes: 5 additions & 0 deletions core/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ type ZookeeperClient interface {
// the children of the specified path, providing an event channel that will receive a message when the watch fires
GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error)

// For the given path in Zookeeper, return a boolean stating whether or not the node exists. This method also sets
// a watch on the node (exists if it does not currently exist, or a data watch otherwise), providing an event
// channel that will receive a message when the watch fires
ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error)

// Create makes a new ZNode at the specified path with the contents set to the data byte-slice. Flags can be
// provided to specify that this is an ephemeral or sequence node, and an ACL must be provided. If no ACL is
// desired, specify
Expand Down

0 comments on commit e47ec4c

Please sign in to comment.