
Commit

Merge remote-tracking branch 'origin/master' into snowden/rate-limiter-clock

MichaelSnowden committed Jul 14, 2023
2 parents d49f104 + a01a309 commit 537656c
Showing 27 changed files with 214 additions and 158 deletions.
10 changes: 5 additions & 5 deletions .golangci.yml
@@ -30,6 +30,8 @@ linters-settings:
# Disabled rules
- name: add-constant
disabled: true
- name: argument-limit
disabled: true
- name: bare-return
disabled: true
- name: banned-characters
@@ -78,9 +80,6 @@ linters-settings:
disabled: true

# Rule tuning
- name: argument-limit
arguments:
- 10
- name: cognitive-complexity
arguments:
- 25
@@ -92,8 +91,9 @@ linters-settings:
- 3
- name: unhandled-error
arguments:
- "fmt.Printf"
- "fmt.Println"
- "fmt.*"
- "bytes.Buffer.*"
- "strings.Builder.*"
issues:
# Exclude cyclomatic and cognitive complexity rules for functional tests in the `tests` root directory.
exclude-rules:
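Note on the unhandled-error tuning above: revive matches these arguments against fully qualified function names, and recent revive versions accept them as regular expressions, so the three patterns cover whole method sets instead of two hand-picked functions. A minimal sketch of calls the updated rule would now leave unflagged (illustrative only, not code from this repo):

package main

import (
    "bytes"
    "fmt"
    "strings"
)

func main() {
    var buf bytes.Buffer
    var sb strings.Builder
    buf.WriteString("hello, ") // bytes.Buffer.*: returned error deliberately ignored
    sb.WriteString("world")    // strings.Builder.*: returned error deliberately ignored
    fmt.Println(buf.String() + sb.String()) // fmt.*: returned (n, err) deliberately ignored
}

These are the usual suspects: bytes.Buffer and strings.Builder writes are documented to always return a nil error, and fmt print errors are rarely actionable.
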
16 changes: 13 additions & 3 deletions common/cluster/metadata.go
@@ -35,6 +35,8 @@ import (
"sync/atomic"
"time"

"golang.org/x/exp/maps"

"go.temporal.io/server/common"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/dynamicconfig"
@@ -98,6 +100,8 @@ type (
CurrentClusterName string `yaml:"currentClusterName"`
// ClusterInformation contains all cluster names to corresponding information about that cluster
ClusterInformation map[string]ClusterInformation `yaml:"clusterInformation"`
// Tags contains customized tags about the current cluster
Tags map[string]string `yaml:"tags"`
}

// ClusterInformation contains the information about each cluster which participated in cross DC
@@ -107,8 +111,9 @@ type (
// Address indicate the remote service address(Host:Port). Host can be DNS name.
RPCAddress string `yaml:"rpcAddress"`
// Cluster ID allows to explicitly set the ID of the cluster. Optional.
ClusterID string `yaml:"-"`
ShardCount int32 `yaml:"-"` // Ignore this field when loading config.
ClusterID string `yaml:"-"`
ShardCount int32 `yaml:"-"` // Ignore this field when loading config.
Tags map[string]string `yaml:"-"` // Ignore this field. Use cluster.Config.Tags for customized tags.
// private field to track cluster information updates
version int64
}
@@ -463,12 +468,14 @@ func (m *metadataImpl) refreshClusterMetadata(ctx context.Context) error {
InitialFailoverVersion: newClusterInfo.InitialFailoverVersion,
RPCAddress: newClusterInfo.RPCAddress,
ShardCount: newClusterInfo.ShardCount,
Tags: newClusterInfo.Tags,
version: newClusterInfo.version,
}
} else if newClusterInfo.version > oldClusterInfo.version {
if newClusterInfo.Enabled == oldClusterInfo.Enabled &&
newClusterInfo.RPCAddress == oldClusterInfo.RPCAddress &&
newClusterInfo.InitialFailoverVersion == oldClusterInfo.InitialFailoverVersion {
newClusterInfo.InitialFailoverVersion == oldClusterInfo.InitialFailoverVersion &&
maps.Equal(newClusterInfo.Tags, oldClusterInfo.Tags) {
// key cluster info does not change
continue
}
@@ -478,13 +485,15 @@ func (m *metadataImpl) refreshClusterMetadata(ctx context.Context) error {
InitialFailoverVersion: oldClusterInfo.InitialFailoverVersion,
RPCAddress: oldClusterInfo.RPCAddress,
ShardCount: oldClusterInfo.ShardCount,
Tags: oldClusterInfo.Tags,
version: oldClusterInfo.version,
}
newEntries[clusterName] = &ClusterInformation{
Enabled: newClusterInfo.Enabled,
InitialFailoverVersion: newClusterInfo.InitialFailoverVersion,
RPCAddress: newClusterInfo.RPCAddress,
ShardCount: newClusterInfo.ShardCount,
Tags: newClusterInfo.Tags,
version: newClusterInfo.version,
}
}
@@ -589,6 +598,7 @@ func (m *metadataImpl) listAllClusterMetadataFromDB(
InitialFailoverVersion: getClusterResp.GetInitialFailoverVersion(),
RPCAddress: getClusterResp.GetClusterAddress(),
ShardCount: getClusterResp.GetHistoryShardCount(),
Tags: getClusterResp.GetTags(),
version: getClusterResp.Version,
}
}
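The metadata.go changes thread a new Tags map from the persisted cluster record (and from static YAML via the tags key) into ClusterInformation, and the refresh comparison now includes maps.Equal so that tag edits propagate to change callbacks. A self-contained sketch of that comparison, with the struct trimmed to the compared fields (names below are stand-ins, not the repo's types):

package main

import (
    "fmt"

    "golang.org/x/exp/maps"
)

// clusterInfo is trimmed to the fields the refresh loop compares.
type clusterInfo struct {
    Enabled                bool
    InitialFailoverVersion int64
    RPCAddress             string
    Tags                   map[string]string
}

// unchanged mirrors the "key cluster info does not change" test: Tags now
// participate, so a tag edit alone is enough to notify callbacks.
func unchanged(prev, curr clusterInfo) bool {
    return prev.Enabled == curr.Enabled &&
        prev.RPCAddress == curr.RPCAddress &&
        prev.InitialFailoverVersion == curr.InitialFailoverVersion &&
        maps.Equal(prev.Tags, curr.Tags)
}

func main() {
    a := clusterInfo{Enabled: true, InitialFailoverVersion: 1, RPCAddress: "host:7233", Tags: map[string]string{"region": "us-east"}}
    b := a
    b.Tags = map[string]string{"region": "us-west"}
    fmt.Println(unchanged(a, a), unchanged(a, b)) // true false
}

maps.Equal compares element-wise and treats nil and empty maps as equal, so clusters without tags never spuriously trigger the callback.
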
36 changes: 35 additions & 1 deletion common/cluster/metadata_test.go
@@ -53,6 +53,7 @@ type (
failoverVersionIncrement int64
clusterName string
secondClusterName string
thirdClusterName string
}
)

@@ -77,6 +78,7 @@ func (s *metadataSuite) SetupTest() {
s.failoverVersionIncrement = 100
s.clusterName = uuid.New()
s.secondClusterName = uuid.New()
s.thirdClusterName = uuid.New()

clusterInfo := map[string]ClusterInformation{
s.clusterName: {
@@ -93,6 +95,13 @@ func (s *metadataSuite) SetupTest() {
ShardCount: 2,
version: 1,
},
s.thirdClusterName: {
Enabled: true,
InitialFailoverVersion: int64(5),
RPCAddress: uuid.New(),
ShardCount: 1,
version: 1,
},
}
s.metadata = NewMetadata(
s.isGlobalNamespaceEnabled,
@@ -143,7 +152,7 @@ func (s *metadataSuite) Test_RegisterMetadataChangeCallback() {
s.metadata.RegisterMetadataChangeCallback(
s,
func(oldClusterMetadata map[string]*ClusterInformation, newClusterMetadata map[string]*ClusterInformation) {
s.Equal(2, len(newClusterMetadata))
s.Equal(3, len(newClusterMetadata))
})

s.metadata.UnRegisterMetadataChangeCallback(s)
@@ -166,12 +175,20 @@ func (s *metadataSuite) Test_RefreshClusterMetadata_Success() {
newMetadata, ok = newClusterMetadata[s.secondClusterName]
s.True(ok)
s.Nil(newMetadata)

oldMetadata, ok = oldClusterMetadata[s.thirdClusterName]
s.True(ok)
s.NotNil(oldMetadata)
newMetadata, ok = newClusterMetadata[s.thirdClusterName]
s.True(ok)
s.NotNil(newMetadata)
}

s.mockClusterMetadataStore.EXPECT().ListClusterMetadata(gomock.Any(), gomock.Any()).Return(
&persistence.ListClusterMetadataResponse{
ClusterMetadata: []*persistence.GetClusterMetadataResponse{
{
// No change, not included in the callback
ClusterMetadata: persistencespb.ClusterMetadata{
ClusterName: s.clusterName,
IsConnectionEnabled: true,
@@ -182,19 +199,36 @@
Version: 1,
},
{
// Updated, included in callback
ClusterMetadata: persistencespb.ClusterMetadata{
ClusterName: s.thirdClusterName,
IsConnectionEnabled: true,
InitialFailoverVersion: 1,
HistoryShardCount: 1,
ClusterAddress: uuid.New(),
Tags: map[string]string{"test": "test"},
},
Version: 2,
},
{
// Newly added, included in callback
ClusterMetadata: persistencespb.ClusterMetadata{
ClusterName: id,
IsConnectionEnabled: true,
InitialFailoverVersion: 2,
HistoryShardCount: 2,
ClusterAddress: uuid.New(),
Tags: map[string]string{"test": "test"},
},
Version: 2,
},
},
}, nil)
err := s.metadata.refreshClusterMetadata(context.Background())
s.NoError(err)
clusterInfo := s.metadata.GetAllClusterInfo()
s.Equal("test", clusterInfo[s.thirdClusterName].Tags["test"])
s.Equal("test", clusterInfo[id].Tags["test"])
}

func (s *metadataSuite) Test_ListAllClusterMetadataFromDB_Success() {
1 change: 0 additions & 1 deletion common/namespace/registry.go
@@ -33,7 +33,6 @@ import (
"time"

"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common"
"go.temporal.io/server/common/cache"
"go.temporal.io/server/common/clock"
1 change: 0 additions & 1 deletion service/history/archival_queue_task_executor_test.go
@@ -354,7 +354,6 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
shardContext.EXPECT().GetConfig().Return(cfg).AnyTimes()
mockMetadata := cluster.NewMockMetadata(p.Controller)
mockMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes()
mockMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
shardContext.EXPECT().GetClusterMetadata().Return(mockMetadata).AnyTimes()

shardID := int32(1)
17 changes: 4 additions & 13 deletions service/history/queues/executable.go
@@ -177,18 +177,15 @@ func (e *executableImpl) Execute() (retErr error) {
e.Unlock()
return nil
}
var namespaceName string
ns, err := e.namespaceRegistry.GetNamespaceByID(namespace.ID(e.GetNamespaceID()))
if err == nil {
namespaceName = ns.Name().String()
}

ns, _ := e.namespaceRegistry.GetNamespaceName(namespace.ID(e.GetNamespaceID()))
var callerInfo headers.CallerInfo
switch e.priority {
case ctasks.PriorityHigh:
callerInfo = headers.NewBackgroundCallerInfo(namespaceName)
callerInfo = headers.NewBackgroundCallerInfo(ns.String())
default:
// priority low or unknown
callerInfo = headers.NewPreemptableCallerInfo(namespaceName)
callerInfo = headers.NewPreemptableCallerInfo(ns.String())
}
ctx := headers.SetCallerInfo(
metrics.AddMetricsContext(context.Background()),
@@ -232,12 +229,6 @@ func (e *executableImpl) Execute() (retErr error) {
// Not doing it here as for certain errors latency for the attempt should not be counted
}()

if ns != nil && !ns.IsOnCluster(e.clusterMetadata.GetCurrentClusterName()) {
// Discard task if the namespace is not on the current cluster.
e.taggedMetricsHandler = e.metricsHandler.WithTags(e.estimateTaskMetricTag()...)
return consts.ErrTaskDiscarded
}

metricsTags, isActive, err := e.executor.Execute(ctx, e)
e.taggedMetricsHandler = e.metricsHandler.WithTags(metricsTags...)

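Two things change in executable.go: the namespace lookup is reduced to GetNamespaceName, whose zero value stringifies to "", and the discard-on-foreign-namespace check is removed here (it resurfaces in the standby task executors below). A rough sketch of the resulting caller-info selection, with hypothetical stand-ins for the headers constructors:

package main

import "fmt"

// name mimics namespace.Name: a string type whose zero value prints as "".
type name string

func (n name) String() string { return string(n) }

type callerInfo struct{ kind, namespace string }

// Stand-ins for headers.NewBackgroundCallerInfo / NewPreemptableCallerInfo.
func newBackgroundCallerInfo(ns string) callerInfo  { return callerInfo{"background", ns} }
func newPreemptableCallerInfo(ns string) callerInfo { return callerInfo{"preemptable", ns} }

const priorityHigh = 1

// selectCallerInfo mirrors the switch in Execute: a failed registry lookup
// leaves ns as the zero name, so the error can be dropped.
func selectCallerInfo(priority int, ns name) callerInfo {
    switch priority {
    case priorityHigh:
        return newBackgroundCallerInfo(ns.String())
    default: // priority low or unknown
        return newPreemptableCallerInfo(ns.String())
    }
}

func main() {
    fmt.Println(selectCallerInfo(priorityHigh, "my-namespace"))
    fmt.Println(selectCallerInfo(0, "")) // lookup failed: empty namespace name
}
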
14 changes: 0 additions & 14 deletions service/history/queues/executable_test.go
@@ -37,7 +37,6 @@ import (
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

persistencepb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/definition"
@@ -271,19 +270,6 @@ func (s *executableSuite) TestExecute_CallerInfo() {
s.NoError(executable.Execute())
}

func (s *executableSuite) TestExecute_DiscardTask() {
executable := s.newTestExecutable()
registry := namespace.NewMockRegistry(s.controller)
executable.(*executableImpl).namespaceRegistry = registry
ns := namespace.NewGlobalNamespaceForTest(nil, nil, &persistencepb.NamespaceReplicationConfig{
ActiveClusterName: "nonCurrentCluster",
Clusters: []string{"nonCurrentCluster"},
}, 1)

registry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil).Times(2)
s.ErrorIs(executable.Execute(), consts.ErrTaskDiscarded)
}

func (s *executableSuite) TestExecuteHandleErr_ResetAttempt() {
executable := s.newTestExecutable()
s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Return(nil, true, errors.New("some random error"))
9 changes: 9 additions & 0 deletions service/history/timer_queue_standby_task_executor.go
@@ -435,6 +435,15 @@ func (t *timerQueueStandbyTaskExecutor) processTimer(
ctx, cancel := context.WithTimeout(ctx, taskTimeout)
defer cancel()

nsRecord, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(timerTask.GetNamespaceID()))
if err != nil {
return err
}
if !nsRecord.IsOnCluster(t.clusterName) {
// discard standby tasks
return consts.ErrTaskDiscarded
}

executionContext, release, err := getWorkflowExecutionContextForTask(ctx, t.cache, timerTask)
if err != nil {
return err
9 changes: 9 additions & 0 deletions service/history/transfer_queue_standby_task_executor.go
@@ -503,6 +503,15 @@ func (t *transferQueueStandbyTaskExecutor) processTransfer(
ctx, cancel := context.WithTimeout(ctx, taskTimeout)
defer cancel()

nsRecord, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(taskInfo.GetNamespaceID()))
if err != nil {
return err
}
if !nsRecord.IsOnCluster(t.clusterName) {
// discard standby tasks
return consts.ErrTaskDiscarded
}

weContext, release, err := getWorkflowExecutionContextForTask(ctx, t.cache, taskInfo)
if err != nil {
return err
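The nine-line guard added to both standby executors above replaces the generic check removed from executable.go: the discard decision now happens where it applies, before any workflow state is loaded. A toy model of the guard, assuming IsOnCluster reports whether the namespace is replicated to the given cluster (errTaskDiscarded is a stand-in for consts.ErrTaskDiscarded):

package main

import (
    "errors"
    "fmt"
)

// errTaskDiscarded stands in for consts.ErrTaskDiscarded.
var errTaskDiscarded = errors.New("task discarded")

// nsRecord stands in for a namespace registry entry.
type nsRecord struct{ clusters []string }

// isOnCluster mimics namespace.Namespace.IsOnCluster for this sketch.
func (n nsRecord) isOnCluster(cluster string) bool {
    for _, c := range n.clusters {
        if c == cluster {
            return true
        }
    }
    return false
}

// processStandbyTask shows the guard's effect: a standby executor now
// discards tasks whose namespace is not on the local cluster before
// loading any workflow state.
func processStandbyTask(ns nsRecord, localCluster string) error {
    if !ns.isOnCluster(localCluster) {
        return errTaskDiscarded
    }
    return nil // proceed: load workflow execution context, etc.
}

func main() {
    ns := nsRecord{clusters: []string{"active", "standby"}}
    fmt.Println(processStandbyTask(ns, "standby"))   // <nil>
    fmt.Println(processStandbyTask(ns, "elsewhere")) // task discarded
}
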
2 changes: 1 addition & 1 deletion service/matching/ack_manager.go
@@ -120,7 +120,7 @@ func (m *ackManager) completeTask(taskID int64) (ackLevel int64) {
m.backlogCounter.Dec()
}

// TODO the ack level management shuld be done by a dedicated coroutine
// TODO the ack level management should be done by a dedicated coroutine
// this is only a temporarily solution

taskIDs := maps.Keys(m.outstandingTasks)
2 changes: 1 addition & 1 deletion service/matching/config.go
@@ -124,7 +124,7 @@ type (
AdminNamespaceTaskQueueToPartitionDispatchRate func() float64

// If set to false, matching does not load user data from DB for root partitions or fetch it via RPC from the
// root. When disbled, features that rely on user data (e.g. worker versioning) will essentially be disabled.
// root. When disabled, features that rely on user data (e.g. worker versioning) will essentially be disabled.
// See the documentation for constants.MatchingLoadUserData for the implications on versioning.
LoadUserData func() bool

6 changes: 3 additions & 3 deletions service/matching/db.go
@@ -310,16 +310,16 @@ func (db *taskQueueDB) CompleteTasksLessThan(
return n, err
}

// Returns true if we are storing user data in the db. We need to be the root partition,
// workflow type, unversioned, and also a normal queue.
// DbStoresUserData returns true if we are storing user data in the db. We need to be the root partition, workflow type,
// unversioned, and also a normal queue.
func (db *taskQueueDB) DbStoresUserData() bool {
return db.taskQueue.OwnsUserData() && db.taskQueueKind == enumspb.TASK_QUEUE_KIND_NORMAL
}

// GetUserData returns the versioning data for this task queue. Do not mutate the returned pointer, as doing so
// will cause cache inconsistency.
func (db *taskQueueDB) GetUserData(
ctx context.Context,
context.Context,
) (*persistencespb.VersionedTaskQueueUserData, chan struct{}, error) {
db.Lock()
defer db.Unlock()
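A small idiom in the db.go hunk: GetUserData's context parameter loses its name (ctx context.Context becomes context.Context), presumably because the body no longer uses it. An unnamed parameter keeps the signature intact while signaling the argument is ignored. An illustrative example (types here are invented):

package main

import (
    "context"
    "fmt"
)

type store interface {
    get(ctx context.Context) string
}

type memStore struct{ v string }

// get satisfies the interface but never touches the context, so the
// parameter is left unnamed to make that explicit.
func (m memStore) get(context.Context) string { return m.v }

func main() {
    var s store = memStore{v: "user data"}
    fmt.Println(s.get(context.Background()))
}
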
9 changes: 3 additions & 6 deletions service/matching/forwarder.go
@@ -113,7 +113,7 @@ func newForwarder(
return fwdr
}

// ForwardTask forwards an activity or workflow task to the parent task queue partition if it exist
// ForwardTask forwards an activity or workflow task to the parent task queue partition if it exists
func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) error {
if fwdr.taskQueueKind == enumspb.TASK_QUEUE_KIND_STICKY {
return errTaskQueueKind
@@ -131,15 +131,12 @@ func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) erro

var expirationDuration time.Duration
expirationTime := timestamp.TimeValue(task.event.Data.ExpiryTime)
if expirationTime.IsZero() {
// noop
} else {
if !expirationTime.IsZero() {
expirationDuration = time.Until(expirationTime)
if expirationDuration <= 0 {
return nil
}
}

switch fwdr.taskQueueID.taskType {
case enumspb.TASK_QUEUE_TYPE_WORKFLOW:
_, err = fwdr.client.AddWorkflowTask(ctx, &matchingservice.AddWorkflowTaskRequest{
Expand Down Expand Up @@ -178,7 +175,7 @@ func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) erro
return fwdr.handleErr(err)
}

// ForwardQueryTask forwards a query task to parent task queue partition, if it exist
// ForwardQueryTask forwards a query task to parent task queue partition, if it exists
func (fwdr *Forwarder) ForwardQueryTask(
ctx context.Context,
task *internalTask,
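The forwarder refactor folds the empty noop branch into a negated condition without changing behavior: a zero expiry time still means no deadline, and a task already past its expiry is still silently dropped. A standalone sketch of the same control flow:

package main

import (
    "fmt"
    "time"
)

// expiryDuration mirrors the simplified branch in ForwardTask: a zero
// expiry means "no deadline" (0, true), while a past expiry means the
// task should be dropped (ok == false).
func expiryDuration(expirationTime time.Time) (time.Duration, bool) {
    var d time.Duration
    if !expirationTime.IsZero() {
        d = time.Until(expirationTime)
        if d <= 0 {
            return 0, false // already expired: caller returns nil and drops the task
        }
    }
    return d, true
}

func main() {
    fmt.Println(expiryDuration(time.Time{}))                  // 0s true: no deadline
    fmt.Println(expiryDuration(time.Now().Add(time.Minute)))  // ~1m true
    fmt.Println(expiryDuration(time.Now().Add(-time.Minute))) // 0s false: drop
}
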
