Skip to content

Commit

Permalink
Fix an incorrect cast from #338 and add a test to cover it (#340)
Browse files Browse the repository at this point in the history
  • Loading branch information
toddpalino authored Jan 30, 2018
1 parent 389ef47 commit a91cf4d
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 1 deletion.
2 changes: 1 addition & 1 deletion core/internal/storage/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (module *InMemoryStorage) addBrokerOffset(request *protocol.StorageRequest,
Timestamp: request.Timestamp,
}
} else {
ringval, _ := partitionEntry.Value.(*protocol.ConsumerOffset)
ringval, _ := partitionEntry.Value.(*brokerOffset)
ringval.Offset = request.Offset
ringval.Timestamp = request.Timestamp
}
Expand Down
34 changes: 34 additions & 0 deletions core/internal/storage/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,40 @@ func TestInMemoryStorage_addBrokerOffset_ExistingPartition(t *testing.T) {
assert.Equalf(t, int64(9876), previousOffset.Timestamp, "Expected timestamp for p0 to be 9876, got %v", previousOffset.Timestamp)
}

func TestInMemoryStorage_addBrokerOffset_AddMany(t *testing.T) {
module := startWithTestBrokerOffsets("")

// Add a lot of offsets
request := protocol.StorageRequest{
RequestType: protocol.StorageSetBrokerOffset,
Cluster: "testcluster",
Topic: "testtopic",
Partition: 0,
TopicPartitionCount: 1,
Offset: 4321,
Timestamp: 9876,
}
for i := 0; i < 100; i++ {
request.Offset = request.Offset + 1
request.Timestamp = request.Timestamp + 1
module.addBrokerOffset(&request, module.Log)
}

topicList := module.offsets["testcluster"].broker["testtopic"]
numOffsets := topicList[0].Len()
ringPtr := topicList[0]
for i := numOffsets - 1; i >= 0; i-- {
ringPtr = ringPtr.Next()
assert.NotNilf(t, ringPtr.Value, "Offset %v: brokerOffset object not created", i)
val := ringPtr.Value.(*brokerOffset)
expectedOffset := request.Offset - int64(i)
expectedTimestamp := request.Timestamp - int64(i)

assert.Equalf(t, expectedOffset, val.Offset, "Offset %v: Expected offset to be %v, got %v", i, expectedOffset, val.Offset)
assert.Equalf(t, expectedTimestamp, val.Timestamp, "Offset %v: Expected timestamp to be %v, got %v", i, expectedTimestamp, val.Timestamp)
}
}

func TestInMemoryStorage_addBrokerOffset_BadCluster(t *testing.T) {
module := startWithTestCluster("")
request := protocol.StorageRequest{
Expand Down

0 comments on commit a91cf4d

Please sign in to comment.