Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
limit the multi_zone config to controller only
Browse files Browse the repository at this point in the history
  • Loading branch information
datoug committed Apr 11, 2017
1 parent 469ac57 commit 28f30ac
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 73 deletions.
2 changes: 0 additions & 2 deletions common/constants.go
Expand Up @@ -39,8 +39,6 @@ const (
CallerHostName = "host-name"
// CallerServiceName is the name of thrift context header contains current service name
CallerServiceName = "cn"
// A fake service name used by dynamic config. It represents common configs among all services
CommonServiceName = "cherami-common"
)

// ServiceToPort is service name to ports mapping
Expand Down
28 changes: 0 additions & 28 deletions common/multizoneconfig.go

This file was deleted.

5 changes: 4 additions & 1 deletion services/controllerhost/config.go
Expand Up @@ -56,6 +56,10 @@ type (
NumPublisherExtentsByPath []string `name:"numPublisherExtentsByPath" default:"/=4"`
NumConsumerExtentsByPath []string `name:"numConsumerExtentsByPath" default:"/=8"`
NumRemoteConsumerExtentsByPath []string `name:"numRemoteConsumerExtentsByPath" default:"/=4"`

// configs for multi_zone consumer group
ActiveZone string `name:"activeZone"`
FailoverMode string `name:"failoverMode" default:"disabled"`
}
)

Expand All @@ -67,7 +71,6 @@ func newConfigManager(mClient m.TChanMetadataService, logger bark.Logger) dconfi
common.OutputServiceName: OutputPlacementConfig{},
common.StoreServiceName: StorePlacementConfig{},
common.ControllerServiceName: ControllerDynamicConfig{},
common.CommonServiceName: common.MultiZoneDynamicConfig{},
}
return dconfig.NewCassandraConfigManager(mClient, cfgTypes, logger)
}
22 changes: 18 additions & 4 deletions services/controllerhost/consumer.go
Expand Up @@ -21,6 +21,7 @@
package controllerhost

