Skip to content

Commit

Permalink
Merge branch 'master' into snowden/replication-tasks-2
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Jun 22, 2023
2 parents a63f3f2 + cca978f commit 3e6bd9e
Show file tree
Hide file tree
Showing 87 changed files with 2,489 additions and 1,436 deletions.
339 changes: 204 additions & 135 deletions api/cli/v1/message.pb.go

Large diffs are not rendered by default.

257 changes: 128 additions & 129 deletions api/persistence/v1/task_queues.pb.go

Large diffs are not rendered by default.

133 changes: 87 additions & 46 deletions api/token/v1/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions common/dynamicconfig/collection.go
Expand Up @@ -85,6 +85,7 @@ type (
MapPropertyFnWithNamespaceFilter func(namespace string) map[string]any
StringPropertyFn func() string
StringPropertyFnWithNamespaceFilter func(namespace string) string
StringPropertyFnWithNamespaceIDFilter func(namespaceID string) string
)

const (
Expand Down Expand Up @@ -347,6 +348,19 @@ func (c *Collection) GetStringPropertyFnWithNamespaceFilter(key Key, defaultValu
}
}

// GetStringPropertyFnWithNamespaceIDFilter gets property with namespace ID filter and asserts that it's a string
func (c *Collection) GetStringPropertyFnWithNamespaceIDFilter(key Key, defaultValue any) StringPropertyFnWithNamespaceIDFilter {
return func(namespaceID string) string {
return matchAndConvert(
c,
key,
defaultValue,
namespaceIDPrecedence(namespaceID),
convertString,
)
}
}

