Skip to content

Commit

Permalink
Add dynamic config for namespace refresh interval (#2766)
Browse files Browse the repository at this point in the history
  • Loading branch information
meiliang86 authored and yycptt committed May 5, 2022
1 parent 8c78ad8 commit aa566f7
Show file tree
Hide file tree
Showing 12 changed files with 45 additions and 36 deletions.
2 changes: 2 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ const (
EnableStickyQuery = "system.enableStickyQuery"
// EnableActivityLocalDispatch indicates if acitivty local dispatch is enabled per namespace
EnableActivityLocalDispatch = "system.enableActivityLocalDispatch"
// NamespaceCacheRefreshInterval is the key for namespace cache refresh interval dynamic config
NamespaceCacheRefreshInterval = "system.namespaceCacheRefreshInterval"

// key for size limit

Expand Down
30 changes: 16 additions & 14 deletions common/namespace/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ import (
"time"

"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common"
"go.temporal.io/server/common/cache"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
Expand All @@ -62,16 +62,10 @@ const (
cacheInitialSize = 10 * 1024
cacheMaxSize = 64 * 1024
cacheTTL = 0 // 0 means infinity
// CacheRefreshInterval namespace cache refresh interval
CacheRefreshInterval = 10 * time.Second
// CacheRefreshFailureRetryInterval is the wait time
// if refreshment encounters error
CacheRefreshFailureRetryInterval = 1 * time.Second
CacheRefreshPageSize = 200

cacheInitialized int32 = 0
cacheStarted int32 = 1
cacheStopped int32 = 2
)

const (
Expand Down Expand Up @@ -150,6 +144,7 @@ type (
metricsClient metrics.Client
logger log.Logger
lastRefreshTime atomic.Value
refreshInterval dynamicconfig.DurationPropertyFn

// cacheLock protects cachNameToID and cacheByID. If the exclusive side
// is to be held at the same time as the callbackLock (below), this lock
Expand All @@ -172,6 +167,7 @@ type (
func NewRegistry(
persistence Persistence,
enableGlobalNamespaces bool,
refreshInterval dynamicconfig.DurationPropertyFn,
metricsClient metrics.Client,
logger log.Logger,
) Registry {
Expand All @@ -186,6 +182,7 @@ func NewRegistry(
cacheByID: cache.New(cacheMaxSize, &cacheOpts),
prepareCallbacks: make(map[any]PrepareCallbackFn),
callbacks: make(map[any]CallbackFn),
refreshInterval: refreshInterval,
}
reg.lastRefreshTime.Store(time.Time{})
return reg
Expand Down Expand Up @@ -339,17 +336,22 @@ func (r *registry) Refresh() {
}

func (r *registry) refreshLoop(ctx context.Context) error {
timer := time.NewTicker(CacheRefreshInterval)
defer timer.Stop()

// Put timer events on our channel so we can select on just one below.
go func() {
for range timer.C {
timer := time.NewTicker(r.refreshInterval())

for {
select {
case r.triggerRefreshCh <- nil:
default:
case <-timer.C:
select {
case r.triggerRefreshCh <- nil:
default:
}
timer.Reset(r.refreshInterval())
case <-ctx.Done():
timer.Stop()
return
}

}
}()

Expand Down
5 changes: 3 additions & 2 deletions common/namespace/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ import (
enumspb "go.temporal.io/api/enums/v1"
namespacepb "go.temporal.io/api/namespace/v1"
"go.temporal.io/api/serviceerror"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
Expand Down Expand Up @@ -72,6 +72,7 @@ func (s *registrySuite) SetupTest() {
s.registry = namespace.NewRegistry(
s.regPersistence,
true,
dynamicconfig.GetDurationPropertyFn(time.Second),
metrics.NoopClient,
log.NewTestLogger())
}
Expand Down Expand Up @@ -658,7 +659,7 @@ func TestCacheByName(t *testing.T) {
Namespaces: []*persistence.GetNamespaceResponse{&nsrec},
}, nil)
reg := namespace.NewRegistry(
regPersist, false, metrics.NoopClient, log.NewNoopLogger())
regPersist, false, dynamicconfig.GetDurationPropertyFn(time.Second), metrics.NoopClient, log.NewNoopLogger())
reg.Start()
defer reg.Stop()
ns, err := reg.GetNamespace(namespace.Name("foo"))
Expand Down
2 changes: 2 additions & 0 deletions common/resource/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,12 @@ func NamespaceRegistryProvider(
metricsClient metrics.Client,
clusterMetadata cluster.Metadata,
metadataManager persistence.MetadataManager,
dynamicCollection *dynamicconfig.Collection,
) namespace.Registry {
return namespace.NewRegistry(
metadataManager,
clusterMetadata.IsGlobalNamespaceEnabled(),
dynamicCollection.GetDurationProperty(dynamicconfig.NamespaceCacheRefreshInterval, 10*time.Second),
metricsClient,
logger,
)
Expand Down
3 changes: 3 additions & 0 deletions host/dynamicconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"go.temporal.io/server/common/persistence/visibility"
)

const NamespaceCacheRefreshInterval = time.Second

var (
// Override values for dynamic configs
staticOverrides = map[dynamicconfig.Key]interface{}{
Expand All @@ -47,6 +49,7 @@ var (
dynamicconfig.ReplicationTaskFetcherErrorRetryWait: 50 * time.Millisecond,
dynamicconfig.ReplicationTaskProcessorErrorRetryWait: time.Millisecond,
dynamicconfig.ClusterMetadataRefreshInterval: 100 * time.Millisecond,
dynamicconfig.NamespaceCacheRefreshInterval: NamespaceCacheRefreshInterval,
}
)

Expand Down
5 changes: 3 additions & 2 deletions host/integrationbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/payloads"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client"
Expand Down Expand Up @@ -116,7 +115,9 @@ func (s *IntegrationBase) setupSuite(defaultClusterConfigFile string) {
s.testCluster.RefreshNamespaceCache()
} else {
// Wait for one whole cycle of the namespace cache v2 refresh interval to be sure that our namespaces are loaded.
time.Sleep(namespace.CacheRefreshInterval + time.Second)
// We are using real server so we don't know what cache refresh interval it uses. Fall back to the 10s old value.
serverCacheRefreshInterval := 10 * time.Second
time.Sleep(serverCacheRefreshInterval + time.Second)
}
}

Expand Down
3 changes: 1 addition & 2 deletions host/ndc/ndc_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import (
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/namespace"
test "go.temporal.io/server/common/testing"
"go.temporal.io/server/environment"
"go.temporal.io/server/host"
Expand Down Expand Up @@ -1748,7 +1747,7 @@ func (s *nDCIntegrationTestSuite) registerNamespace() {
})
s.Require().NoError(err)
// Wait for namespace cache to pick the change
time.Sleep(2 * namespace.CacheRefreshInterval)
time.Sleep(2 * host.NamespaceCacheRefreshInterval)

descReq := &workflowservice.DescribeNamespaceRequest{
Namespace: s.namespace,
Expand Down
15 changes: 6 additions & 9 deletions host/xdc/integration_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ import (
"testing"
"time"

"go.temporal.io/sdk/temporal"
"go.temporal.io/server/api/adminservice/v1"

"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand All @@ -57,23 +54,23 @@ import (
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/activity"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
sdkworker "go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
sw "go.temporal.io/server/service/worker"
"gopkg.in/yaml.v3"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/convert"
"go.temporal.io/server/common/failure"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/payloads"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/environment"
"go.temporal.io/server/host"
sw "go.temporal.io/server/service/worker"
"go.temporal.io/server/service/worker/migration"
"gopkg.in/yaml.v3"
)

type (
Expand All @@ -89,7 +86,7 @@ type (
)

const (
cacheRefreshInterval = namespace.CacheRefreshInterval + 5*time.Second
cacheRefreshInterval = host.NamespaceCacheRefreshInterval + 5*time.Second
)

var (
Expand Down Expand Up @@ -688,7 +685,7 @@ func (s *integrationClustersTestSuite) TestStartWorkflowExecution_Failover_Workf
_, err := client1.RegisterNamespace(host.NewContext(), regReq)
s.NoError(err)
// Wait for namespace cache to pick the change
time.Sleep(namespace.CacheRefreshInterval)
time.Sleep(cacheRefreshInterval)

descReq := &workflowservice.DescribeNamespaceRequest{
Namespace: namespaceName,
Expand Down
10 changes: 6 additions & 4 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,9 @@ type Config struct {
ESProcessorFlushInterval dynamicconfig.DurationPropertyFn
ESProcessorAckTimeout dynamicconfig.DurationPropertyFn

EnableCrossNamespaceCommands dynamicconfig.BoolPropertyFn
EnableActivityLocalDispatch dynamicconfig.BoolPropertyFnWithNamespaceFilter
EnableCrossNamespaceCommands dynamicconfig.BoolPropertyFn
EnableActivityLocalDispatch dynamicconfig.BoolPropertyFnWithNamespaceFilter
NamespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn
}

const (
Expand Down Expand Up @@ -436,8 +437,9 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
ESProcessorFlushInterval: dc.GetDurationProperty(dynamicconfig.WorkerESProcessorFlushInterval, 1*time.Second),
ESProcessorAckTimeout: dc.GetDurationProperty(dynamicconfig.WorkerESProcessorAckTimeout, 1*time.Minute),

EnableCrossNamespaceCommands: dc.GetBoolProperty(dynamicconfig.EnableCrossNamespaceCommands, true),
EnableActivityLocalDispatch: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableActivityLocalDispatch, false),
EnableCrossNamespaceCommands: dc.GetBoolProperty(dynamicconfig.EnableCrossNamespaceCommands, true),
EnableActivityLocalDispatch: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableActivityLocalDispatch, false),
NamespaceCacheRefreshInterval: dc.GetDurationProperty(dynamicconfig.NamespaceCacheRefreshInterval, 10*time.Second),
}

return cfg
Expand Down
2 changes: 1 addition & 1 deletion service/history/taskProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (t *taskProcessor) handleTaskError(
// TODO remove this error check special case
// since the new task life cycle will not give up until task processed / verified
if _, ok := err.(*serviceerror.NamespaceNotActive); ok {
if t.timeSource.Now().Sub(task.startTime) > 2*namespace.CacheRefreshInterval {
if t.timeSource.Now().Sub(task.startTime) > 2*t.shard.GetConfig().NamespaceCacheRefreshInterval() {
scope.IncCounter(metrics.TaskNotActiveCounter)
return nil
}
Expand Down
3 changes: 1 addition & 2 deletions service/history/taskProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
Expand Down Expand Up @@ -236,7 +235,7 @@ func (s *taskProcessorSuite) TestHandleTaskError_NamespaceNotActiveError() {
err := serviceerror.NewNamespaceNotActive("", "", "")

taskInfo := NewTaskInfo(s.mockProcessor, nil, s.logger)
taskInfo.startTime = time.Now().UTC().Add(-namespace.CacheRefreshInterval * time.Duration(3))
taskInfo.startTime = time.Now().UTC().Add(-s.taskProcessor.shard.GetConfig().NamespaceCacheRefreshInterval() * time.Duration(3))
s.Nil(s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err))

taskInfo.startTime = time.Now().UTC()
Expand Down
1 change: 1 addition & 0 deletions service/history/tests/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,5 +145,6 @@ func NewDynamicConfig() *configs.Config {
// reduce the duration of long poll to increase test speed
config.LongPollExpirationInterval = dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.HistoryLongPollExpirationInterval, 10*time.Second)
config.EnableActivityLocalDispatch = dynamicconfig.GetBoolPropertyFnFilteredByNamespace(true)
config.NamespaceCacheRefreshInterval = dynamicconfig.GetDurationPropertyFn(time.Second)
return config
}

0 comments on commit aa566f7

Please sign in to comment.