import (
"strings"
"time"

"github.com/uber-common/bark"
Expand Down Expand Up @@ -620,20 +621,20 @@ func refreshOutputHostsForConsGroup(context *Context,
}

if cgDesc.GetIsMultiZone() {
cfgObj, err := context.cfgMgr.Get(common.CommonServiceName, "*", "*", "*")
cfgObj, err := context.cfgMgr.Get(common.ControllerServiceName, "*", "*", "*")
if err != nil {
context.m3Client.IncCounter(m3Scope, metrics.ControllerErrMetadataReadCounter)
context.m3Client.IncCounter(m3Scope, metrics.ControllerFailures)
return nil, err
}

cfg, ok := cfgObj.(common.MultiZoneDynamicConfig)
cfg, ok := cfgObj.(ControllerDynamicConfig)
if !ok {
context.log.Fatal("Unexpected type mismatch, cfgObj.(common.MultiZoneDynamicConfig) failed !")
context.log.Fatal("Unexpected type mismatch, cfgObj.(ControllerDynamicConfig) failed !")
}

// If we shouldn't consume in this zone(for a multi_zone cg), short circuit and return
if !common.ShouldConsumeInZone(context.localZone, cgDesc, cfg) {
if !ShouldConsumeInZone(context.localZone, cgDesc, cfg) {
writeToCache(int64(outputCacheTTL))
return outputAddrs, nil
}
Expand Down Expand Up @@ -689,3 +690,16 @@ func refreshOutputHostsForConsGroup(context *Context,
writeToCache(failBackoffInterval)
return outputAddrs, err
}

// ShouldConsumeInZone indicated whether we should consume from this zone for a multi_zone consumer group
func ShouldConsumeInZone(zone string, cgDesc *shared.ConsumerGroupDescription, dConfig ControllerDynamicConfig) bool {
if strings.EqualFold(dConfig.FailoverMode, `enabled`) {
return strings.EqualFold(zone, dConfig.ActiveZone)
}

if cgDesc.IsSetActiveZone() {
return strings.EqualFold(zone, cgDesc.GetActiveZone())
}

return strings.EqualFold(zone, dConfig.ActiveZone)
}
33 changes: 0 additions & 33 deletions services/outputhost/cgcache.go
Expand Up @@ -155,9 +155,6 @@ type (
// sessionID is the 16 bit session identifier for this host
sessionID uint16

// local zone name
localZone string

// ackMgrLoadCh is the channel used to notify the outputhost when an
// ackMgr is loaded
ackMgrLoadCh chan<- ackMgrLoadMsg
Expand Down Expand Up @@ -255,7 +252,6 @@ func newConsumerGroupCache(destPath string, cgDesc shared.ConsumerGroupDescripti
creditRequestCh: make(chan string, 50),
lastDisconnectTime: time.Now(),
sessionID: h.sessionID,
localZone: h.localZone,
ackIDGen: h.ackMgrIDGen,
ackMgrLoadCh: h.ackMgrLoadCh,
ackMgrUnloadCh: h.ackMgrUnloadCh,
Expand Down Expand Up @@ -543,20 +539,6 @@ func (cgCache *consumerGroupCache) refreshCgCache(ctx thrift.Context) error {
return ErrCgUnloaded
}

if cgDesc.GetIsMultiZone() {
cfg, err := cgCache.getMultiZoneDynamicConfig()
if err != nil {
cgCache.logger.WithField(common.TagErr, err).Error(`failed to get multizone dynamic config`)
return err
}

// If we shouldn't consume in this zone(for a multi_zone cg), short circuit and return
if !common.ShouldConsumeInZone(cgCache.localZone, cgDesc, cfg) {
go cgCache.unloadConsumerGroupCache()
return nil
}
}

cgCache.cachedCGDesc.Status = cgDesc.Status
cgCache.cachedCGDesc.MaxDeliveryCount = cgDesc.MaxDeliveryCount

Expand Down Expand Up @@ -633,21 +615,6 @@ func (cgCache *consumerGroupCache) getDynamicCgConfig() (OutputCgConfig, error)
return cfg, nil
}

// getMultiZoneDynamicConfig gets the configuration object for this host
func (cgCache *consumerGroupCache) getMultiZoneDynamicConfig() (common.MultiZoneDynamicConfig, error) {
dCfgIface, err := cgCache.cfgMgr.Get(common.CommonServiceName, `*`, `*`, `*`)
if err != nil {
cgCache.logger.WithFields(bark.Fields{common.TagErr: err}).Error(`Couldn't get the configuration object`)
return common.MultiZoneDynamicConfig{}, err
}
cfg, ok := dCfgIface.(common.MultiZoneDynamicConfig)
if !ok {
cgCache.logger.Error(`Couldn't cast cfg to common.MultiZoneDynamicConfig`)
return common.MultiZoneDynamicConfig{}, ErrConfigCast
}
return cfg, nil
}

// getMessageCacheSize gets the configured value for the message cache for this CG
func (cgCache *consumerGroupCache) getMessageCacheSize(cfg OutputCgConfig, oldSize int32) (cacheSize int32) {
logFn := func() bark.Logger {
Expand Down
1 change: 0 additions & 1 deletion services/outputhost/config.go
Expand Up @@ -46,7 +46,6 @@ type (
func newConfigManager(mClient m.TChanMetadataService, logger bark.Logger) dconfig.ConfigManager {
cfgTypes := map[string]interface{}{
common.OutputServiceName: OutputCgConfig{},
common.CommonServiceName: common.MultiZoneDynamicConfig{},
}
return dconfig.NewCassandraConfigManager(mClient, cfgTypes, logger)
}
4 changes: 0 additions & 4 deletions services/outputhost/outputhost.go
Expand Up @@ -24,7 +24,6 @@ import (
"fmt"
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -90,7 +89,6 @@ type (
ackMgrUnloadCh chan uint32
hostMetrics *load.HostMetrics
cfgMgr cassDconfig.ConfigManager
localZone string
common.SCommon
}

Expand Down Expand Up @@ -764,8 +762,6 @@ func NewOutputHost(serviceName string, sVice common.SCommon, metadataClient meta
hostMetrics: load.NewHostMetrics(),
}

bs.localZone, _ = common.GetLocalClusterInfo(strings.ToLower(deploymentName))

bs.sessionID = common.UUIDToUint16(sVice.GetHostUUID())

bs.m3Client = metrics.NewClient(sVice.GetMetricsReporter(), metrics.Outputhost)
Expand Down

0 comments on commit 28f30ac

Please sign in to comment.