Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' into cg-dest-uuid
Browse files Browse the repository at this point in the history
  • Loading branch information
kiranrg committed Oct 18, 2017
2 parents 619de1a + 3343bb9 commit 4c71fb1
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 77 deletions.
1 change: 0 additions & 1 deletion cmd/tools/common/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,6 @@ func SetAdminCommands(commands *[]cli.Command) {
"cherami-storehost", "adminStatus", if set to anything other than "enabled", will prevent placement of new extent on this store
"cherami-storehost", "minFreeDiskSpaceBytes", integer, minimum required free disk space in bytes to place a new extent
"cherami-outputhost", "messagecachesize", comma-separated list of "destination/CG_name=value" for message cache size
"cherami-outputhost", "redeliveryIntervalInMs", the message redelivery interval in milliseconds
`,
Subcommands: []cli.Command{
{
Expand Down
3 changes: 0 additions & 3 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,8 +841,6 @@ const (
OutputhostCGMessageSentAck
// OutputhostCGMessageSentNAck records the count of nack messages
OutputhostCGMessageSentNAck
// OutputhostCGMessageTimeout records the count of timeout messages
OutputhostCGMessageTimeout
// OutputhostCGMessagesThrottled records the count of messages throttled
OutputhostCGMessagesThrottled
// OutputhostCGAckMgrSeqNotFound is the gauge to track acks whose seq number is not found
Expand Down Expand Up @@ -1361,7 +1359,6 @@ var dynamicMetricDefs = map[ServiceIdx]map[int]metricDefinition{
OutputhostCGMessageRedelivered: {Counter, "outputhost.message.redelivered.cg"},
OutputhostCGMessageSentAck: {Counter, "outputhost.message.sent-ack.cg"},
OutputhostCGMessageSentNAck: {Counter, "outputhost.message.sent-nack.cg"},
OutputhostCGMessageTimeout: {Counter, "outputhost.message.timeout.cg"},
OutputhostCGMessagesThrottled: {Counter, "outputhost.message.throttled"},
OutputhostCGAckMgrSeqNotFound: {Counter, "outputhost.ackmgr.seq-not-found.cg"},
OutputhostCGReplicaReconnect: {Counter, "outputhost.replica-reconnect.cg"},
Expand Down
11 changes: 0 additions & 11 deletions services/outputhost/cgcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,17 +669,6 @@ func (cgCache *consumerGroupCache) getMessageCacheSize(cfg OutputCgConfig, oldSi
return cacheSize
}

// getRedeliveryInterval resolves the message redelivery interval, in milliseconds,
// for this consumer group from the dynamic config.
//
// cfg supplies the RedeliveryIntervalInMs override rules; oldInterval is the current
// interval and is handed to common.OverrideValueByPrefix alongside the rules.
// NOTE(review): presumably oldInterval is returned unchanged when no rule matches —
// confirm against common.OverrideValueByPrefix.
func (cgCache *consumerGroupCache) getRedeliveryInterval(cfg OutputCgConfig, oldInterval int32) (interval int32) {
// The override helper takes a logger factory rather than a logger instance.
logFn := func() bark.Logger {
return cgCache.logger
}
// Rules are matched against "<destination path>/<consumer group name>".
ruleKey := cgCache.destPath + `/` + cgCache.cachedCGDesc.GetConsumerGroupName()
// The helper works in int64; the result is narrowed back to int32 here without a range check.
interval = int32(common.OverrideValueByPrefix(logFn, ruleKey, cfg.RedeliveryIntervalInMs, int64(oldInterval), `redeliveryIntervalInMs`))

return interval
}

// loadConsumerGroupCache loads everything on this cache including the extents and within the cache
func (cgCache *consumerGroupCache) loadConsumerGroupCache(ctx thrift.Context, exists bool) error {
cgCache.extMutex.Lock()
Expand Down
4 changes: 0 additions & 4 deletions services/outputhost/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ type (
// with different size config as follows:
// "/test/destination//test/cg_1=50,/test/destination//test/cg_2=100"
MessageCacheSize []string `name:"messagecachesize" default:"/=10000"`

// RedeliveryIntervalInMs is used to configure the per CG message redelivery interval.
// Just like MessageCacheSize above
RedeliveryIntervalInMs []string `name:"redeliveryIntervalInMs" default:"/=100"`
}
)

Expand Down
25 changes: 13 additions & 12 deletions services/outputhost/kafkaAddresser.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,24 @@ const (
kafkaAddresserMaxAddress = kafkaAddresserMaxTP*kafkaAddresserDivisor + kafkaAddresserMaxOffset
)

// topicPartition
type topicPartition struct {
// TopicPartition represents a Kafka topic/partition pair
type TopicPartition struct {
Topic string
Partition int32
}

type kafkaTopicPartitionAddresser struct {
// KafkaTopicPartitionAddresser translates topic/partition/offset to/from store address
type KafkaTopicPartitionAddresser struct {
sync.RWMutex
tp2i map[topicPartition]int64
i2tp map[int64]*topicPartition
tp2i map[TopicPartition]int64
i2tp map[int64]*TopicPartition
nextTP int64
}

var kafkaAddresser = kafkaTopicPartitionAddresser{}
var kafkaAddresser = KafkaTopicPartitionAddresser{}

// GetStoreAddress converts a given topic, partition, and offset into a store address
func (k *kafkaTopicPartitionAddresser) GetStoreAddress(tp *topicPartition, offset int64, logFn func() bark.Logger) (address storeHostAddress) {
func (k *KafkaTopicPartitionAddresser) GetStoreAddress(tp *TopicPartition, offset int64, logFn func() bark.Logger) (address storeHostAddress) {
calc := func(tpInt int64, offset int64) storeHostAddress {
return storeHostAddress((tpInt * kafkaAddresserDivisor) + offset)
}
Expand Down Expand Up @@ -101,8 +102,8 @@ func (k *kafkaTopicPartitionAddresser) GetStoreAddress(tp *topicPartition, offse

// Check initialization
if k.nextTP == 0 {
k.tp2i = make(map[topicPartition]int64)
k.i2tp = make(map[int64]*topicPartition)
k.tp2i = make(map[TopicPartition]int64)
k.i2tp = make(map[int64]*TopicPartition)
}

// Assignment
Expand All @@ -126,8 +127,8 @@ func (k *kafkaTopicPartitionAddresser) GetStoreAddress(tp *topicPartition, offse
}

// GetTopicPartitionOffset recovers the original topic, partition, and offset from a store address
func (k *kafkaTopicPartitionAddresser) GetTopicPartitionOffset(address storeHostAddress, logFn func() bark.Logger) (tp *topicPartition, offset int64) {
returnOffset := func(tp *topicPartition, address storeHostAddress) (tpOut *topicPartition, offset int64) {
func (k *KafkaTopicPartitionAddresser) GetTopicPartitionOffset(address storeHostAddress, logFn func() bark.Logger) (tp *TopicPartition, offset int64) {
returnOffset := func(tp *TopicPartition, address storeHostAddress) (tpOut *TopicPartition, offset int64) {
return tp, int64(address) % kafkaAddresserDivisor
}

Expand All @@ -143,7 +144,7 @@ func (k *kafkaTopicPartitionAddresser) GetTopicPartitionOffset(address storeHost
k.RUnlock()

tpInt := recoverTP(address)
_, offset = returnOffset(&topicPartition{}, address)
_, offset = returnOffset(&TopicPartition{}, address)

logFn().WithFields(bark.Fields{
`module`: `kafkaAddresser`,
Expand Down
11 changes: 6 additions & 5 deletions services/outputhost/kafkaStream.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,13 @@ func OpenKafkaStream(c <-chan *s.ConsumerMessage, kafkaMessageConverter KafkaMes
kafkaConverter: kafkaMessageConverter,
}
if k.kafkaConverter == nil {
k.kafkaConverter = k.getDefaultKafkaMessageConverter()
k.kafkaConverter = GetDefaultKafkaMessageConverter(&k.seqNo, k.logger)
}
return k
}

func (k *kafkaStream) getDefaultKafkaMessageConverter() KafkaMessageConverter {
// GetDefaultKafkaMessageConverter returns the default kafka message converter
func GetDefaultKafkaMessageConverter(seqNo *int64, logger bark.Logger) KafkaMessageConverter {
return func(m *s.ConsumerMessage) (c *store.ReadMessageContent) {
c = &store.ReadMessageContent{
Type: store.ReadMessageContentTypePtr(store.ReadMessageContentType_MESSAGE),
Expand All @@ -128,13 +129,13 @@ func (k *kafkaStream) getDefaultKafkaMessageConverter() KafkaMessageConverter {
c.Message = &store.ReadMessage{
Address: common.Int64Ptr(
int64(kafkaAddresser.GetStoreAddress(
&topicPartition{
&TopicPartition{
Topic: m.Topic,
Partition: m.Partition,
},
m.Offset,
func() bark.Logger {
return k.logger.WithFields(bark.Fields{
return logger.WithFields(bark.Fields{
`module`: `kafkaStream`,
`topic`: m.Topic,
`partition`: m.Partition,
Expand All @@ -144,7 +145,7 @@ func (k *kafkaStream) getDefaultKafkaMessageConverter() KafkaMessageConverter {
}

c.Message.Message = &store.AppendMessage{
SequenceNumber: common.Int64Ptr(atomic.AddInt64(&k.seqNo, 1)),
SequenceNumber: common.Int64Ptr(atomic.AddInt64(seqNo, 1)),
}

if !m.Timestamp.IsZero() {
Expand Down
37 changes: 8 additions & 29 deletions services/outputhost/messagecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ type AckID string
const defaultLockTimeoutInSeconds int32 = 42
const defaultMaxDeliveryCount int32 = 2
const blockCheckingTimeout time.Duration = time.Minute
const defaultRedeliveryIntervalInMs = 100
const redeliveryInterval = time.Second / 2

// SmartRetryDisableString can be added to a destination or CG owner email to request smart retry to be disabled
// Note that Google allows something like this: gbailey+smartRetryDisable@uber.com
Expand Down Expand Up @@ -261,12 +261,11 @@ type cgMsgCache struct {
notifier Notifier // this notifier is used to slow down cons connections based on NACKs
consumerHealth
messageCacheHealth
creditNotifyCh chan<- int32 // this is the notify ch to notify credits to extents
creditRequestCh <-chan string // read-only channel used by the extents to request credits specifically for that extent.
maxOutstandingMsgs int32 // max allowed outstanding messages
numAcks int32 // num acks we received
cgCache *consumerGroupCache // just a reference to the cgCache to grant credits to a local extent directly
redeliveryIntervalInMs int32 // redelivery ticker interval
creditNotifyCh chan<- int32 // this is the notify ch to notify credits to extents
creditRequestCh <-chan string // read-only channel used by the extents to request credits specifically for that extent.
maxOutstandingMsgs int32 // max allowed outstanding messages
numAcks int32 // num acks we received
cgCache *consumerGroupCache // just a reference to the cgCache to grant credits to a local extent directly
shared.ConsumerGroupDescription
}

Expand Down Expand Up @@ -384,7 +383,6 @@ func (msgCache *cgMsgCache) utilHandleRedeliveryTicker() {
msgCache.startTimer(eventTimer)

var redeliveries int64
var timeouts int64

now := common.Now()

Expand All @@ -403,7 +401,7 @@ func (msgCache *cgMsgCache) utilHandleRedeliveryTicker() {
msgCache.reinjectlastAckMsg() // injects the last acked message if it is not already injected
}

for i, cache := range timerCaches {
for _, cache := range timerCaches {

nExpired := 0

Expand Down Expand Up @@ -441,10 +439,6 @@ func (msgCache *cgMsgCache) utilHandleRedeliveryTicker() {
redeliveries++
msgCache.consumerHealth.lastRedeliveryTime = now // don't use variable 'now', since processing may be very slow in degenerate cases; consider moving to utilHandleRedeliveredMsg

if i == 0 { // means the message is from the redeliveryTimerCache, where the timeout messages are
timeouts++
}

if !stalled && cm.dlqInhibit > 0 {
cm.dlqInhibit-- // Use up one of the 'extra lives', but only if the consumer group seems healthy
}
Expand Down Expand Up @@ -476,11 +470,6 @@ func (msgCache *cgMsgCache) utilHandleRedeliveryTicker() {

} // for timercaches

// Report timeout metrics
if timeouts > 0 {
msgCache.consumerM3Client.AddCounter(metrics.ConsConnectionScope, metrics.OutputhostCGMessageTimeout, timeouts)
}

// Report redelivery metrics; consider moving to utilHandleRedeliveredMsg
if redeliveries > 0 {
msgCache.m3Client.AddCounter(metrics.ConsConnectionStreamScope, metrics.OutputhostMessageRedelivered, redeliveries)
Expand Down Expand Up @@ -990,21 +979,12 @@ func (msgCache *cgMsgCache) updateConn(connID int, event msgEvent) {

// refreshCgConfig re-reads the dynamic consumer-group config and applies it to this
// message cache: it updates maxOutstandingMsgs and, when the configured redelivery
// interval differs from the current one, replaces the redelivery ticker.
//
// oldOutstandingMessages is the value kept for maxOutstandingMsgs when the dynamic
// config cannot be loaded; it is also passed to getMessageCacheSize as the old size.
func (msgCache *cgMsgCache) refreshCgConfig(oldOutstandingMessages int32) {
// Start from the current values so a config load failure leaves them unchanged.
outstandingMsgs := oldOutstandingMessages
redeliveryIntervalInMs := msgCache.redeliveryIntervalInMs

// The error itself is not reported here; a failed load simply skips the overrides.
cfg, err := msgCache.cgCache.getDynamicCgConfig()
if err == nil {
outstandingMsgs = msgCache.cgCache.getMessageCacheSize(cfg, oldOutstandingMessages)
redeliveryIntervalInMs = msgCache.cgCache.getRedeliveryInterval(cfg, msgCache.redeliveryIntervalInMs)
}

msgCache.maxOutstandingMsgs = outstandingMsgs

// Only swap the ticker when the interval actually changed, so an unchanged config
// does not restart the redelivery cadence.
if redeliveryIntervalInMs != msgCache.redeliveryIntervalInMs {
// Stop the old ticker before replacing it so it does not keep firing.
msgCache.redeliveryTicker.Stop()
// NOTE(review): time.NewTicker panics on a non-positive duration and nothing in this
// function validates the configured value — confirm it is checked upstream.
msgCache.redeliveryTicker = time.NewTicker(time.Duration(redeliveryIntervalInMs) * time.Millisecond)
msgCache.redeliveryIntervalInMs = redeliveryIntervalInMs
}
}

// TODO: Make the delivery cache shared among all consumer groups
Expand Down Expand Up @@ -1032,15 +1012,14 @@ func newMessageDeliveryCache(
ackMsgCh: cgCache.ackMsgCh,
nackMsgCh: cgCache.nackMsgCh,
closeChannel: make(chan struct{}),
redeliveryTicker: time.NewTicker(defaultRedeliveryIntervalInMs * time.Millisecond),
redeliveryTicker: time.NewTicker(redeliveryInterval),
ConsumerGroupDescription: cgCache.cachedCGDesc,
consumerM3Client: cgCache.consumerM3Client,
m3Client: cgCache.m3Client,
notifier: cgCache.notifier,
creditNotifyCh: cgCache.creditNotifyCh,
creditRequestCh: cgCache.creditRequestCh,
cgCache: cgCache, // just a reference to the cgCache
redeliveryIntervalInMs: defaultRedeliveryIntervalInMs,
consumerHealth: consumerHealth{
badConns: make(map[int]int),
lastAckTime: common.Now(), // Need to start in the non-stalled state
Expand Down
28 changes: 16 additions & 12 deletions storage/manyrocks/manyrocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -867,27 +867,31 @@ func (t *Rock) Purge(purgeAddr s.Address) (nextAddr s.Address, nextKey s.Key, er
session := uuid.New()

t.store.logger.WithFields(bark.Fields{
`purgeAddr`: purgeAddr,
common.TagExt: t.id,
`duration`: duration,
`session`: session,
`purgeAddr`: purgeAddr,
`curPurgeAddr`: curPurgeAddr,
common.TagExt: t.id.String(),
`duration`: duration,
`session`: session,
`num-files`: len(dbg),
}).Info(`Purge took too long!`)

for _, di := range dbg {

if di.deleted {
t.store.logger.WithFields(bark.Fields{
`filename`: di.filename,
`maxAddr`: di.maxAddr,
`deleted`: di.deleted,
`duration`: di.duration,
`session`: session,
common.TagExt: t.id.String(),
`filename`: di.filename,
`maxAddr`: di.maxAddr,
`deleted`: di.deleted,
`duration`: di.duration,
`session`: session,
}).Info(`RocksDB.DeleteFile`)
} else {
t.store.logger.WithFields(bark.Fields{
`filename`: di.filename,
`deleted`: di.deleted,
`session`: session,
common.TagExt: t.id.String(),
`filename`: di.filename,
`deleted`: di.deleted,
`session`: session,
}).Info(`RocksDB.DeleteFile`)
}
}
Expand Down

0 comments on commit 4c71fb1

Please sign in to comment.