From 61cebe2d8e6db9b4d5044dacc9e20778a890cd10 Mon Sep 17 00:00:00 2001
From: Alfred Landrum
Date: Fri, 14 Jul 2023 16:47:46 -0700
Subject: [PATCH] add configurable ownership caching to history client

---
 client/clientfactory.go                   |   1 +
 client/history/caching_redirector.go      | 213 +++++++++++++
 client/history/caching_redirector_test.go | 359 ++++++++++++++++++++++
 client/history/client.go                  |  14 +-
 client/history/connections.go             |   5 +
 client/history/connections_mock.go        |  12 +
 common/dynamicconfig/constants.go         |   3 +
 service/history/configs/config.go         |  11 +-
 8 files changed, 614 insertions(+), 4 deletions(-)
 create mode 100644 client/history/caching_redirector.go
 create mode 100644 client/history/caching_redirector_test.go

diff --git a/client/clientfactory.go b/client/clientfactory.go
index 9eb2934f1a5..69650a5b74c 100644
--- a/client/clientfactory.go
+++ b/client/clientfactory.go
@@ -126,6 +126,7 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (
 	}
 
 	client := history.NewClient(
+		cf.dynConfig,
 		resolver,
 		cf.logger,
 		cf.numberOfHistoryShards,
diff --git a/client/history/caching_redirector.go b/client/history/caching_redirector.go
new file mode 100644
index 00000000000..2dc90e69438
--- /dev/null
+++ b/client/history/caching_redirector.go
@@ -0,0 +1,213 @@
+// The MIT License
+//
+// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
+//
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package history
+
+import (
+	"context"
+	"errors"
+	"sync"
+
+	"go.temporal.io/api/serviceerror"
+
+	"go.temporal.io/server/api/historyservice/v1"
+	"go.temporal.io/server/common"
+	"go.temporal.io/server/common/log"
+	"go.temporal.io/server/common/log/tag"
+	"go.temporal.io/server/common/membership"
+	serviceerrors "go.temporal.io/server/common/serviceerror"
+)
+
+type (
+	cacheEntry struct {
+		shardID    int32
+		address    rpcAddress
+		connection clientConnection
+	}
+
+	// A cachingRedirector is a redirector that maintains a cache of shard
+	// owners, and uses that cache instead of querying membership for each
+	// operation. Cache entries are evicted either for shard ownership lost
+	// errors, or for any error that might indicate the history instance
+	// is no longer available, including timeouts.
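+	//
+	// Sketch of the intended behavior (illustrative only, using this file's
+	// names): the first use of a shard consults membership and caches the
+	// owning host; later uses skip the lookup until the entry is evicted.
+	//
+	//	r := newCachingRedirector(pool, resolver, logger)
+	//	c1, _ := r.clientForShardID(5) // miss: membership lookup, entry cached
+	//	c2, _ := r.clientForShardID(5) // hit: same connection, no lookup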
+	cachingRedirector struct {
+		mu struct {
+			sync.RWMutex
+			cache map[int32]cacheEntry
+		}
+
+		connections            connectionPool
+		historyServiceResolver membership.ServiceResolver
+		logger                 log.Logger
+	}
+)
+
+func newCachingRedirector(
+	connections connectionPool,
+	historyServiceResolver membership.ServiceResolver,
+	logger log.Logger,
+) *cachingRedirector {
+	r := &cachingRedirector{
+		connections:            connections,
+		historyServiceResolver: historyServiceResolver,
+		logger:                 logger,
+	}
+	r.mu.cache = make(map[int32]cacheEntry)
+	return r
+}
+
+func (r *cachingRedirector) clientForShardID(shardID int32) (historyservice.HistoryServiceClient, error) {
+	if err := checkShardID(shardID); err != nil {
+		return nil, err
+	}
+	entry, err := r.getOrCreateEntry(shardID)
+	if err != nil {
+		return nil, err
+	}
+	return entry.connection.historyClient, nil
+}
+
+func (r *cachingRedirector) execute(ctx context.Context, shardID int32, op clientOperation) error {
+	if err := checkShardID(shardID); err != nil {
+		return err
+	}
+	opEntry, err := r.getOrCreateEntry(shardID)
+	if err != nil {
+		return err
+	}
+	return r.redirectLoop(ctx, opEntry, op)
+}
+
+func (r *cachingRedirector) redirectLoop(ctx context.Context, opEntry cacheEntry, op clientOperation) error {
+	for {
+		if err := common.IsValidContext(ctx); err != nil {
+			return err
+		}
+		opErr := op(ctx, opEntry.connection.historyClient)
+		if opErr == nil {
+			return opErr
+		}
+		if maybeHostDownError(opErr) {
+			r.cacheDeleteByAddress(opEntry.address)
+			return opErr
+		}
+		var solErr *serviceerrors.ShardOwnershipLost
+		if !errors.As(opErr, &solErr) {
+			return opErr
+		}
+		var again bool
+		opEntry, again = r.handleSolError(opEntry, solErr)
+		if !again {
+			return opErr
+		}
+	}
+}
+
+func (r *cachingRedirector) getOrCreateEntry(shardID int32) (cacheEntry, error) {
+	r.mu.RLock()
+	entry, ok := r.mu.cache[shardID]
+	r.mu.RUnlock()
+	if ok {
+		return entry, nil
+	}
+
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	entry, ok = r.mu.cache[shardID]
+	if ok {
+		return entry, nil
+	}
+
+	address, err := shardLookup(r.historyServiceResolver, shardID)
+	if err != nil {
+		return cacheEntry{}, err
+	}
+
+	return r.cacheAddLocked(shardID, address), nil
+}
+
+func (r *cachingRedirector) cacheAddLocked(shardID int32, addr rpcAddress) cacheEntry {
+	// New history instances might reuse the address of a previously live history
+	// instance. Since we don't currently close gRPC connections when they become
+	// unused or idle, we might have a gRPC connection that has gone into its
+	// connection backoff state, due to the previous history instance becoming
+	// unreachable. A request on the gRPC connection, intended for the new history
+	// instance, would be delayed waiting for the next connection attempt, which
+	// could be many seconds.
+	// If we're adding a new cache entry for a shard, we take that as a hint that
+	// the next request should attempt to connect immediately if required.
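+	// (Background: grpc-go keeps a failed connection in TRANSIENT_FAILURE and
+	// redials with exponential backoff; ResetConnectBackoff, an experimental
+	// grpc-go API used by the pool below, skips the remaining wait so the new
+	// owner is dialed immediately.)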
+	connection := r.connections.getOrCreateClientConn(addr)
+	r.connections.resetConnectBackoff(connection)
+
+	entry := cacheEntry{
+		shardID:    shardID,
+		address:    addr,
+		connection: connection,
+	}
+	r.mu.cache[shardID] = entry
+
+	return entry
+}
+
+func (r *cachingRedirector) cacheDeleteByAddress(address rpcAddress) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	for shardID, entry := range r.mu.cache {
+		if entry.address == address {
+			delete(r.mu.cache, shardID)
+		}
+	}
+}
+
+func (r *cachingRedirector) handleSolError(opEntry cacheEntry, solErr *serviceerrors.ShardOwnershipLost) (cacheEntry, bool) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	if cached, ok := r.mu.cache[opEntry.shardID]; ok {
+		if cached.address == opEntry.address {
+			delete(r.mu.cache, cached.shardID)
+		}
+	}
+
+	solErrNewOwner := rpcAddress(solErr.OwnerHost)
+	if len(solErrNewOwner) != 0 && solErrNewOwner != opEntry.address {
+		r.logger.Info("historyClient: updating cache from shard ownership lost error",
+			tag.ShardID(opEntry.shardID),
+			tag.NewAnyTag("oldAddress", opEntry.address),
+			tag.NewAnyTag("newAddress", solErrNewOwner))
+		return r.cacheAddLocked(opEntry.shardID, solErrNewOwner), true
+	}
+
+	return cacheEntry{}, false
+}
+
+func maybeHostDownError(opErr error) bool {
+	var unavail *serviceerror.Unavailable
+	if errors.As(opErr, &unavail) {
+		return true
+	}
+	return common.IsContextDeadlineExceededErr(opErr)
+}
diff --git a/client/history/caching_redirector_test.go b/client/history/caching_redirector_test.go
new file mode 100644
index 00000000000..0fdc472874a
--- /dev/null
+++ b/client/history/caching_redirector_test.go
@@ -0,0 +1,359 @@
+// The MIT License
+//
+// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
+//
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
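+
+// The tests below drive cachingRedirector against gomock doubles for the
+// connection pool and the membership resolver: shard-ID validation, cache
+// retention across repeated calls, eviction on host-down errors, and
+// redirection on shard ownership lost errors.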
+
+package history
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+	"go.temporal.io/api/serviceerror"
+
+	"go.temporal.io/server/api/historyservice/v1"
+	"go.temporal.io/server/api/historyservicemock/v1"
+	"go.temporal.io/server/common/convert"
+	"go.temporal.io/server/common/log"
+	"go.temporal.io/server/common/membership"
+	serviceerrors "go.temporal.io/server/common/serviceerror"
+)
+
+type (
+	cachingRedirectorSuite struct {
+		suite.Suite
+		*require.Assertions
+
+		controller  *gomock.Controller
+		connections *MockconnectionPool
+		logger      log.Logger
+		resolver    *membership.MockServiceResolver
+	}
+)
+
+func TestCachingRedirectorSuite(t *testing.T) {
+	s := new(cachingRedirectorSuite)
+	suite.Run(t, s)
+}
+
+func (s *cachingRedirectorSuite) SetupTest() {
+	s.Assertions = require.New(s.T())
+	s.controller = gomock.NewController(s.T())
+
+	s.connections = NewMockconnectionPool(s.controller)
+	s.logger = log.NewTestLogger()
+	s.resolver = membership.NewMockServiceResolver(s.controller)
+}
+
+func (s *cachingRedirectorSuite) TearDownTest() {
+	s.controller.Finish()
+}
+
+func (s *cachingRedirectorSuite) TestShardCheck() {
+	r := newCachingRedirector(s.connections, s.resolver, s.logger)
+
+	invalErr := &serviceerror.InvalidArgument{}
+	err := r.execute(
+		context.Background(),
+		-1,
+		func(_ context.Context, _ historyservice.HistoryServiceClient) error {
+			panic("notreached")
+		})
+	s.ErrorAs(err, &invalErr)
+
+	_, err = r.clientForShardID(-1)
+	s.ErrorAs(err, &invalErr)
+}
+
+func cacheRetainingTest(s *cachingRedirectorSuite, opErr error, verify func(error)) {
+	testAddr := rpcAddress("testaddr")
+	shardID := int32(1)
+
+	s.resolver.EXPECT().
+		Lookup(convert.Int32ToString(shardID)).
+		Return(membership.NewHostInfoFromAddress(string(testAddr)), nil).
+		Times(1)
+
+	mockClient := historyservicemock.NewMockHistoryServiceClient(s.controller)
+	clientConn := clientConnection{
+		historyClient: mockClient,
+	}
+	s.connections.EXPECT().
+		getOrCreateClientConn(testAddr).
+		Return(clientConn)
+	s.connections.EXPECT().
+		resetConnectBackoff(clientConn)
+
+	clientOp := func(ctx context.Context, client historyservice.HistoryServiceClient) error {
+		if client != mockClient {
+			return errors.New("wrong client")
+		}
+		return opErr
+	}
+	r := newCachingRedirector(s.connections, s.resolver, s.logger)
+
+	for i := 0; i < 3; i++ {
+		err := r.execute(
+			context.Background(),
+			shardID,
+			clientOp,
+		)
+		verify(err)
+	}
+}
+
+func (s *cachingRedirectorSuite) TestExecuteShardSuccess() {
+	cacheRetainingTest(s, nil, func(err error) {
+		s.NoError(err)
+	})
+}
+
+func (s *cachingRedirectorSuite) TestExecuteCacheRetainingError() {
+	notFound := serviceerror.NewNotFound("notfound")
+	cacheRetainingTest(s, notFound, func(err error) {
+		s.Error(err)
+		s.Equal(notFound, err)
+	})
+}
+
+func hostDownErrorTest(s *cachingRedirectorSuite, clientOp clientOperation, verify func(err error)) {
+	testAddr := rpcAddress("testaddr")
+	shardID := int32(1)
+
+	s.resolver.EXPECT().
+		Lookup(convert.Int32ToString(shardID)).
+		Return(membership.NewHostInfoFromAddress(string(testAddr)), nil).
+		Times(1)
+
+	mockClient := historyservicemock.NewMockHistoryServiceClient(s.controller)
+	clientConn := clientConnection{
+		historyClient: mockClient,
+	}
+	s.connections.EXPECT().
+		getOrCreateClientConn(testAddr).
+		Return(clientConn).
+		Times(1)
+	s.connections.EXPECT().
+		resetConnectBackoff(clientConn).
+		Times(1)
+
+	r := newCachingRedirector(s.connections, s.resolver, s.logger)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
+	defer cancel()
+
+	err := r.execute(
+		ctx,
+		shardID,
+		clientOp,
+	)
+	verify(err)
+}
+
+func (s *cachingRedirectorSuite) TestDeadlineExceededError() {
+	hostDownErrorTest(s,
+		func(ctx context.Context, client historyservice.HistoryServiceClient) error {
+			<-ctx.Done()
+			return ctx.Err()
+		},
+		func(err error) {
+			s.ErrorIs(err, context.DeadlineExceeded)
+		})
+}
+
+func (s *cachingRedirectorSuite) TestUnavailableError() {
+	hostDownErrorTest(s,
+		func(ctx context.Context, client historyservice.HistoryServiceClient) error {
+			return serviceerror.NewUnavailable("unavail")
+		},
+		func(err error) {
+			unavail := &serviceerror.Unavailable{}
+			s.ErrorAs(err, &unavail)
+		})
+}
+
+func (s *cachingRedirectorSuite) TestShardOwnershipLostErrors() {
+	testAddr1 := rpcAddress("testaddr1")
+	testAddr2 := rpcAddress("testaddr2")
+	shardID := int32(1)
+
+	mockClient1 := historyservicemock.NewMockHistoryServiceClient(s.controller)
+	mockClient2 := historyservicemock.NewMockHistoryServiceClient(s.controller)
+
+	r := newCachingRedirector(s.connections, s.resolver, s.logger)
+	opCalls := 1
+	doExecute := func() error {
+		return r.execute(
+			context.Background(),
+			shardID,
+			func(ctx context.Context, client historyservice.HistoryServiceClient) error {
+				switch opCalls {
+				case 1:
+					if client != mockClient1 {
+						return errors.New("wrong client")
+					}
+					opCalls++
+					return serviceerrors.NewShardOwnershipLost(string(testAddr1), "current")
+				case 2:
+					if client != mockClient1 {
+						return errors.New("wrong client")
+					}
+					opCalls++
+					return serviceerrors.NewShardOwnershipLost("", "current")
+				case 3:
+					if client != mockClient1 {
+						return errors.New("wrong client")
+					}
+					opCalls++
+					return serviceerrors.NewShardOwnershipLost(string(testAddr2), "current")
+				case 4:
+					if client != mockClient2 {
+						return errors.New("wrong client")
+					}
+					opCalls++
+					return nil
+				case 5:
+					if client != mockClient2 {
+						return errors.New("wrong client")
+					}
+					opCalls++
+					return nil
+				}
+				return errors.New("too many op calls")
+			},
+		)
+	}
+
+	// opCall 1: return SOL, but with same owner as current.
+	s.resolver.EXPECT().
+		Lookup(convert.Int32ToString(shardID)).
+		Return(membership.NewHostInfoFromAddress(string(testAddr1)), nil).
+		Times(1)
+
+	clientConn1 := clientConnection{
+		historyClient: mockClient1,
+	}
+	s.connections.EXPECT().
+		getOrCreateClientConn(testAddr1).
+		Return(clientConn1).
+		Times(1)
+	s.connections.EXPECT().
+		resetConnectBackoff(clientConn1).
+		Times(1)
+
+	err := doExecute()
+	s.Error(err)
+	solErr := &serviceerrors.ShardOwnershipLost{}
+	s.ErrorAs(err, &solErr)
+	s.Equal(string(testAddr1), solErr.OwnerHost)
+
+	// opCall 2: return SOL, but with empty new owner hint.
+	s.resolver.EXPECT().
+		Lookup(convert.Int32ToString(shardID)).
+		Return(membership.NewHostInfoFromAddress(string(testAddr1)), nil).
+		Times(1)
+
+	s.connections.EXPECT().
+		getOrCreateClientConn(testAddr1).
+		Return(clientConn1).
+		Times(1)
+	s.connections.EXPECT().
+		resetConnectBackoff(clientConn1).
+		Times(1)
+
+	err = doExecute()
+	s.Error(err)
+	solErr = &serviceerrors.ShardOwnershipLost{}
+	s.ErrorAs(err, &solErr)
+	s.Empty(solErr.OwnerHost)
+	s.Equal(3, opCalls)
+
+	// opCall 3 & 4: return SOL with new owner hint.
+	s.resolver.EXPECT().
+		Lookup(convert.Int32ToString(shardID)).
+		Return(membership.NewHostInfoFromAddress(string(testAddr1)), nil).
+		Times(1)
+
+	s.connections.EXPECT().
+		getOrCreateClientConn(testAddr1).
+		Return(clientConn1).
+		Times(1)
+	s.connections.EXPECT().
+		resetConnectBackoff(clientConn1).
+		Times(1)
+
+	clientConn2 := clientConnection{
+		historyClient: mockClient2,
+	}
+	s.connections.EXPECT().
+		getOrCreateClientConn(testAddr2).
+		Return(clientConn2).
+		Times(1)
+	s.connections.EXPECT().
+		resetConnectBackoff(clientConn2).
+		Times(1)
+
+	err = doExecute()
+	s.NoError(err)
+	s.Equal(5, opCalls)
+
+	// opCall 5: should use cached lookup & connection, so no additional mocks.
+	err = doExecute()
+	s.NoError(err)
+}
+
+func (s *cachingRedirectorSuite) TestClientForTargetByShard() {
+	testAddr := rpcAddress("testaddr")
+	shardID := int32(1)
+
+	s.resolver.EXPECT().
+		Lookup(convert.Int32ToString(shardID)).
+		Return(membership.NewHostInfoFromAddress(string(testAddr)), nil).
+		Times(1)
+
+	mockClient := historyservicemock.NewMockHistoryServiceClient(s.controller)
+	clientConn := clientConnection{
+		historyClient: mockClient,
+	}
+	s.connections.EXPECT().
+		getOrCreateClientConn(testAddr).
+		Return(clientConn)
+	s.connections.EXPECT().
+		resetConnectBackoff(clientConn).
+		Times(1)
+
+	r := newCachingRedirector(s.connections, s.resolver, s.logger)
+	cli, err := r.clientForShardID(shardID)
+	s.NoError(err)
+	s.Equal(mockClient, cli)
+
+	// No additional mocks; lookup should have been cached.
+	cli, err = r.clientForShardID(shardID)
+	s.NoError(err)
+	s.Equal(mockClient, cli)
+}
diff --git a/client/history/client.go b/client/history/client.go
index 5612831fed0..3f89b9cc6a3 100644
--- a/client/history/client.go
+++ b/client/history/client.go
@@ -41,6 +41,7 @@ import (
 	replicationspb "go.temporal.io/server/api/replication/v1"
 	"go.temporal.io/server/common"
 	"go.temporal.io/server/common/debug"
+	"go.temporal.io/server/common/dynamicconfig"
 	"go.temporal.io/server/common/log"
 	"go.temporal.io/server/common/log/tag"
 	"go.temporal.io/server/common/membership"
@@ -64,6 +65,7 @@ type clientImpl struct {
 
 // NewClient creates a new history service gRPC client
 func NewClient(
+	dc *dynamicconfig.Collection,
 	historyServiceResolver membership.ServiceResolver,
 	logger log.Logger,
 	numberOfShards int32,
@@ -71,11 +73,21 @@ func NewClient(
 	timeout time.Duration,
 ) historyservice.HistoryServiceClient {
 	connections := newConnectionPool(historyServiceResolver, rpcFactory)
+
+	var redirector redirector
+	if dc.GetBoolProperty(dynamicconfig.HistoryClientOwnershipCachingEnabled, false)() {
+		logger.Info("historyClient: ownership caching enabled")
+		redirector = newCachingRedirector(connections, historyServiceResolver, logger)
+	} else {
+		logger.Info("historyClient: ownership caching disabled")
+		redirector = newBasicRedirector(connections, historyServiceResolver)
+	}
+
 	return &clientImpl{
 		connections:     connections,
 		logger:          logger,
 		numberOfShards:  numberOfShards,
-		redirector:      newBasicRedirector(connections, historyServiceResolver),
+		redirector:      redirector,
 		timeout:         timeout,
 		tokenSerializer: common.NewProtoTaskTokenSerializer(),
 	}
diff --git a/client/history/connections.go b/client/history/connections.go
index 17c9dba62ec..3ac4dc5b21f 100644
--- a/client/history/connections.go
+++ b/client/history/connections.go
@@ -57,6 +57,7 @@ type (
 	connectionPool interface {
 		getOrCreateClientConn(addr rpcAddress) clientConnection
 		getAllClientConns() []clientConnection
+		resetConnectBackoff(clientConnection)
 	}
 )
@@ -108,3 +109,7 @@ func (c *connectionPoolImpl) getAllClientConns() []clientConnection {
 
 	return clientConns
 }
+
+func (c *connectionPoolImpl) resetConnectBackoff(cc clientConnection) {
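+	// ClientConn.ResetConnectBackoff is an experimental grpc-go API: if the
+	// connection sits in TRANSIENT_FAILURE, it abandons the remaining backoff
+	// delay and redials immediately. See cacheAddLocked in caching_redirector.go
+	// for why the caching redirector wants this.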
+	cc.grpcConn.ResetConnectBackoff()
+}
diff --git a/client/history/connections_mock.go b/client/history/connections_mock.go
index 0ef4823955e..5a53df5fbaa 100644
--- a/client/history/connections_mock.go
+++ b/client/history/connections_mock.go
@@ -84,3 +84,15 @@ func (mr *MockconnectionPoolMockRecorder) getOrCreateClientConn(addr interface{}
 	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "getOrCreateClientConn", reflect.TypeOf((*MockconnectionPool)(nil).getOrCreateClientConn), addr)
 }
+
+// resetConnectBackoff mocks base method.
+func (m *MockconnectionPool) resetConnectBackoff(arg0 clientConnection) {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "resetConnectBackoff", arg0)
+}
+
+// resetConnectBackoff indicates an expected call of resetConnectBackoff.
+func (mr *MockconnectionPoolMockRecorder) resetConnectBackoff(arg0 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "resetConnectBackoff", reflect.TypeOf((*MockconnectionPool)(nil).resetConnectBackoff), arg0)
+}
diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go
index a1caedf6f22..d90528378df 100644
--- a/common/dynamicconfig/constants.go
+++ b/common/dynamicconfig/constants.go
@@ -501,6 +501,9 @@ const (
 	AcquireShardInterval = "history.acquireShardInterval"
 	// AcquireShardConcurrency is number of goroutines that can be used to acquire shards in the shard controller.
 	AcquireShardConcurrency = "history.acquireShardConcurrency"
+	// HistoryClientOwnershipCachingEnabled configures whether history clients try to cache
+	// shard ownership information, instead of checking membership for each request.
+	HistoryClientOwnershipCachingEnabled = "history.clientOwnershipCachingEnabled"
 	// StandbyClusterDelay is the artificial delay added to standby cluster's view of active cluster's time
 	StandbyClusterDelay = "history.standbyClusterDelay"
 	// StandbyTaskMissingEventsResendDelay is the amount of time standby cluster's will wait (if events are missing)
diff --git a/service/history/configs/config.go b/service/history/configs/config.go
index b595624e5da..5702d1a3e86 100644
--- a/service/history/configs/config.go
+++ b/service/history/configs/config.go
@@ -83,6 +83,8 @@ type Config struct {
 	AcquireShardInterval    dynamicconfig.DurationPropertyFn
 	AcquireShardConcurrency dynamicconfig.IntPropertyFn
 
+	HistoryClientOwnershipCachingEnabled dynamicconfig.BoolPropertyFn
+
 	// the artificial delay added to standby cluster's view of active cluster's time
 	StandbyClusterDelay                  dynamicconfig.DurationPropertyFn
 	StandbyTaskMissingEventsResendDelay  dynamicconfig.DurationPropertyFnWithTaskTypeFilter
@@ -359,9 +361,12 @@ func NewConfig(
 		EventsCacheMaxSizeBytes: dc.GetIntProperty(dynamicconfig.EventsCacheMaxSizeBytes, 512*1024), // 512KB
 		EventsCacheTTL:          dc.GetDurationProperty(dynamicconfig.EventsCacheTTL, time.Hour),
 
-		RangeSizeBits:           20, // 20 bits for sequencer, 2^20 sequence number for any range
-		AcquireShardInterval:    dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, time.Minute),
-		AcquireShardConcurrency: dc.GetIntProperty(dynamicconfig.AcquireShardConcurrency, 10),
+		RangeSizeBits:            20, // 20 bits for sequencer, 2^20 sequence number for any range
+		AcquireShardInterval:     dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, time.Minute),
+		AcquireShardConcurrency:  dc.GetIntProperty(dynamicconfig.AcquireShardConcurrency, 10),
+
+		HistoryClientOwnershipCachingEnabled: dc.GetBoolProperty(dynamicconfig.HistoryClientOwnershipCachingEnabled, false),
+
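+		// Note: history clients read this flag once, in NewClient (see
+		// client/history/client.go above), so changing it only affects
+		// clients constructed afterwards.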
 		StandbyClusterDelay:                  dc.GetDurationProperty(dynamicconfig.StandbyClusterDelay, 5*time.Minute),
 		StandbyTaskMissingEventsResendDelay:  dc.GetDurationPropertyFilteredByTaskType(dynamicconfig.StandbyTaskMissingEventsResendDelay, 10*time.Minute),
 		StandbyTaskMissingEventsDiscardDelay: dc.GetDurationPropertyFilteredByTaskType(dynamicconfig.StandbyTaskMissingEventsDiscardDelay, 15*time.Minute),
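
Enabling the new behavior: the flag can be set through the server's dynamic
config file. A minimal sketch, assuming the standard file-based dynamic config
client (the key is history.clientOwnershipCachingEnabled, defined in
constants.go above; it defaults to false):

    history.clientOwnershipCachingEnabled:
      - value: true
        constraints: {}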