Move singleCGVisibility cache to LRU; move to atomic operations
Guillaume Bailey committed Jan 27, 2017
1 parent 418a84b commit 24a73f9
Showing 3 changed files with 41 additions and 38 deletions.
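At a glance, the commit does two things: the single-CG-visibility lookup cache becomes a bounded LRU owned by each consumer group (instead of a package-global map behind a RWMutex), and the dlqMerging flag becomes an int32 that is touched only through sync/atomic. Below is a minimal sketch of the atomic half of that pattern; the names are illustrative, not code from this repository.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type cgState struct {
	dlqMerging int32 // ATOMIC OPERATIONS ONLY: 0 = not merging, >0 = number of single-CG extents
}

// setMergingCount stores the new count and reports whether the value changed,
// mirroring the swap-and-compare done in refreshCgCache below.
func (s *cgState) setMergingCount(n int32) (changed bool) {
	old := atomic.SwapInt32(&s.dlqMerging, n)
	return old != n
}

// isMerging is a lock-free read, as used on the message-cache side.
func (s *cgState) isMerging() bool {
	return atomic.LoadInt32(&s.dlqMerging) > 0
}

func main() {
	var s cgState
	fmt.Println(s.setMergingCount(2), s.isMerging()) // true true
	fmt.Println(s.setMergingCount(0), s.isMerging()) // true false
}
```

Swap returns the previous value, which is what lets refreshCgCache log only when the state actually changes, while Load lets readers such as isStalled skip the mutex entirely.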
1 change: 0 additions & 1 deletion common/cache/lru.go
@@ -63,7 +63,6 @@ func NewLRUWithInitialCapacity(initialCapacity, maxSize int) Cache {
return New(maxSize, &Options{
InitialCapacity: initialCapacity,
})

}

// Get retrieves the value stored under the given key
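For context, the constructor touched above can be used roughly as below. The cache.Cache Get/Put shapes are inferred from how cgcache.go calls them later in this commit (Get returns nil on a miss, Put stores a value); treat the exact signatures as an assumption rather than the package's documented API.

```go
package main

import (
	"fmt"

	"github.com/uber/cherami-server/common/cache"
)

func main() {
	// Small, bounded cache: initial capacity 1, at most 64 entries (1 << 6),
	// matching the per-consumer-group cache created in newConsumerGroupCache.
	c := cache.NewLRUWithInitialCapacity(1, 1<<6)

	extentUUID := "11111111-2222-3333-4444-555555555555" // illustrative key
	if v := c.Get(extentUUID); v == nil {
		// Miss: compute the value (here, pretend the extent is single-CG-visible).
		c.Put(extentUUID, true)
	}

	fmt.Println(c.Get(extentUUID).(bool)) // true, served from the cache
}
```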
71 changes: 38 additions & 33 deletions services/outputhost/cgcache.go
@@ -21,13 +21,15 @@
package outputhost

import (
"sync/atomic"
"sync"
"time"

"github.com/uber-common/bark"
"github.com/uber/tchannel-go/thrift"

"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/common/cache"
"github.com/uber/cherami-server/common/dconfig"
"github.com/uber/cherami-server/common/metrics"
"github.com/uber/cherami-server/services/outputhost/load"
@@ -147,8 +149,11 @@ type (
// lastDisconnectTime is the time the last consumer got disconnected
lastDisconnectTime time.Time

// dlqMerging indicates whether any DLQ extent is presently being merged
dlqMerging bool
// dlqMerging indicates whether any DLQ extent is presently being merged. ATOMIC OPERATIONS ONLY
dlqMerging int32

// checkSingleCGVisibleCache reduces ListExtentStats calls
checkSingleCGVisibleCache cache.Cache // Key is the extent UUID, value is whether it is single-CG-visible

// sessionID is the 16 bit session identifier for this host
sessionID uint16
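The "ATOMIC OPERATIONS ONLY" note on the new dlqMerging field matters because mixing atomic and plain accesses to the same word is a data race in Go. A hypothetical demo (not repository code) that the race detector would flag if the commented-out plain read were enabled:

```go
package main

import (
	"sync/atomic"
	"time"
)

type cg struct {
	dlqMerging int32 // must be read and written with sync/atomic everywhere
}

func main() {
	c := &cg{}

	// Writer goroutine, always using atomic stores.
	go func() {
		for i := 0; i < 1000; i++ {
			atomic.StoreInt32(&c.dlqMerging, int32(i%2))
		}
	}()

	for i := 0; i < 1000; i++ {
		_ = atomic.LoadInt32(&c.dlqMerging) // correct: atomic read
		// _ = c.dlqMerging                 // WRONG: a plain read here races with the writer above
	}
	time.Sleep(10 * time.Millisecond)
}
```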
@@ -259,6 +264,7 @@ func newConsumerGroupCache(destPath string, cgDesc shared.ConsumerGroupDescripti
cfgMgr: h.cfgMgr,
}

cgCache.checkSingleCGVisibleCache = cache.NewLRUWithInitialCapacity(1, 1 << 6)
cgCache.consumerM3Client = metrics.NewClientWithTags(h.m3Client, metrics.Outputhost, cgCache.getConsumerGroupTags())
cgCache.loadReporter = cgCache.loadReporterFactory.CreateReporter(consGroupLoadReportingInterval, cgCache, cgLogger)
cgCache.loadReporter.Start()
@@ -516,12 +522,13 @@ func (cgCache *consumerGroupCache) refreshCgCache(ctx thrift.Context) error {
}
}

newDLQMerging := nSingleCGExtents > 0
if cgCache.dlqMerging != newDLQMerging {
cgCache.dlqMerging = newDLQMerging
if newDLQMerging {
// DEVNOTE: The term 'merging' here is a reference to the DLQ Merge feature/operation. These extents are
// 'served' just like any other, but they are visible to this one consumer group, hence 'SingleCG'
newDLQMerging := int32(nSingleCGExtents)
oldDLQMerging := atomic.SwapInt32(&cgCache.dlqMerging, newDLQMerging)
if oldDLQMerging != newDLQMerging {
if newDLQMerging > 0 {
cgCache.logger.WithField(`nSingleCGExtents`, nSingleCGExtents).Info("Merging DLQ Extent(s)")

} else {
cgCache.logger.Info("Done merging DLQ Extent(s)")
}
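One consequence of swapping in the extent count rather than a bool: the "Merging DLQ Extent(s)" branch now fires whenever the number of single-CG extents changes (for example 1 to 2), not only on the zero/non-zero transition. A self-contained sketch of the new control flow, with fmt.Println standing in for the bark logger:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

var dlqMerging int32 // ATOMIC OPERATIONS ONLY

func logDLQState(nSingleCGExtents int) {
	newVal := int32(nSingleCGExtents)
	oldVal := atomic.SwapInt32(&dlqMerging, newVal)
	if oldVal != newVal {
		if newVal > 0 {
			fmt.Println("Merging DLQ Extent(s):", nSingleCGExtents)
		} else {
			fmt.Println("Done merging DLQ Extent(s)")
		}
	}
}

func main() {
	logDLQState(1) // logs: merging started
	logDLQState(2) // logs again: the count changed even though merging was already true
	logDLQState(2) // silent: no change
	logDLQState(0) // logs: done merging
}
```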
@@ -530,39 +537,37 @@ func (cgCache *consumerGroupCache) refreshCgCache(ctx thrift.Context) error {
return nil
}

var checkSingleCGVisibleCacheLock sync.RWMutex
var checkSingleCGVisibleCache = make(map[string]bool)

// checkSingleCGVisible determines if an extent under a certain destination is visible only to one consumer group. It is a cached call, so the
// metadata client will only be invoked once per destination+extent
func (cgCache *consumerGroupCache) checkSingleCGVisible(ctx thrift.Context, cge *metadata.ConsumerGroupExtent) bool {
// Key must have destination, since this extent can be loaded as either the DLQ destination or the merged destination
key := cgCache.cachedCGDesc.GetDestinationUUID() + cge.GetExtentUUID()
var ok, val bool
readFn := func() bool {
val, ok = checkSingleCGVisibleCache[key]
return !ok
func (cgCache *consumerGroupCache) checkSingleCGVisible(ctx thrift.Context, cge *metadata.ConsumerGroupExtent) (singleCgVisible bool) {
// Note that this same extent can be loaded by either a consumer group of the DLQ destination (i.e. for inspection,
// with consumer group visibility = nil), or as an extent being merged into the original consumer group (i.e. for DLQ
// merge, with consumer group visibility = this CG). This is why this cache isn't global
key := cge.GetExtentUUID()
val := cgCache.checkSingleCGVisibleCache.Get(key)
if val != nil {
return val.(bool)
}
writeFn := func() {
req := &metadata.ReadExtentStatsRequest{
DestinationUUID: common.StringPtr(cgCache.cachedCGDesc.GetDestinationUUID()),
ExtentUUID: common.StringPtr(cge.GetExtentUUID()),
}
ext, err := cgCache.metaClient.ReadExtentStats(ctx, req)
if err == nil {
if ext != nil && ext.GetExtentStats() != nil {
val = ext.GetExtentStats().GetConsumerGroupVisibility() != ``
if val {
cgCache.logger.WithFields(bark.Fields{common.TagExt: common.FmtExt(cge.GetExtentUUID())}).Info("Consuming single CG visible extent")
}
return

req := &metadata.ReadExtentStatsRequest{
DestinationUUID: common.StringPtr(cgCache.cachedCGDesc.GetDestinationUUID()),
ExtentUUID: common.StringPtr(cge.GetExtentUUID()),
}
ext, err := cgCache.metaClient.ReadExtentStats(ctx, req)
if err == nil {
if ext != nil && ext.GetExtentStats() != nil {
singleCgVisible = ext.GetExtentStats().GetConsumerGroupVisibility() != ``
if singleCgVisible {
cgCache.logger.WithFields(bark.Fields{common.TagExt: common.FmtExt(cge.GetExtentUUID())}).Info("Consuming single CG visible extent")
}
} else {
cgCache.logger.WithFields(bark.Fields{common.TagErr: err, common.TagExt: common.FmtExt(cge.GetExtentUUID())}).Error("ReadExtentStats failed on refresh")
}
cgCache.checkSingleCGVisibleCache.Put(key, singleCgVisible)
} else { // Note that we don't cache val when we have an error
cgCache.logger.WithFields(bark.Fields{common.TagErr: err, common.TagExt: common.FmtExt(cge.GetExtentUUID())}).Error("ReadExtentStats failed on refresh")
}
common.RWLockReadAndConditionalWrite(&checkSingleCGVisibleCacheLock, readFn, writeFn)
return val

return
}

// getDynamicCgConfig gets the configuration object for this host
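The rewritten checkSingleCGVisible is a cache-aside lookup: consult the per-CG LRU first, fall through to ReadExtentStats on a miss, and only cache the answer when the metadata call succeeds, so a transient error is retried on the next refresh. A minimal sketch of that flow, with hypothetical stand-ins (lru, readVisibility) for cache.Cache and the metadata client:

```go
package main

import (
	"errors"
	"fmt"
)

type lru map[string]interface{} // stand-in for cache.Cache

func (c lru) Get(k string) interface{}    { return c[k] }
func (c lru) Put(k string, v interface{}) { c[k] = v }

// readVisibility stands in for metaClient.ReadExtentStats.
func readVisibility(extentUUID string) (singleCG bool, err error) {
	if extentUUID == "bad" {
		return false, errors.New("metadata unavailable")
	}
	return extentUUID == "dlq-extent", nil
}

func isSingleCGVisible(c lru, extentUUID string) bool {
	if v := c.Get(extentUUID); v != nil {
		return v.(bool) // cache hit: no metadata call
	}
	singleCG, err := readVisibility(extentUUID)
	if err != nil {
		return false // errors are not cached, so the next refresh retries
	}
	c.Put(extentUUID, singleCG)
	return singleCG
}

func main() {
	c := lru{}
	fmt.Println(isSingleCGVisible(c, "dlq-extent")) // true, fetched then cached
	fmt.Println(isSingleCGVisible(c, "dlq-extent")) // true, from cache
	fmt.Println(isSingleCGVisible(c, "bad"))        // false, left uncached
}
```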
7 changes: 3 additions & 4 deletions services/outputhost/messagecache.go
@@ -21,6 +21,7 @@
package outputhost

import (
"sync/atomic"
"fmt"
"strings"
"sync"
@@ -1074,12 +1075,10 @@ func (msgCache *cgMsgCache) isStalled() bool {
var m3St m3HealthState
var smartRetryDisabled bool

msgCache.cgCache.extMutex.RLock()
if msgCache.cgCache.dlqMerging || strings.Contains(msgCache.GetOwnerEmail(), SmartRetryDisableString) {
if atomic.LoadInt32(&msgCache.cgCache.dlqMerging) > 0 || strings.Contains(msgCache.GetOwnerEmail(), SmartRetryDisableString) {
smartRetryDisabled = true
}
msgCache.cgCache.extMutex.RUnlock()


now := common.Now()

// Determines that no progress has been made in the recent past; this is half the lock timeout to be
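With dlqMerging written only through sync/atomic, isStalled can read it with atomic.LoadInt32 instead of taking extMutex around the check. A small sketch of the smart-retry gate; the field layout and the SMART_RETRY_DISABLE token below are illustrative assumptions, not the repository's definitions:

```go
package main

import (
	"fmt"
	"strings"
	"sync/atomic"
)

const smartRetryDisableString = "SMART_RETRY_DISABLE" // illustrative token

type msgCache struct {
	dlqMerging *int32 // points at the consumer group's atomic flag
	ownerEmail string
}

func (m *msgCache) smartRetryDisabled() bool {
	// Disabled while a DLQ merge is in progress, or when the owner opted out via email tag.
	return atomic.LoadInt32(m.dlqMerging) > 0 ||
		strings.Contains(m.ownerEmail, smartRetryDisableString)
}

func main() {
	var merging int32
	m := &msgCache{dlqMerging: &merging, ownerEmail: "team@example.com"}
	fmt.Println(m.smartRetryDisabled()) // false

	atomic.StoreInt32(&merging, 1)
	fmt.Println(m.smartRetryDisabled()) // true while merging
}
```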
