Skip to content

Commit

Permalink
Reject offsets that are older than the group expiration time (#330)
Browse files Browse the repository at this point in the history
* Reject offsets that are older than the group expiration time

* Add a test for dropping old offsets
  • Loading branch information
toddpalino committed Jan 19, 2018
1 parent b6184ff commit cacf05e
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 20 deletions.
5 changes: 5 additions & 0 deletions core/internal/storage/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ func (module *InMemoryStorage) addConsumerOffset(request *protocol.StorageReques
return
}

if request.Timestamp < ((time.Now().Unix() - module.expireGroup) * 1000) {
requestLogger.Debug("dropped", zap.String("reason", "old offset"))
return
}

if !module.acceptConsumerGroup(request.Group) {
requestLogger.Debug("dropped", zap.String("reason", "group not whitelisted"))
return
Expand Down
98 changes: 78 additions & 20 deletions core/internal/storage/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
package storage

import (
"container/ring"
"sync"
"time"

"github.com/stretchr/testify/assert"
Expand All @@ -33,6 +35,7 @@ func fixtureModule(whitelist string) *InMemoryStorage {
viper.Reset()
viper.Set("storage.test.class-name", "inmemory")
viper.Set("storage.test.group-whitelist", whitelist)
viper.Set("storage.test.min-distance", 1)

return &module
}
Expand Down Expand Up @@ -282,13 +285,22 @@ func TestInMemoryStorage_addConsumerOffset(t *testing.T) {
}

func TestInMemoryStorage_addConsumerOffset_Whitelist(t *testing.T) {
module := startWithTestConsumerOffsets("whitelistedgroup", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("whitelistedgroup", startTime)

// All offsets for the test group should have been dropped
_, ok := module.offsets["testcluster"].consumer["testgroup"]
assert.False(t, ok, "Group testgroup created when not whitelisted")
}

func TestInMemoryStorage_addConsumerOffset_TooOld(t *testing.T) {
module := startWithTestConsumerOffsets("testgroup", 1000000)

// All offsets for the test group should have been dropped as they are too old
_, ok := module.offsets["testcluster"].consumer["testgroup"]
assert.False(t, ok, "Group testgroup created when offsets are too old")
}

type testset struct {
whitelist string
passGroups []string
Expand Down Expand Up @@ -330,7 +342,7 @@ func TestInMemoryStorage_addConsumerOffset_MinDistance(t *testing.T) {
Group: "testgroup",
Partition: 0,
Offset: 2000,
Timestamp: 190001,
Timestamp: startTime + 90001,
}
module.addConsumerOffset(&request, module.Log)

Expand Down Expand Up @@ -403,7 +415,8 @@ func TestInMemoryStorage_addConsumerOffset_BadCluster(t *testing.T) {
}

func TestInMemoryStorage_addConsumerOwner(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("", startTime)

request := protocol.StorageRequest{
RequestType: protocol.StorageSetConsumerOwner,
Expand All @@ -424,7 +437,8 @@ func TestInMemoryStorage_addConsumerOwner(t *testing.T) {
}

func TestInMemoryStorage_deleteTopic(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("", startTime)

request := protocol.StorageRequest{
RequestType: protocol.StorageSetDeleteTopic,
Expand All @@ -441,7 +455,8 @@ func TestInMemoryStorage_deleteTopic(t *testing.T) {
}

func TestInMemoryStorage_deleteTopic_BadCluster(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("", startTime)

request := protocol.StorageRequest{
RequestType: protocol.StorageSetDeleteTopic,
Expand All @@ -456,7 +471,8 @@ func TestInMemoryStorage_deleteTopic_BadCluster(t *testing.T) {
}

func TestInMemoryStorage_deleteTopic_NoTopic(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("", startTime)

request := protocol.StorageRequest{
RequestType: protocol.StorageSetDeleteTopic,
Expand All @@ -473,7 +489,8 @@ func TestInMemoryStorage_deleteTopic_NoTopic(t *testing.T) {
}

func TestInMemoryStorage_deleteGroup(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("", startTime)

request := protocol.StorageRequest{
RequestType: protocol.StorageSetDeleteGroup,
Expand All @@ -487,7 +504,8 @@ func TestInMemoryStorage_deleteGroup(t *testing.T) {
}

func TestInMemoryStorage_deleteGroup_BadCluster(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("", startTime)

request := protocol.StorageRequest{
RequestType: protocol.StorageSetDeleteGroup,
Expand All @@ -502,7 +520,8 @@ func TestInMemoryStorage_deleteGroup_BadCluster(t *testing.T) {
}

func TestInMemoryStorage_deleteGroup_NoGroup(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("", startTime)

request := protocol.StorageRequest{
RequestType: protocol.StorageSetDeleteGroup,
Expand All @@ -516,7 +535,8 @@ func TestInMemoryStorage_deleteGroup_NoGroup(t *testing.T) {
}

func TestInMemoryStorage_fetchClusterList(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("", startTime)

request := protocol.StorageRequest{
RequestType: protocol.StorageFetchClusters,
Expand All @@ -537,7 +557,8 @@ func TestInMemoryStorage_fetchClusterList(t *testing.T) {
}

func TestInMemoryStorage_fetchTopicList(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("", startTime)

request := protocol.StorageRequest{
RequestType: protocol.StorageFetchTopics,
Expand All @@ -559,7 +580,8 @@ func TestInMemoryStorage_fetchTopicList(t *testing.T) {
}

func TestInMemoryStorage_fetchTopicList_BadCluster(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("", startTime)

request := protocol.StorageRequest{
RequestType: protocol.StorageFetchTopics,
Expand All @@ -576,7 +598,8 @@ func TestInMemoryStorage_fetchTopicList_BadCluster(t *testing.T) {
}

func TestInMemoryStorage_fetchConsumerList(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("", startTime)

request := protocol.StorageRequest{
RequestType: protocol.StorageFetchConsumers,
Expand All @@ -598,7 +621,8 @@ func TestInMemoryStorage_fetchConsumerList(t *testing.T) {
}

func TestInMemoryStorage_fetchConsumerList_BadCluster(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("", startTime)

request := protocol.StorageRequest{
RequestType: protocol.StorageFetchConsumers,
Expand All @@ -615,7 +639,8 @@ func TestInMemoryStorage_fetchConsumerList_BadCluster(t *testing.T) {
}

func TestInMemoryStorage_fetchTopic(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("", startTime)

request := protocol.StorageRequest{
RequestType: protocol.StorageFetchTopic,
Expand All @@ -638,7 +663,8 @@ func TestInMemoryStorage_fetchTopic(t *testing.T) {
}

func TestInMemoryStorage_fetchTopic_BadCluster(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("", startTime)

request := protocol.StorageRequest{
RequestType: protocol.StorageFetchTopic,
Expand All @@ -656,7 +682,8 @@ func TestInMemoryStorage_fetchTopic_BadCluster(t *testing.T) {
}

func TestInMemoryStorage_fetchTopic_BadTopic(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("", startTime)

request := protocol.StorageRequest{
RequestType: protocol.StorageFetchTopic,
Expand Down Expand Up @@ -727,7 +754,8 @@ func TestInMemoryStorage_fetchConsumer(t *testing.T) {
}

func TestInMemoryStorage_fetchConsumer_BadCluster(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("", startTime)

request := protocol.StorageRequest{
RequestType: protocol.StorageFetchConsumer,
Expand All @@ -745,7 +773,8 @@ func TestInMemoryStorage_fetchConsumer_BadCluster(t *testing.T) {
}

func TestInMemoryStorage_fetchConsumer_BadGroup(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
startTime := (time.Now().Unix() * 1000) - 100000
module := startWithTestConsumerOffsets("", startTime)

request := protocol.StorageRequest{
RequestType: protocol.StorageFetchConsumer,
Expand All @@ -763,7 +792,36 @@ func TestInMemoryStorage_fetchConsumer_BadGroup(t *testing.T) {
}

func TestInMemoryStorage_fetchConsumer_Expired(t *testing.T) {
module := startWithTestConsumerOffsets("", 100000)
// We can't insert these offsets normally, so we need to mash them into the module
module := startWithTestBrokerOffsets("")

clusterMap := module.offsets["testcluster"]
clusterMap.consumerLock.Lock()
clusterMap.consumer["testgroup"] = &consumerGroup{
lock: &sync.RWMutex{},
topics: make(map[string][]*consumerPartition),
}
consumerMap := clusterMap.consumer["testgroup"]
clusterMap.consumerLock.Unlock()

consumerMap.lock.Lock()
consumerMap.topics["testtopic"] = []*consumerPartition{{offsets: ring.New(module.intervals)}}
consumerTopicMap := consumerMap.topics["testtopic"]
consumerPartitionRing := consumerTopicMap[0].offsets
consumerMap.lock.Unlock()

for i := 0; i < 10; i++ {
offset := uint64(1000 + (i * 100))
ts := 1000000 + int64(i*10000)

consumerPartitionRing.Value = &protocol.ConsumerOffset{
Offset: int64(offset),
Timestamp: ts,
Lag: 4321 - offset,
}
consumerMap.lastCommit = ts
consumerMap.topics["testtopic"][0].offsets = consumerMap.topics["testtopic"][0].offsets.Next()
}

request := protocol.StorageRequest{
RequestType: protocol.StorageFetchConsumer,
Expand Down

0 comments on commit cacf05e

Please sign in to comment.