Skip to content

Commit

Permalink
Support cluster metadata customized tags (#4622)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
Support cluster metadata customized tags

<!-- Tell your future self why have you made these changes -->
**Why?**
Allow static tagging cluster 

<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
Local testing

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**


<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
No
  • Loading branch information
yux0 committed Jul 14, 2023
1 parent 2fab90b commit a01a309
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 5 deletions.
16 changes: 13 additions & 3 deletions common/cluster/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"sync/atomic"
"time"

"golang.org/x/exp/maps"

"go.temporal.io/server/common"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -98,6 +100,8 @@ type (
CurrentClusterName string `yaml:"currentClusterName"`
// ClusterInformation contains all cluster names to corresponding information about that cluster
ClusterInformation map[string]ClusterInformation `yaml:"clusterInformation"`
// Tag contains customized tag about the current cluster
Tags map[string]string `yaml:"tags"`
}

// ClusterInformation contains the information about each cluster which participated in cross DC
Expand All @@ -107,8 +111,9 @@ type (
// Address indicate the remote service address(Host:Port). Host can be DNS name.
RPCAddress string `yaml:"rpcAddress"`
// Cluster ID allows to explicitly set the ID of the cluster. Optional.
ClusterID string `yaml:"-"`
ShardCount int32 `yaml:"-"` // Ignore this field when loading config.
ClusterID string `yaml:"-"`
ShardCount int32 `yaml:"-"` // Ignore this field when loading config.
Tags map[string]string `yaml:"-"` // Ignore this field. Use cluster.Config.Tags for customized tags.
// private field to track cluster information updates
version int64
}
Expand Down Expand Up @@ -463,12 +468,14 @@ func (m *metadataImpl) refreshClusterMetadata(ctx context.Context) error {
InitialFailoverVersion: newClusterInfo.InitialFailoverVersion,
RPCAddress: newClusterInfo.RPCAddress,
ShardCount: newClusterInfo.ShardCount,
Tags: newClusterInfo.Tags,
version: newClusterInfo.version,
}
} else if newClusterInfo.version > oldClusterInfo.version {
if newClusterInfo.Enabled == oldClusterInfo.Enabled &&
newClusterInfo.RPCAddress == oldClusterInfo.RPCAddress &&
newClusterInfo.InitialFailoverVersion == oldClusterInfo.InitialFailoverVersion {
newClusterInfo.InitialFailoverVersion == oldClusterInfo.InitialFailoverVersion &&
maps.Equal(newClusterInfo.Tags, oldClusterInfo.Tags) {
// key cluster info does not change
continue
}
Expand All @@ -478,13 +485,15 @@ func (m *metadataImpl) refreshClusterMetadata(ctx context.Context) error {
InitialFailoverVersion: oldClusterInfo.InitialFailoverVersion,
RPCAddress: oldClusterInfo.RPCAddress,
ShardCount: oldClusterInfo.ShardCount,
Tags: oldClusterInfo.Tags,
version: oldClusterInfo.version,
}
newEntries[clusterName] = &ClusterInformation{
Enabled: newClusterInfo.Enabled,
InitialFailoverVersion: newClusterInfo.InitialFailoverVersion,
RPCAddress: newClusterInfo.RPCAddress,
ShardCount: newClusterInfo.ShardCount,
Tags: newClusterInfo.Tags,
version: newClusterInfo.version,
}
}
Expand Down Expand Up @@ -589,6 +598,7 @@ func (m *metadataImpl) listAllClusterMetadataFromDB(
InitialFailoverVersion: getClusterResp.GetInitialFailoverVersion(),
RPCAddress: getClusterResp.GetClusterAddress(),
ShardCount: getClusterResp.GetHistoryShardCount(),
Tags: getClusterResp.GetTags(),
version: getClusterResp.Version,
}
}
Expand Down
36 changes: 35 additions & 1 deletion common/cluster/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type (
failoverVersionIncrement int64
clusterName string
secondClusterName string
thirdClusterName string
}
)

