Add additional metrics for stream replication (#4323)
* Add additional metrics for stream replication
* Fix shard ownership lost error
wxing1292 committed May 15, 2023
1 parent a136de7 commit d6ab8c5
Showing 17 changed files with 283 additions and 52 deletions.
11 changes: 7 additions & 4 deletions common/metrics/metric_defs.go
@@ -1036,8 +1036,6 @@ const (
ReplicationTaskCleanupScope = "ReplicationTaskCleanup"
// ReplicationDLQStatsScope is scope used by all metrics emitted related to replication DLQ
ReplicationDLQStatsScope = "ReplicationDLQStats"
// SyncWorkflowStateTaskScope is the scope used by closed workflow task replication processing
SyncWorkflowStateTaskScope = "SyncWorkflowStateTask"
// EventsCacheGetEventScope is the scope used by events cache
EventsCacheGetEventScope = "EventsCacheGetEvent"
// EventsCachePutEventScope is the scope used by events cache
@@ -1148,8 +1146,10 @@ const (
SyncShardTaskScope = "SyncShardTask"
// SyncActivityTaskScope is the scope used by sync activity
SyncActivityTaskScope = "SyncActivityTask"
// SyncWorkflowTaskScope is the scope used by sync workflow
SyncWorkflowTaskScope = "SyncWorkflowTask"
// SyncWorkflowStateTaskScope is the scope used by closed workflow task replication processing
SyncWorkflowStateTaskScope = "SyncWorkflowStateTask"
// SyncWatermarkScope is the scope used by sync watermark processing
SyncWatermarkScope = "SyncWatermark"
// NoopTaskScope is the scope used by noop task
NoopTaskScope = "NoopTask"
// UnknownTaskScope is the scope used by unknown task
@@ -1445,6 +1445,9 @@ var (
WorkflowContinuedAsNewCount = NewCounterDef("workflow_continued_as_new")
LastRetrievedMessageID = NewGaugeDef("last_retrieved_message_id")
LastProcessedMessageID = NewGaugeDef("last_processed_message_id")
ReplicationTasksSend = NewCounterDef("replication_tasks_send")
ReplicationTasksRecv = NewCounterDef("replication_tasks_recv")
ReplicationTasksRecvBacklog = NewDimensionlessHistogramDef("replication_tasks_recv_backlog")
ReplicationTasksApplied = NewCounterDef("replication_tasks_applied")
ReplicationTasksFailed = NewCounterDef("replication_tasks_failed")
ReplicationTasksLag = NewTimerDef("replication_tasks_lag")
12 changes: 12 additions & 0 deletions common/metrics/tags.go
@@ -45,6 +45,8 @@ const (
namespace = "namespace"
namespaceState = "namespace_state"
targetCluster = "target_cluster"
fromCluster = "from_cluster"
toCluster = "to_cluster"
taskQueue = "taskqueue"
workflowType = "workflowType"
activityType = "activityType"
@@ -137,6 +139,16 @@ func TargetClusterTag(value string) Tag {
return &tagImpl{key: targetCluster, value: value}
}

// FromClusterIDTag returns a new from cluster tag.
func FromClusterIDTag(value int32) Tag {
return &tagImpl{key: fromCluster, value: strconv.FormatInt(int64(value), 10)}
}

// ToClusterIDTag returns a new to cluster tag.
func ToClusterIDTag(value int32) Tag {
return &tagImpl{key: toCluster, value: strconv.FormatInt(int64(value), 10)}
}

// TaskQueueTag returns a new task queue tag.
func TaskQueueTag(value string) Tag {
if len(value) == 0 {
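For reference, a minimal sketch of how the counter definitions and cluster ID tags added above compose when recording a sample. This is not part of the commit; the handler interface name (metrics.Handler) and the literal cluster IDs are assumptions, and metrics.NoopMetricsHandler simply discards the sample.

package main

import "go.temporal.io/server/common/metrics"

// recordReplicationSend is a hypothetical helper showing the intended tag
// combination: one replication_tasks_send sample tagged with the sending and
// receiving cluster IDs.
func recordReplicationSend(handler metrics.Handler, fromClusterID, toClusterID int32) {
	handler.Counter(metrics.ReplicationTasksSend.GetMetricName()).Record(
		int64(1),
		metrics.FromClusterIDTag(fromClusterID),
		metrics.ToClusterIDTag(toClusterID),
	)
}

func main() {
	// NoopMetricsHandler is a no-op sink; a real handler would export the sample.
	recordReplicationSend(metrics.NoopMetricsHandler, 1, 2)
}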
30 changes: 24 additions & 6 deletions service/history/api/replication/stream.go
@@ -42,9 +42,11 @@ import (
"go.temporal.io/server/common"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/replication"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
)
@@ -100,7 +102,7 @@ func StreamReplicationTasks(
return recvLoop(ctx, server, shardContext, clientClusterShardID, serverClusterShardID)
})
errGroup.Go(func() error {
return sendLoop(ctx, server, shardContext, filter, clientClusterShardID)
return sendLoop(ctx, server, shardContext, filter, clientClusterShardID, serverClusterShardID)
})
return errGroup.Wait()
}
@@ -136,6 +138,12 @@ func recvLoop(
)
return err
}
shardContext.GetMetricsHandler().Counter(metrics.ReplicationTasksRecv.GetMetricName()).Record(
int64(1),
metrics.FromClusterIDTag(clientClusterShardID.ClusterID),
metrics.ToClusterIDTag(serverClusterShardID.ClusterID),
metrics.OperationTag(metrics.SyncWatermarkScope),
)
default:
return serviceerror.NewInternal(fmt.Sprintf(
"StreamReplicationMessages encountered unknown type: %T %v", attr, attr,
@@ -180,11 +188,7 @@ func recvSyncReplicationState(
readerID,
readerState,
); err != nil {
shardContext.GetLogger().Error(
"error updating replication level for shard",
tag.Error(err),
tag.OperationFailed,
)
return err
}
shardContext.UpdateRemoteClusterInfo(
string(clientClusterShardID.ClusterID),
@@ -200,6 +204,7 @@ func sendLoop(
shardContext shard.Context,
taskConvertor TaskConvertor,
clientClusterShardID historyclient.ClusterShardID,
serverClusterShardID historyclient.ClusterShardID,
) error {
engine, err := shardContext.GetEngine(ctx)
if err != nil {
@@ -214,6 +219,7 @@
shardContext,
taskConvertor,
clientClusterShardID,
serverClusterShardID,
)
if err != nil {
shardContext.GetLogger().Error(
@@ -228,6 +234,7 @@
shardContext,
taskConvertor,
clientClusterShardID,
serverClusterShardID,
newTaskNotificationChan,
catchupEndExclusiveWatermark,
); err != nil {
@@ -247,6 +254,7 @@ func sendCatchUp(
shardContext shard.Context,
taskConvertor TaskConvertor,
clientClusterShardID historyclient.ClusterShardID,
serverClusterShardID historyclient.ClusterShardID,
) (int64, error) {

readerID := shard.ReplicationReaderIDFromClusterShardID(
@@ -275,6 +283,7 @@
shardContext,
taskConvertor,
clientClusterShardID,
serverClusterShardID,
catchupBeginInclusiveWatermark,
catchupEndExclusiveWatermark,
); err != nil {
@@ -289,6 +298,7 @@ func sendLive(
shardContext shard.Context,
taskConvertor TaskConvertor,
clientClusterShardID historyclient.ClusterShardID,
serverClusterShardID historyclient.ClusterShardID,
newTaskNotificationChan <-chan struct{},
beginInclusiveWatermark int64,
) error {
@@ -302,6 +312,7 @@
shardContext,
taskConvertor,
clientClusterShardID,
serverClusterShardID,
beginInclusiveWatermark,
endExclusiveWatermark,
); err != nil {
@@ -320,6 +331,7 @@ func sendTasks(
shardContext shard.Context,
taskConvertor TaskConvertor,
clientClusterShardID historyclient.ClusterShardID,
serverClusterShardID historyclient.ClusterShardID,
beginInclusiveWatermark int64,
endExclusiveWatermark int64,
) error {
@@ -368,6 +380,12 @@ Loop:
}); err != nil {
return err
}
shardContext.GetMetricsHandler().Counter(metrics.ReplicationTasksSend.GetMetricName()).Record(
int64(1),
metrics.FromClusterIDTag(serverClusterShardID.ClusterID),
metrics.ToClusterIDTag(clientClusterShardID.ClusterID),
metrics.OperationTag(replication.TaskOperationTag(task)),
)
}
return server.Send(&historyservice.StreamWorkflowReplicationMessagesResponse{
Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{
55 changes: 54 additions & 1 deletion service/history/api/replication/stream_test.go
@@ -42,7 +42,11 @@ import (
replicationspb "go.temporal.io/server/api/replication/v1"
historyclient "go.temporal.io/server/client/history"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives/timestamp"
serviceerrors "go.temporal.io/server/common/serviceerror"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
)
@@ -97,13 +101,15 @@ func (s *streamSuite) SetupTest() {
ShardID: rand.Int31(),
}
s.shardContext.EXPECT().GetEngine(gomock.Any()).Return(s.historyEngine, nil).AnyTimes()
s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes()
s.shardContext.EXPECT().GetLogger().Return(log.NewNoopLogger()).AnyTimes()
}

func (s *streamSuite) TearDownTest() {
s.controller.Finish()
}

func (s *streamSuite) TestRecvSyncReplicationState() {
func (s *streamSuite) TestRecvSyncReplicationState_Success() {
readerID := shard.ReplicationReaderIDFromClusterShardID(
int64(s.clientClusterShardID.ClusterID),
s.clientClusterShardID.ShardID,
@@ -142,6 +148,48 @@ func (s *streamSuite) TestRecvSyncReplicationState() {
s.NoError(err)
}

func (s *streamSuite) TestRecvSyncReplicationState_Error() {
readerID := shard.ReplicationReaderIDFromClusterShardID(
int64(s.clientClusterShardID.ClusterID),
s.clientClusterShardID.ShardID,
)
replicationState := &replicationspb.SyncReplicationState{
LastProcessedMessageId: rand.Int63(),
LastProcessedMessageTime: timestamp.TimePtr(time.Unix(0, rand.Int63())),
}

var ownershipLost error
if rand.Float64() < 0.5 {
ownershipLost = &persistence.ShardOwnershipLostError{}
} else {
ownershipLost = serviceerrors.NewShardOwnershipLost("", "")
}

s.shardContext.EXPECT().UpdateReplicationQueueReaderState(
readerID,
&persistencespb.QueueReaderState{
Scopes: []*persistencespb.QueueSliceScope{{
Range: &persistencespb.QueueSliceRange{
InclusiveMin: shard.ConvertToPersistenceTaskKey(
tasks.NewImmediateKey(replicationState.LastProcessedMessageId + 1),
),
ExclusiveMax: shard.ConvertToPersistenceTaskKey(
tasks.NewImmediateKey(math.MaxInt64),
),
},
Predicate: &persistencespb.Predicate{
PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL,
Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{},
},
}},
},
).Return(ownershipLost)

err := recvSyncReplicationState(s.shardContext, replicationState, s.clientClusterShardID)
s.Error(err)
s.Equal(ownershipLost, err)
}

func (s *streamSuite) TestSendCatchUp() {
readerID := shard.ReplicationReaderIDFromClusterShardID(
int64(s.clientClusterShardID.ClusterID),
@@ -199,6 +247,7 @@ func (s *streamSuite) TestSendCatchUp() {
s.shardContext,
s.taskConvertor,
s.clientClusterShardID,
s.serverClusterShardID,
)
s.NoError(err)
s.Equal(endExclusiveWatermark, taskID)
@@ -260,6 +309,7 @@ func (s *streamSuite) TestSendLive() {
s.shardContext,
s.taskConvertor,
s.clientClusterShardID,
s.serverClusterShardID,
channel,
watermark0,
)
@@ -276,6 +326,7 @@ func (s *streamSuite) TestSendTasks_Noop() {
s.shardContext,
s.taskConvertor,
s.clientClusterShardID,
s.serverClusterShardID,
beginInclusiveWatermark,
endExclusiveWatermark,
)
@@ -309,6 +360,7 @@ func (s *streamSuite) TestSendTasks_WithoutTasks() {
s.shardContext,
s.taskConvertor,
s.clientClusterShardID,
s.serverClusterShardID,
beginInclusiveWatermark,
endExclusiveWatermark,
)
@@ -376,6 +428,7 @@ func (s *streamSuite) TestSendTasks_WithTasks() {
s.shardContext,
s.taskConvertor,
s.clientClusterShardID,
s.serverClusterShardID,
beginInclusiveWatermark,
endExclusiveWatermark,
)
8 changes: 6 additions & 2 deletions service/history/handler.go
@@ -1908,9 +1908,13 @@ func (h *Handler) StreamWorkflowReplicationMessages(
}
shardContext, err := h.controller.GetShardByID(serverClusterShardID.ShardID)
if err != nil {
return err
return h.convertError(err)
}
err = replicationapi.StreamReplicationTasks(server, shardContext, clientClusterShardID, serverClusterShardID)
if err != nil {
return h.convertError(err)
}
return replicationapi.StreamReplicationTasks(server, shardContext, clientClusterShardID, serverClusterShardID)
return nil
}

// convertError is a helper method to convert ShardOwnershipLostError from persistence layer returned by various
9 changes: 9 additions & 0 deletions service/history/replication/executable_task.go
@@ -107,6 +107,7 @@ type (
// mutable data
taskState int32
attempt int32
namespace atomic.Value
}
)

@@ -237,13 +238,20 @@ func (e *ExecutableTaskImpl) Attempt() int {
func (e *ExecutableTaskImpl) emitFinishMetrics(
now time.Time,
) {
nsTag := metrics.NamespaceUnknownTag()
item := e.namespace.Load()
if item != nil {
nsTag = metrics.NamespaceTag(item.(namespace.Name).String())
}
e.MetricsHandler.Timer(metrics.ServiceLatency.GetMetricName()).Record(
now.Sub(e.taskReceivedTime),
metrics.OperationTag(e.metricsTag),
nsTag,
)
e.MetricsHandler.Timer(metrics.ReplicationLatency.GetMetricName()).Record(
e.taskReceivedTime.Sub(e.taskCreationTime),
metrics.OperationTag(e.metricsTag),
nsTag,
)
// TODO consider emit attempt metrics
}
@@ -335,6 +343,7 @@ func (e *ExecutableTaskImpl) GetNamespaceInfo(
namespaceEntry, err := e.NamespaceCache.GetNamespaceByID(namespace.ID(namespaceID))
switch err.(type) {
case nil:
e.namespace.Store(namespaceEntry.Name())
shouldProcessTask := false
FilterLoop:
for _, targetCluster := range namespaceEntry.ClusterNames() {
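The new namespace field above is written once by GetNamespaceInfo and read by emitFinishMetrics, which falls back to an unknown-namespace tag until the value is set. A self-contained illustration of that write-once/read-many atomic.Value pattern (the names here are illustrative, not the commit's code):

package main

import (
	"fmt"
	"sync/atomic"
)

// lazyName tolerates readers arriving before the value has been resolved,
// mirroring how emitFinishMetrics handles a not-yet-stored namespace.
type lazyName struct {
	value atomic.Value // stores a string once known
}

func (l *lazyName) Set(name string) { l.value.Store(name) }

func (l *lazyName) Get() string {
	if v := l.value.Load(); v != nil {
		return v.(string)
	}
	return "_unknown_" // analogous to metrics.NamespaceUnknownTag()
}

func main() {
	var n lazyName
	fmt.Println(n.Get()) // "_unknown_" before resolution
	n.Set("my-namespace")
	fmt.Println(n.Get()) // "my-namespace" afterwards
}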
8 changes: 8 additions & 0 deletions service/history/replication/executable_task_initializer.go
@@ -61,10 +61,18 @@ type (

func (i *ProcessToolBox) ConvertTasks(
taskClusterName string,
clientShardKey ClusterShardKey,
serverShardKey ClusterShardKey,
replicationTasks ...*replicationspb.ReplicationTask,
) []TrackableExecutableTask {
tasks := make([]TrackableExecutableTask, len(replicationTasks))
for index, replicationTask := range replicationTasks {
i.MetricsHandler.Counter(metrics.ReplicationTasksRecv.GetMetricName()).Record(
int64(1),
metrics.FromClusterIDTag(serverShardKey.ClusterID),
metrics.ToClusterIDTag(clientShardKey.ClusterID),
metrics.OperationTag(TaskOperationTag(replicationTask)),
)
tasks[index] = i.convertOne(taskClusterName, replicationTask)
}
return tasks
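A hedged sketch of a call site for the widened ConvertTasks signature, as a stream receiver inside the replication package might use it. The wrapper name and the assumption that ClusterShardKey exposes the ClusterID used for tagging are illustrative, not taken from this diff:

// convertIncoming is a hypothetical helper in package replication: converting
// a received batch now also emits one replication_tasks_recv sample per task,
// tagged with the from/to cluster IDs and the task's operation.
func convertIncoming(
	toolBox *ProcessToolBox,
	sourceClusterName string,
	clientShardKey ClusterShardKey,
	serverShardKey ClusterShardKey,
	incoming []*replicationspb.ReplicationTask,
) []TrackableExecutableTask {
	return toolBox.ConvertTasks(sourceClusterName, clientShardKey, serverShardKey, incoming...)
}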
8 changes: 8 additions & 0 deletions service/history/replication/executable_task_tracker.go
@@ -50,6 +50,7 @@ type (
ExecutableTaskTracker interface {
TrackTasks(highWatermarkInfo WatermarkInfo, tasks ...TrackableExecutableTask) []TrackableExecutableTask
LowWatermark() *WatermarkInfo
Size() int
Cancel()
}
ExecutableTaskTrackerImpl struct {
@@ -174,6 +175,13 @@ Loop:
}
}

func (t *ExecutableTaskTrackerImpl) Size() int {
t.Lock()
defer t.Unlock()

return t.taskQueue.Len()
}

func (t *ExecutableTaskTrackerImpl) Cancel() {
t.Lock()
defer t.Unlock()
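The new Size() accessor is a natural input for the replication_tasks_recv_backlog histogram defined earlier; that wiring is not shown in this diff, so the following is only a plausible sketch inside the replication package, and the handler interface name (metrics.Handler) and the GetMetricUnit() accessor are assumptions:

// reportRecvBacklog is a hypothetical periodic reporter: it records how many
// received-but-unacknowledged replication tasks the tracker currently holds.
func reportRecvBacklog(
	handler metrics.Handler,
	tracker ExecutableTaskTracker,
	serverShardKey ClusterShardKey,
	clientShardKey ClusterShardKey,
) {
	handler.Histogram(
		metrics.ReplicationTasksRecvBacklog.GetMetricName(),
		metrics.ReplicationTasksRecvBacklog.GetMetricUnit(),
	).Record(
		int64(tracker.Size()),
		metrics.FromClusterIDTag(serverShardKey.ClusterID),
		metrics.ToClusterIDTag(clientShardKey.ClusterID),
	)
}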