// GetMapPropertyFnWithNamespaceFilter gets property and asserts that it's a map
func (c *Collection) GetMapPropertyFnWithNamespaceFilter(key Key, defaultValue any) MapPropertyFnWithNamespaceFilter {
return func(namespace string) map[string]interface{} {
Expand Down
9 changes: 9 additions & 0 deletions common/dynamicconfig/collection_test.go
Expand Up @@ -53,6 +53,7 @@ const (
testGetDurationPropertyStructuredDefaults = "testGetDurationPropertyStructuredDefaults"
testGetBoolPropertyFilteredByNamespaceIDKey = "testGetBoolPropertyFilteredByNamespaceIDKey"
testGetBoolPropertyFilteredByTaskQueueInfoKey = "testGetBoolPropertyFilteredByTaskQueueInfoKey"
testGetStringPropertyFilteredByNamespaceIDKey = "testGetStringPropertyFilteredByNamespaceIDKey"
)

// Note: fileBasedClientSuite also heavily tests Collection, since some tests are easier with data
Expand Down Expand Up @@ -97,6 +98,14 @@ func (s *collectionSuite) TestGetStringPropertyFnWithNamespaceFilter() {
s.Equal("efg", value(namespace))
}

func (s *collectionSuite) TestGetStringPropertyFnWithNamespaceIDFilter() {
namespaceID := "testNamespaceID"
value := s.cln.GetStringPropertyFnWithNamespaceIDFilter(testGetStringPropertyFilteredByNamespaceIDKey, "abc")
s.Equal("abc", value(namespaceID))
s.client[testGetStringPropertyFilteredByNamespaceIDKey] = "efg"
s.Equal("efg", value(namespaceID))
}

func (s *collectionSuite) TestGetIntPropertyFilteredByTaskQueueInfo() {
namespace := "testNamespace"
taskQueue := "testTaskQueue"
Expand Down
13 changes: 7 additions & 6 deletions common/dynamicconfig/constants.go
Expand Up @@ -197,6 +197,9 @@ const (
ReachabilityQueryBuildIdLimit = "limit.reachabilityQueryBuildIds"
// TaskQueuesPerBuildIdLimit limits the number of task queue names that can be mapped to a single build id.
TaskQueuesPerBuildIdLimit = "limit.taskQueuesPerBuildId"
// RemovableBuildIdDurationSinceDefault is the minimum duration since a build id was last default in its containing
// set for it to be considered for removal, used by the build id scavenger.
RemovableBuildIdDurationSinceDefault = "worker.removableBuildIdDurationSinceDefault"

// keys for frontend

Expand All @@ -218,7 +221,7 @@ const (
// FrontendRPS is workflow rate limit per second
FrontendRPS = "frontend.rps"
// FrontendNamespaceReplicationInducingAPIsRPS limits the per second request rate for namespace replication inducing
// APIs (e.g. UpdateNamespace, UpdateWorkerBuildIdCompatibility).
// APIs (e.g. RegisterNamespace, UpdateNamespace, UpdateWorkerBuildIdCompatibility).
// This config is EXPERIMENTAL and may be changed or removed in a later release.
FrontendNamespaceReplicationInducingAPIsRPS = "frontend.rps.namespaceReplicationInducingAPIs"
// FrontendMaxNamespaceRPSPerInstance is workflow namespace rate limit per second
Expand All @@ -231,14 +234,14 @@ const (
// This config is EXPERIMENTAL and may be changed or removed in a later release.
FrontendMaxNamespaceVisibilityRPSPerInstance = "frontend.namespaceRPS.visibility"
// FrontendMaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance is a per host/per namespace RPS limit for
// namespace replication inducing APIs (e.g. UpdateNamespace, UpdateWorkerBuildIdCompatibility).
// namespace replication inducing APIs (e.g. RegisterNamespace, UpdateNamespace, UpdateWorkerBuildIdCompatibility).
// This config is EXPERIMENTAL and may be changed or removed in a later release.
FrontendMaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance = "frontend.namespaceRPS.namespaceReplicationInducingAPIs"
// FrontendMaxNamespaceVisibilityBurstPerInstance is namespace burst limit for visibility APIs.
// This config is EXPERIMENTAL and may be changed or removed in a later release.
FrontendMaxNamespaceVisibilityBurstPerInstance = "frontend.namespaceBurst.visibility"
// FrontendMaxNamespaceNamespaceReplicationInducingAPIsBurstPerInstance is a per host/per namespace burst limit for
// namespace replication inducing APIs (e.g. UpdateNamespace, UpdateWorkerBuildIdCompatibility).
// namespace replication inducing APIs (e.g. RegisterNamespace, UpdateNamespace, UpdateWorkerBuildIdCompatibility).
// This config is EXPERIMENTAL and may be changed or removed in a later release.
FrontendMaxNamespaceNamespaceReplicationInducingAPIsBurstPerInstance = "frontend.namespaceBurst.namespaceReplicationInducingAPIs"
// FrontendGlobalNamespaceRPS is workflow namespace rate limit per second for the whole cluster.
Expand All @@ -254,7 +257,7 @@ const (
// This config is EXPERIMENTAL and may be changed or removed in a later release.
FrontendGlobalNamespaceVisibilityRPS = "frontend.globalNamespaceRPS.visibility"
// FrontendGlobalNamespaceNamespaceReplicationInducingAPIsRPS is a cluster global, per namespace RPS limit for
// namespace replication inducing APIs (e.g. UpdateNamespace, UpdateWorkerBuildIdCompatibility).
// namespace replication inducing APIs (e.g. RegisterNamespace, UpdateNamespace, UpdateWorkerBuildIdCompatibility).
// The limit is evenly distributed among available frontend service instances.
// If this is set, it overwrites the per instance limit configured with
// "frontend.namespaceRPS.namespaceReplicationInducingAPIs".
Expand Down Expand Up @@ -695,8 +698,6 @@ const (
DefaultWorkflowRetryPolicy = "history.defaultWorkflowRetryPolicy"
// HistoryMaxAutoResetPoints is the key for max number of auto reset points stored in mutableState
HistoryMaxAutoResetPoints = "history.historyMaxAutoResetPoints"
// HistoryMaxTrackedBuildIds indicates the max number of build IDs to store in the BuildIds search attribute
HistoryMaxTrackedBuildIds = "history.maxTrackedBuildIds"
// EnableParentClosePolicy whether to ParentClosePolicy
EnableParentClosePolicy = "history.enableParentClosePolicy"
// ParentClosePolicyThreshold decides that parent close policy will be processed by sys workers(if enabled) if
Expand Down
47 changes: 6 additions & 41 deletions common/membership/hostinfo_provider.go
Expand Up @@ -24,53 +24,18 @@

package membership

import (
"context"

"go.uber.org/fx"
)

var HostInfoProviderModule = fx.Options(
fx.Provide(newHostInfoProvider),
fx.Invoke(hostInfoProviderLifetimeHooks),
)

type (
cachingHostInfoProvider struct {
hostInfo HostInfo
membershipMonitor Monitor
hostInfoProvider struct {
hostInfo HostInfo
}
)

func newHostInfoProvider(membershipMonitor Monitor) HostInfoProvider {
return &cachingHostInfoProvider{
membershipMonitor: membershipMonitor,
func NewHostInfoProvider(hostInfo HostInfo) *hostInfoProvider {
return &hostInfoProvider{
hostInfo: hostInfo,
}
}

func (hip *cachingHostInfoProvider) Start() error {
var err error
hip.hostInfo, err = hip.membershipMonitor.WhoAmI()
if err != nil {
return err
}
return nil
}

func (hip *cachingHostInfoProvider) HostInfo() HostInfo {
func (hip *hostInfoProvider) HostInfo() HostInfo {
return hip.hostInfo
}

func hostInfoProviderLifetimeHooks(
lc fx.Lifecycle,
provider HostInfoProvider,
) {
lc.Append(
fx.Hook{
OnStart: func(context.Context) error {
return provider.Start()
},
},
)

}

0 comments on commit 3e6bd9e

Please sign in to comment.