Expand All @@ -77,6 +78,7 @@ func (s *metadataSuite) SetupTest() {
s.failoverVersionIncrement = 100
s.clusterName = uuid.New()
s.secondClusterName = uuid.New()
s.thirdClusterName = uuid.New()

clusterInfo := map[string]ClusterInformation{
s.clusterName: {
Expand All @@ -93,6 +95,13 @@ func (s *metadataSuite) SetupTest() {
ShardCount: 2,
version: 1,
},
s.thirdClusterName: {
Enabled: true,
InitialFailoverVersion: int64(5),
RPCAddress: uuid.New(),
ShardCount: 1,
version: 1,
},
}
s.metadata = NewMetadata(
s.isGlobalNamespaceEnabled,
Expand Down Expand Up @@ -143,7 +152,7 @@ func (s *metadataSuite) Test_RegisterMetadataChangeCallback() {
s.metadata.RegisterMetadataChangeCallback(
s,
func(oldClusterMetadata map[string]*ClusterInformation, newClusterMetadata map[string]*ClusterInformation) {
s.Equal(2, len(newClusterMetadata))
s.Equal(3, len(newClusterMetadata))
})

s.metadata.UnRegisterMetadataChangeCallback(s)
Expand All @@ -166,12 +175,20 @@ func (s *metadataSuite) Test_RefreshClusterMetadata_Success() {
newMetadata, ok = newClusterMetadata[s.secondClusterName]
s.True(ok)
s.Nil(newMetadata)

oldMetadata, ok = oldClusterMetadata[s.thirdClusterName]
s.True(ok)
s.NotNil(oldMetadata)
newMetadata, ok = newClusterMetadata[s.thirdClusterName]
s.True(ok)
s.NotNil(newMetadata)
}

s.mockClusterMetadataStore.EXPECT().ListClusterMetadata(gomock.Any(), gomock.Any()).Return(
&persistence.ListClusterMetadataResponse{
ClusterMetadata: []*persistence.GetClusterMetadataResponse{
{
// No change and not include in callback
ClusterMetadata: persistencespb.ClusterMetadata{
ClusterName: s.clusterName,
IsConnectionEnabled: true,
Expand All @@ -182,19 +199,36 @@ func (s *metadataSuite) Test_RefreshClusterMetadata_Success() {
Version: 1,
},
{
// Updated, included in callback
ClusterMetadata: persistencespb.ClusterMetadata{
ClusterName: s.thirdClusterName,
IsConnectionEnabled: true,
InitialFailoverVersion: 1,
HistoryShardCount: 1,
ClusterAddress: uuid.New(),
Tags: map[string]string{"test": "test"},
},
Version: 2,
},
{
// Newly added, included in callback
ClusterMetadata: persistencespb.ClusterMetadata{
ClusterName: id,
IsConnectionEnabled: true,
InitialFailoverVersion: 2,
HistoryShardCount: 2,
ClusterAddress: uuid.New(),
Tags: map[string]string{"test": "test"},
},
Version: 2,
},
},
}, nil)
err := s.metadata.refreshClusterMetadata(context.Background())
s.NoError(err)
clusterInfo := s.metadata.GetAllClusterInfo()
s.Equal("test", clusterInfo[s.thirdClusterName].Tags["test"])
s.Equal("test", clusterInfo[id].Tags["test"])
}

func (s *metadataSuite) Test_ListAllClusterMetadataFromDB_Success() {
Expand Down
8 changes: 7 additions & 1 deletion temporal/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"go.temporal.io/api/serviceerror"
"go.uber.org/fx"
"go.uber.org/fx/fxevent"
"golang.org/x/exp/maps"
"google.golang.org/grpc"

persistencespb "go.temporal.io/server/api/persistence/v1"
Expand Down Expand Up @@ -719,6 +720,7 @@ func loadClusterInformationFromStore(ctx context.Context, svc *config.Config, cl
InitialFailoverVersion: metadata.InitialFailoverVersion,
RPCAddress: metadata.ClusterAddress,
ShardCount: shardCount,
Tags: metadata.Tags,
}
if staticClusterMetadata, ok := svc.ClusterMetadata.ClusterInformation[metadata.ClusterName]; ok {
if metadata.ClusterName != svc.ClusterMetadata.CurrentClusterName {
Expand Down Expand Up @@ -770,6 +772,7 @@ func initCurrentClusterMetadataRecord(
IsConnectionEnabled: currentClusterInfo.Enabled,
UseClusterIdMembership: true, // Enable this for new cluster after 1.19. This is to prevent two clusters join into one ring.
IndexSearchAttributes: initialIndexSearchAttributes,
Tags: svc.ClusterMetadata.Tags,
},
})
if err != nil {
Expand Down Expand Up @@ -804,7 +807,10 @@ func updateCurrentClusterMetadataRecord(
currentClusterDBRecord.ClusterAddress = currentCLusterInfo.RPCAddress
updateDBRecord = true
}
// TODO: Add cluster tags
if !maps.Equal(currentClusterDBRecord.Tags, svc.ClusterMetadata.Tags) {
currentClusterDBRecord.Tags = svc.ClusterMetadata.Tags
updateDBRecord = true
}

if !updateDBRecord {
return nil
Expand Down

0 comments on commit a01a309

Please sign in to comment.