Skip to content

Commit

Permalink
Properly check report pollers and report reachability flags (#5982)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
Properly check report pollers and report reachability flags in enhanced
DescribeTaskQueue.

## Why?
<!-- Tell your future self why have you made these changes -->
This is a bug.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
Existing tests.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
none.

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->
none.
## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
yes.
  • Loading branch information
ShahabT committed May 24, 2024
1 parent 09cb808 commit c5cb979
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 20 deletions.
6 changes: 3 additions & 3 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ func VisibilityPluginNameTag(value string) Tag {
return &tagImpl{key: visibilityPluginNameTagName, value: value}
}

// VersionedTag represents whether a loaded task queue manager represents a specific version set.
func VersionedTag(versioned bool) Tag {
return &tagImpl{key: versionedTagName, value: strconv.FormatBool(versioned)}
// VersionedTag represents whether a loaded task queue manager represents a specific version set or build ID or not.
func VersionedTag(versioned string) Tag {
return &tagImpl{key: versionedTagName, value: versioned}
}

func ServiceErrorTypeTag(err error) Tag {
Expand Down
50 changes: 33 additions & 17 deletions service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ func (e *matchingEngineImpl) DescribeTaskQueue(
},
Versions: req.GetVersions(),
ReportBacklogInfo: false,
ReportPollers: req.GetReportTaskReachability(),
ReportPollers: req.GetReportPollers(),
})
if err != nil {
return nil, err
Expand All @@ -954,7 +954,7 @@ func (e *matchingEngineImpl) DescribeTaskQueue(
physicalInfoByBuildId[buildId][taskQueueType] = vii.PhysicalTaskQueueInfo
} else {
merged := &taskqueuespb.PhysicalTaskQueueInfo{
Pollers: append(physInfo.GetPollers(), vii.PhysicalTaskQueueInfo.GetPollers()...),
Pollers: dedupPollers(append(physInfo.GetPollers(), vii.PhysicalTaskQueueInfo.GetPollers()...)),
}
physicalInfoByBuildId[buildId][taskQueueType] = merged
}
Expand All @@ -970,21 +970,24 @@ func (e *matchingEngineImpl) DescribeTaskQueue(
Pollers: physicalInfo.Pollers,
}
}
reachability, err := getBuildIdTaskReachability(ctx,
newReachabilityCalculator(
userData.GetVersioningData(),
e.reachabilityCache,
request.GetNamespaceId(),
req.GetNamespace(),
req.GetTaskQueue().GetName(),
e.config.ReachabilityBuildIdVisibilityGracePeriod(req.GetNamespace()),
),
e.metricsHandler,
e.logger,
bid,
)
if err != nil {
return nil, err
var reachability enumspb.BuildIdTaskReachability
if req.GetReportTaskReachability() {
reachability, err = getBuildIdTaskReachability(ctx,
newReachabilityCalculator(
userData.GetVersioningData(),
e.reachabilityCache,
request.GetNamespaceId(),
req.GetNamespace(),
req.GetTaskQueue().GetName(),
e.config.ReachabilityBuildIdVisibilityGracePeriod(req.GetNamespace()),
),
e.metricsHandler,
e.logger,
bid,
)
if err != nil {
return nil, err
}
}
versionsInfo[bid] = &taskqueuepb.TaskQueueVersionInfo{
TypesInfo: typesInfo,
Expand All @@ -1009,6 +1012,18 @@ func (e *matchingEngineImpl) DescribeTaskQueue(
return pm.LegacyDescribeTaskQueue(req.GetIncludeTaskQueueStatus()), nil
}

func dedupPollers(pollerInfos []*taskqueuepb.PollerInfo) []*taskqueuepb.PollerInfo {
allKeys := make(map[string]bool)
var list []*taskqueuepb.PollerInfo
for _, item := range pollerInfos {
if _, value := allKeys[item.GetIdentity()]; !value {
allKeys[item.GetIdentity()] = true
list = append(list, item)
}
}
return list
}

func (e *matchingEngineImpl) DescribeTaskQueuePartition(
ctx context.Context,
request *matchingservice.DescribeTaskQueuePartitionRequest,
Expand Down Expand Up @@ -1915,6 +1930,7 @@ func (e *matchingEngineImpl) updatePhysicalTaskQueueGauge(pm *physicalTaskQueueM
metrics.NamespaceTag(pmImpl.ns.Name().String()),
metrics.TaskTypeTag(physicalTaskQueueParameters.taskType.String()),
metrics.PartitionTypeTag(physicalTaskQueueParameters.partitionType.String()),
metrics.VersionedTag(versioned),
)
}

Expand Down
81 changes: 81 additions & 0 deletions tests/versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -4106,7 +4106,88 @@ func (s *VersioningIntegSuite) TestDescribeTaskQueueEnhanced_Unversioned() {

return foundN == workerN
}, 3*time.Second, 50*time.Millisecond)
}

func (s *VersioningIntegSuite) TestDescribeTaskQueueEnhanced_ReportFlags() {
tq := s.randomizeStr(s.T().Name())
wf := func(ctx workflow.Context) (string, error) { return "ok", nil }
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

wId := s.randomizeStr("id")
w := worker.New(s.sdkClient, tq, worker.Options{
UseBuildIDForVersioning: false,
Identity: wId,
})
w.RegisterWorkflow(wf)
s.NoError(w.Start())
defer w.Stop()

// wait for pollers to show up, verify both ReportPollers and ReportTaskReachability
s.Eventually(func() bool {
resp, err := s.engine.DescribeTaskQueue(ctx, &workflowservice.DescribeTaskQueueRequest{
Namespace: s.namespace,
TaskQueue: &taskqueuepb.TaskQueue{Name: tq, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
ApiMode: enumspb.DESCRIBE_TASK_QUEUE_MODE_ENHANCED,
Versions: nil, // default version, in this case unversioned queue
TaskQueueTypes: nil, // both types
ReportPollers: true,
ReportTaskReachability: true,
})
s.NoError(err)
s.NotNil(resp)
s.Assert().Equal(1, len(resp.GetVersionsInfo()), "should be 1 because only default/unversioned queue")
versionInfo := resp.GetVersionsInfo()[""]
s.Assert().Equal(enumspb.BUILD_ID_TASK_REACHABILITY_REACHABLE, versionInfo.GetTaskReachability())
var pollersInfo []*taskqueuepb.PollerInfo
for _, t := range versionInfo.GetTypesInfo() {
pollersInfo = append(pollersInfo, t.GetPollers()...)
}
for _, pi := range pollersInfo {
s.False(pi.GetWorkerVersionCapabilities().GetUseVersioning())
if pi.GetIdentity() == wId {
return true
}
}

return false
}, 3*time.Second, 50*time.Millisecond)

// ask for reachability only
resp, err := s.engine.DescribeTaskQueue(ctx, &workflowservice.DescribeTaskQueueRequest{
Namespace: s.namespace,
TaskQueue: &taskqueuepb.TaskQueue{Name: tq, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
ApiMode: enumspb.DESCRIBE_TASK_QUEUE_MODE_ENHANCED,
Versions: nil, // default version, in this case unversioned queue
TaskQueueTypes: nil, // both types
ReportTaskReachability: true,
})
s.NoError(err)
s.NotNil(resp)
s.Assert().Equal(1, len(resp.GetVersionsInfo()), "should be 1 because only default/unversioned queue")
versionInfo := resp.GetVersionsInfo()[""]
s.Assert().Equal(enumspb.BUILD_ID_TASK_REACHABILITY_REACHABLE, versionInfo.GetTaskReachability())
for _, t := range versionInfo.GetTypesInfo() {
s.Zero(len(t.GetPollers()), "poller info should not be reported")
}

// ask for pollers only
resp, err = s.engine.DescribeTaskQueue(ctx, &workflowservice.DescribeTaskQueueRequest{
Namespace: s.namespace,
TaskQueue: &taskqueuepb.TaskQueue{Name: tq, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
ApiMode: enumspb.DESCRIBE_TASK_QUEUE_MODE_ENHANCED,
Versions: nil, // default version, in this case unversioned queue
TaskQueueTypes: nil, // both types
ReportPollers: true,
})
s.NoError(err)
s.NotNil(resp)
s.Assert().Equal(1, len(resp.GetVersionsInfo()), "should be 1 because only default/unversioned queue")
versionInfo = resp.GetVersionsInfo()[""]
s.Assert().Equal(enumspb.BUILD_ID_TASK_REACHABILITY_UNSPECIFIED, versionInfo.GetTaskReachability())
for _, t := range versionInfo.GetTypesInfo() {
s.Equal(1, len(t.GetPollers()), "only one poller info should be reported")
}
}

func (s *VersioningIntegSuite) TestDescribeTaskQueueEnhanced_TooManyBuildIds() {
Expand Down

0 comments on commit c5cb979

Please sign in to comment.