Skip to content

Commit

Permalink
Merge branch 'CDNC_4431' into CDNC_4589
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenxia committed Jun 8, 2023
2 parents e7a1b98 + 1e1288e commit c0f4920
Show file tree
Hide file tree
Showing 30 changed files with 1,111 additions and 599 deletions.
72 changes: 68 additions & 4 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

299 changes: 150 additions & 149 deletions .gen/proto/history/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

299 changes: 150 additions & 149 deletions .gen/proto/matching/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

299 changes: 150 additions & 149 deletions .gen/proto/shared/v1/history.pb.yarpc.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion common/dynamicconfig/constants.go
Expand Up @@ -3231,7 +3231,7 @@ var IntKeys = map[IntKey]DynamicInt{
ReplicatorCacheCapacity: DynamicInt{
KeyName: "history.replicatorCacheCapacity",
Description: "ReplicatorCacheCapacity is the capacity of replication cache in number of tasks",
DefaultValue: 10000,
DefaultValue: 0,
},
ExecutionMgrNumConns: DynamicInt{
KeyName: "history.executionMgrNumConns",
Expand Down
2 changes: 1 addition & 1 deletion common/elasticsearch/client.go
Expand Up @@ -340,7 +340,7 @@ func (c *ESClient) getSearchResult(
)

if ShouldSearchAfter(token) {
qb.SearchAfter([]interface{}{token.SortValue, token.TieBreaker})
qb.SearchAfter(token.SortValue, token.TieBreaker)
}

body, err := qb.String()
Expand Down
4 changes: 2 additions & 2 deletions common/elasticsearch/query/builder_test.go
Expand Up @@ -57,15 +57,15 @@ func TestBuilderAgainsESv7(t *testing.T) {
qb.Query(NewExistsQuery("user"))
qb.Size(10)
qb.Sortby(NewFieldSort("runid").Desc())
qb.Query(NewBoolQuery().Must(NewMatchQuery("domainID", "uuid"))).SearchAfter([]interface{}{"sortval", "tiebraker"})
qb.Query(NewBoolQuery().Must(NewMatchQuery("domainID", "uuid"))).SearchAfter("sortval", "tiebraker")
qbs, err := qb.Source()
assert.NoError(t, err)

searchSource := elastic.NewSearchSource().
Query(elastic.NewExistsQuery("user")).
Size(10).
SortBy(elastic.NewFieldSort("runid").Desc()).
Query(elastic.NewBoolQuery().Must(elastic.NewMatchQuery("domainID", "uuid"))).SearchAfter([]interface{}{"sortval", "tiebraker"})
Query(elastic.NewBoolQuery().Must(elastic.NewMatchQuery("domainID", "uuid"))).SearchAfter("sortval", "tiebraker")

sss, err := searchSource.Source()
assert.NoError(t, err)
Expand Down
49 changes: 43 additions & 6 deletions common/isolationgroup/defaultisolationgroupstate/state.go
Expand Up @@ -27,6 +27,8 @@ import (
"fmt"
"sync/atomic"

"github.com/uber/cadence/common/metrics"

"github.com/uber/cadence/common/isolationgroup/isolationgroupapi"

"github.com/uber/cadence/common"
Expand All @@ -44,6 +46,7 @@ type defaultIsolationGroupStateHandler struct {
domainCache cache.DomainCache
globalIsolationGroupDrains dynamicconfig.Client
config defaultConfig
metricsClient metrics.Client
}

// NewDefaultIsolationGroupStateWatcherWithConfigStoreClient Is a constructor which allows passing in the dynamic config client
Expand All @@ -52,6 +55,7 @@ func NewDefaultIsolationGroupStateWatcherWithConfigStoreClient(
dc *dynamicconfig.Collection,
domainCache cache.DomainCache,
cfgStoreClient dynamicconfig.Client, // can be nil, which means global drain is unsupported
metricsClient metrics.Client,
) (isolationgroup.State, error) {
stopChan := make(chan struct{})

Expand All @@ -73,16 +77,18 @@ func NewDefaultIsolationGroupStateWatcherWithConfigStoreClient(
status: common.DaemonStatusInitialized,
log: logger,
config: config,
metricsClient: metricsClient,
}, nil
}

func (z *defaultIsolationGroupStateHandler) AvailableIsolationGroupsByDomainID(ctx context.Context, domainID string, availableIsolationGroups []string) (types.IsolationGroupConfiguration, error) {
func (z *defaultIsolationGroupStateHandler) AvailableIsolationGroupsByDomainID(ctx context.Context, domainID string, availablePollerIsolationGroups []string) (types.IsolationGroupConfiguration, error) {
state, err := z.getByDomainID(ctx, domainID)
if err != nil {
return nil, fmt.Errorf("unable to get isolation group state: %w", err)
}
isolationGroups := common.IntersectionStringSlice(z.config.AllIsolationGroups, availableIsolationGroups)
return availableIG(isolationGroups, state.Global, state.Domain), nil
availableIsolationGroupsCfg := isolationGroupHealthyListToConfig(availablePollerIsolationGroups)
scope := z.createAvailableisolationGroupMetricsScope(domainID)
return availableIG(z.config.AllIsolationGroups, availableIsolationGroupsCfg, state.Global, state.Domain, scope), nil
}

func (z *defaultIsolationGroupStateHandler) IsDrained(ctx context.Context, domain string, isolationGroup string) (bool, error) {
Expand Down Expand Up @@ -130,11 +136,11 @@ func (z *defaultIsolationGroupStateHandler) getByDomainID(ctx context.Context, d
// will return nil, nil when it is not enabled
func (z *defaultIsolationGroupStateHandler) get(ctx context.Context, domain string) (*isolationGroups, error) {
if !z.config.IsolationGroupEnabled(domain) {
return nil, nil
return &isolationGroups{}, nil
}

domainData, err := z.domainCache.GetDomain(domain)
if err != nil {
if err != nil || domainData == nil {
return nil, fmt.Errorf("could not resolve domain in isolationGroup handler: %w", err)
}

Expand Down Expand Up @@ -163,22 +169,42 @@ func (z *defaultIsolationGroupStateHandler) get(ctx context.Context, domain stri
return ig, nil
}

func (z *defaultIsolationGroupStateHandler) createAvailableisolationGroupMetricsScope(domainID string) metrics.Scope {
domainName, _ := z.domainCache.GetDomainName(domainID)
return z.metricsClient.Scope(metrics.GetAvailableIsolationGroupsScope).Tagged(metrics.DomainTag(domainName))
}

// A simple explicit deny-based isolation group implementation
func availableIG(allIsolationGroups []string, global types.IsolationGroupConfiguration, domain types.IsolationGroupConfiguration) types.IsolationGroupConfiguration {
func availableIG(
allIsolationGroups []string,
availablePollers types.IsolationGroupConfiguration,
global types.IsolationGroupConfiguration,
domain types.IsolationGroupConfiguration,
scope metrics.Scope,
) types.IsolationGroupConfiguration {
out := types.IsolationGroupConfiguration{}
for _, isolationGroup := range allIsolationGroups {
_, hasAvailablePollers := availablePollers[isolationGroup]
globalCfg, hasGlobalConfig := global[isolationGroup]
domainCfg, hasDomainConfig := domain[isolationGroup]
if hasGlobalConfig {
if globalCfg.State == types.IsolationGroupStateDrained {
scope.Tagged(metrics.PollerIsolationGroupTag(isolationGroup)).IncCounter(metrics.IsolationGroupStateDrained)
continue
}
}
if hasDomainConfig {
if domainCfg.State == types.IsolationGroupStateDrained {
scope.Tagged(metrics.PollerIsolationGroupTag(isolationGroup)).IncCounter(metrics.IsolationGroupStateDrained)
continue
}
}
if !hasAvailablePollers {
// we don't attempt to dispatch tasks to isolation groups where there are no pollers
scope.Tagged(metrics.PollerIsolationGroupTag(isolationGroup)).IncCounter(metrics.IsolationGroupStatePollerUnavailable)
continue
}
scope.Tagged(metrics.PollerIsolationGroupTag(isolationGroup)).IncCounter(metrics.IsolationGroupStateHealthy)
out[isolationGroup] = types.IsolationGroupPartition{
Name: isolationGroup,
State: types.IsolationGroupStateHealthy,
Expand All @@ -202,3 +228,14 @@ func isDrained(isolationGroup string, global types.IsolationGroupConfiguration,
}
return false
}

func isolationGroupHealthyListToConfig(igs []string) types.IsolationGroupConfiguration {
out := make(types.IsolationGroupConfiguration, len(igs))
for _, ig := range igs {
out[ig] = types.IsolationGroupPartition{
Name: ig,
State: types.IsolationGroupStateHealthy,
}
}
return out
}

0 comments on commit c0f4920

Please sign in to comment.