Skip to content

Commit

Permalink
Merge branch 'master' into alfred/shard-valid
Browse files Browse the repository at this point in the history
  • Loading branch information
alfred-landrum committed Jul 28, 2023
2 parents 6b5b6f5 + 9269507 commit 6d711a6
Show file tree
Hide file tree
Showing 12 changed files with 564 additions and 199 deletions.
403 changes: 258 additions & 145 deletions api/matchingservice/v1/request_response.pb.go

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions api/matchingservice/v1/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions common/metrics/metric_defs.go
Expand Up @@ -40,6 +40,7 @@ const (
visibilityTypeTagName = "visibility_type"
ErrorTypeTagName = "error_type"
httpStatusTagName = "http_status"
versionedTagName = "versioned"
resourceExhaustedTag = "resource_exhausted_cause"
standardVisibilityTagValue = "standard_visibility"
advancedVisibilityTagValue = "advanced_visibility"
Expand Down Expand Up @@ -1461,6 +1462,8 @@ var (
TaskWriteLatencyPerTaskQueue = NewTimerDef("task_write_latency")
TaskLagPerTaskQueueGauge = NewGaugeDef("task_lag_per_tl")
NoRecentPollerTasksPerTaskQueueCounter = NewCounterDef("no_poller_tasks")
UnknownBuildPollsCounter = NewCounterDef("unknown_build_polls")
UnknownBuildTasksCounter = NewCounterDef("unknown_build_tasks")

// Worker
ExecutorTasksDoneCount = NewCounterDef("executor_done")
Expand Down
5 changes: 5 additions & 0 deletions common/metrics/tags.go
Expand Up @@ -250,6 +250,11 @@ func VisibilityTypeTag(value string) Tag {
return &tagImpl{key: visibilityTypeTagName, value: value}
}

// VersionedTag represents whether a loaded task queue manager represents a specific version set.
func VersionedTag(versioned bool) Tag {
return &tagImpl{key: versionedTagName, value: strconv.FormatBool(versioned)}
}

func ServiceErrorTypeTag(err error) Tag {
return &tagImpl{key: ErrorTypeTagName, value: strings.TrimPrefix(fmt.Sprintf(getType, err), errorPrefix)}
}
Expand Down
Expand Up @@ -219,6 +219,7 @@ message UpdateWorkerBuildIdCompatibilityRequest {
oneof operation {
ApplyPublicRequest apply_public_request = 3;
RemoveBuildIds remove_build_ids = 4;
string persist_unknown_build_id = 5;
}
}
message UpdateWorkerBuildIdCompatibilityResponse {}
Expand Down
Expand Up @@ -79,9 +79,9 @@ service MatchingService {
}

// (-- api-linter: core::0134::response-message-name=disabled
// aip.dev/not-precedent: UpdateWorkerBuildIdOrdering RPC doesn't follow Google API format. --)
// aip.dev/not-precedent: UpdateWorkerBuildIdCompatibility RPC doesn't follow Google API format. --)
// (-- api-linter: core::0134::method-signature=disabled
// aip.dev/not-precedent: UpdateWorkerBuildIdOrdering RPC doesn't follow Google API format. --)
// aip.dev/not-precedent: UpdateWorkerBuildIdCompatibility RPC doesn't follow Google API format. --)
rpc UpdateWorkerBuildIdCompatibility (UpdateWorkerBuildIdCompatibilityRequest) returns (UpdateWorkerBuildIdCompatibilityResponse) {}
rpc GetWorkerBuildIdCompatibility (GetWorkerBuildIdCompatibilityRequest) returns (GetWorkerBuildIdCompatibilityResponse) {}
// Fetch user data for a task queue, this request should always be routed to the node holding the root partition of the workflow task queue.
Expand Down
50 changes: 33 additions & 17 deletions service/matching/matching_engine.go
Expand Up @@ -96,6 +96,7 @@ type (
namespaceID namespace.ID
taskType enumspb.TaskQueueType
kind enumspb.TaskQueueKind
versioned bool
}

pollMetadata struct {
Expand Down Expand Up @@ -125,6 +126,7 @@ type (
metricsHandler metrics.Handler
taskQueuesLock sync.RWMutex // locks mutation of taskQueues
taskQueues map[taskQueueID]taskQueueManager
taskQueueCountLock sync.Mutex
taskQueueCount map[taskQueueCounterKey]int // per-namespace task queue counter
config *Config
lockableQueryTaskMap lockableQueryTaskMap
Expand Down Expand Up @@ -275,25 +277,22 @@ func (e *matchingEngineImpl) getTaskQueueManager(

// If it gets here, write lock and check again in case a task queue is created between the two locks
e.taskQueuesLock.Lock()
if tqm, ok = e.taskQueues[*taskQueue]; !ok {
tqm, ok = e.taskQueues[*taskQueue]
if !ok {
var err error
tqm, err = newTaskQueueManager(e, taskQueue, stickyInfo, e.config, e.clusterMeta)
if err != nil {
e.taskQueuesLock.Unlock()
return nil, err
}
tqm.Start()
e.taskQueues[*taskQueue] = tqm
countKey := taskQueueCounterKey{
namespaceID: taskQueue.namespaceID,
taskType: taskQueue.taskType,
kind: stickyInfo.kind,
}
e.taskQueueCount[countKey]++
taskQueueCount := e.taskQueueCount[countKey]
e.updateTaskQueueGauge(countKey, taskQueueCount)
}
e.taskQueuesLock.Unlock()

if !ok {
tqm.Start()
e.updateTaskQueueGauge(tqm, 1)
}
}

if err := tqm.WaitUntilInitialized(ctx); err != nil {
Expand Down Expand Up @@ -909,6 +908,12 @@ func (e *matchingEngineImpl) UpdateWorkerBuildIdCompatibility(
// We don't need to keep the tombstones around if we're not replicating them.
versioningData = ClearTombstones(versioningData)
}
case *matchingservice.UpdateWorkerBuildIdCompatibilityRequest_PersistUnknownBuildId:
versioningData = PersistUnknownBuildId(
updatedClock,
data.GetVersioningData(),
req.GetPersistUnknownBuildId(),
)
default:
return nil, false, serviceerror.NewInvalidArgument(fmt.Sprintf("invalid operation: %v", req.GetOperation()))
}
Expand Down Expand Up @@ -1253,27 +1258,38 @@ func (e *matchingEngineImpl) unloadTaskQueue(unloadTQM taskQueueManager) {
return
}
delete(e.taskQueues, *queueID)
countKey := taskQueueCounterKey{namespaceID: queueID.namespaceID, taskType: queueID.taskType, kind: foundTQM.TaskQueueKind()}
e.taskQueueCount[countKey]--
taskQueueCount := e.taskQueueCount[countKey]
e.taskQueuesLock.Unlock()

e.updateTaskQueueGauge(countKey, taskQueueCount)
// This may call unloadTaskQueue again but that's okay, the next call will not find it.
foundTQM.Stop()
e.updateTaskQueueGauge(foundTQM, -1)
}

func (e *matchingEngineImpl) updateTaskQueueGauge(countKey taskQueueCounterKey, taskQueueCount int) {
func (e *matchingEngineImpl) updateTaskQueueGauge(tqm taskQueueManager, delta int) {
id := tqm.QueueID()
countKey := taskQueueCounterKey{
namespaceID: id.namespaceID,
taskType: id.taskType,
kind: tqm.TaskQueueKind(),
versioned: id.VersionSet() != "",
}

e.taskQueueCountLock.Lock()
e.taskQueueCount[countKey] += delta
newCount := e.taskQueueCount[countKey]
e.taskQueueCountLock.Unlock()

nsEntry, err := e.namespaceRegistry.GetNamespaceByID(countKey.namespaceID)
ns := namespace.Name("unknown")
if err == nil {
ns = nsEntry.Name()
}

e.metricsHandler.Gauge(metrics.LoadedTaskQueueGauge.GetMetricName()).Record(
float64(taskQueueCount),
float64(newCount),
metrics.NamespaceTag(ns.String()),
metrics.TaskTypeTag(countKey.taskType.String()),
metrics.QueueTypeTag(countKey.kind.String()),
metrics.VersionedTag(countKey.versioned),
)
}

Expand Down
117 changes: 117 additions & 0 deletions service/matching/matching_engine_test.go
Expand Up @@ -2323,6 +2323,123 @@ func (s *matchingEngineSuite) TestAddActivityTask_ForVersionedWorkflows_Silently
s.Require().NoError(err)
}

func (s *matchingEngineSuite) TestUnknownBuildId_Poll() {
namespaceId := namespace.ID(uuid.New())
tl := "makeToast"
tlID := newTestTaskQueueID(namespaceId, tl, enumspb.TASK_QUEUE_TYPE_WORKFLOW)

scope := tally.NewTestScope("test", nil)
s.matchingEngine.metricsHandler = metrics.NewTallyMetricsHandler(metrics.ClientConfig{}, scope)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()

_, err := s.matchingEngine.getTask(ctx, tlID, normalStickyInfo, &pollMetadata{
workerVersionCapabilities: &commonpb.WorkerVersionCapabilities{
BuildId: "unknown",
UseVersioning: true,
},
})
s.Error(err) // deadline exceeded or canceled

unknownCtr := scope.Snapshot().Counters()["test.unknown_build_polls+namespace="+matchingTestNamespace+",operation=TaskQueueMgr,task_type=Workflow,taskqueue=makeToast"]
s.Equal(int64(1), unknownCtr.Value())
}

func (s *matchingEngineSuite) TestUnknownBuildId_Add() {
namespaceId := namespace.ID(uuid.New())
tl := "makeToast"

scope := tally.NewTestScope("test", nil)
s.matchingEngine.metricsHandler = metrics.NewTallyMetricsHandler(metrics.ClientConfig{}, scope)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()

s.mockMatchingClient.EXPECT().UpdateWorkerBuildIdCompatibility(gomock.Any(), &matchingservice.UpdateWorkerBuildIdCompatibilityRequest{
NamespaceId: namespaceId.String(),
TaskQueue: tl,
Operation: &matchingservice.UpdateWorkerBuildIdCompatibilityRequest_PersistUnknownBuildId{
PersistUnknownBuildId: "unknown",
},
}).Return(&matchingservice.UpdateWorkerBuildIdCompatibilityResponse{}, nil)

_, err := s.matchingEngine.AddWorkflowTask(ctx, &matchingservice.AddWorkflowTaskRequest{
NamespaceId: namespaceId.String(),
Execution: &commonpb.WorkflowExecution{RunId: "run", WorkflowId: "wf"},
ScheduledEventId: 0,
TaskQueue: &taskqueuepb.TaskQueue{Name: tl, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
ForwardedSource: "somewhere", // force sync match only
VersionDirective: &taskqueue.TaskVersionDirective{
Value: &taskqueue.TaskVersionDirective_BuildId{
BuildId: "unknown",
},
},
})
s.ErrorIs(err, errRemoteSyncMatchFailed)

unknownCtr := scope.Snapshot().Counters()["test.unknown_build_tasks+namespace="+matchingTestNamespace+",operation=TaskQueueMgr,task_type=Workflow,taskqueue=makeToast"]
s.Equal(int64(1), unknownCtr.Value())
}

func (s *matchingEngineSuite) TestUnknownBuildId_Match() {
namespaceId := namespace.ID(uuid.New())
tl := "makeToast"

scope := tally.NewTestScope("test", nil)
s.matchingEngine.metricsHandler = metrics.NewTallyMetricsHandler(metrics.ClientConfig{}, scope)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

s.mockMatchingClient.EXPECT().UpdateWorkerBuildIdCompatibility(gomock.Any(), &matchingservice.UpdateWorkerBuildIdCompatibilityRequest{
NamespaceId: namespaceId.String(),
TaskQueue: tl,
Operation: &matchingservice.UpdateWorkerBuildIdCompatibilityRequest_PersistUnknownBuildId{
PersistUnknownBuildId: "unknown",
},
}).Return(&matchingservice.UpdateWorkerBuildIdCompatibilityResponse{}, nil).AnyTimes() // might get called again on dispatch from spooled

var wg sync.WaitGroup
wg.Add(2)

go func() {
_, err := s.matchingEngine.AddWorkflowTask(ctx, &matchingservice.AddWorkflowTaskRequest{
NamespaceId: namespaceId.String(),
Execution: &commonpb.WorkflowExecution{RunId: "run", WorkflowId: "wf"},
ScheduledEventId: 123,
TaskQueue: &taskqueuepb.TaskQueue{Name: tl, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
// do not set ForwardedSource, allow to go to db
VersionDirective: &taskqueue.TaskVersionDirective{
Value: &taskqueue.TaskVersionDirective_BuildId{
BuildId: "unknown",
},
},
})
s.NoError(err)
wg.Done()
}()

go func() {
tlID := newTestTaskQueueID(namespaceId, tl, enumspb.TASK_QUEUE_TYPE_WORKFLOW)
task, err := s.matchingEngine.getTask(ctx, tlID, normalStickyInfo, &pollMetadata{
workerVersionCapabilities: &commonpb.WorkerVersionCapabilities{
BuildId: "unknown",
UseVersioning: true,
},
})
s.NoError(err)
s.Equal("wf", task.event.Data.WorkflowId)
s.Equal(int64(123), task.event.Data.ScheduledEventId)
task.finish(nil)
wg.Done()
}()

wg.Wait()
}

func (s *matchingEngineSuite) setupRecordActivityTaskStartedMock(tlName string) {
activityTypeName := "activity1"
activityID := "activityId1"
Expand Down
53 changes: 47 additions & 6 deletions service/matching/task_queue_manager.go
Expand Up @@ -779,14 +779,23 @@ func (c *taskQueueManagerImpl) RedirectToVersionedQueueForPoll(caps *commonpb.Wo

if c.kind == enumspb.TASK_QUEUE_KIND_STICKY {
// In the sticky case we don't redirect, but we may kick off this worker if there's a newer one.
err := checkVersionForStickyPoll(data, caps)
return c.taskQueueID, err
unknownBuild, err := checkVersionForStickyPoll(data, caps)
if err != nil {
return nil, err
}
if unknownBuild {
c.recordUnknownBuildPoll(caps.BuildId)
}
return c.taskQueueID, nil
}

versionSet, err := lookupVersionSetForPoll(data, caps)
versionSet, unknownBuild, err := lookupVersionSetForPoll(data, caps)
if err != nil {
return nil, err
}
if unknownBuild {
c.recordUnknownBuildPoll(caps.BuildId)
}
return newTaskQueueIDWithVersionSet(c.taskQueueID, versionSet), nil
}

Expand Down Expand Up @@ -815,20 +824,52 @@ func (c *taskQueueManagerImpl) RedirectToVersionedQueueForAdd(ctx context.Contex

if c.kind == enumspb.TASK_QUEUE_KIND_STICKY {
// In the sticky case we don't redirect, but we may kick off this worker if there's a newer one.
err := checkVersionForStickyAdd(data, buildId)
return c.taskQueueID, userDataChanged, err
unknownBuild, err := checkVersionForStickyAdd(data, buildId)
if err != nil {
return nil, nil, err
}
if unknownBuild {
c.recordUnknownBuildTask(buildId)
// Don't bother persisting the unknown build id in this case: sticky tasks have a
// short timeout, so it doesn't matter if they get lost.
}
return c.taskQueueID, userDataChanged, nil
}

versionSet, err := lookupVersionSetForAdd(data, buildId)
versionSet, unknownBuild, err := lookupVersionSetForAdd(data, buildId)
if err == errEmptyVersioningData { // nolint:goerr113
// default was requested for an unversioned queue
return c.taskQueueID, userDataChanged, nil
} else if err != nil {
return nil, nil, err
}
if unknownBuild {
c.recordUnknownBuildTask(buildId)
// Send rpc to root partition to persist the unknown build id before we return success.
_, err = c.matchingClient.UpdateWorkerBuildIdCompatibility(ctx, &matchingservice.UpdateWorkerBuildIdCompatibilityRequest{
NamespaceId: c.taskQueueID.namespaceID.String(),
TaskQueue: c.taskQueueID.Root().FullName(),
Operation: &matchingservice.UpdateWorkerBuildIdCompatibilityRequest_PersistUnknownBuildId{
PersistUnknownBuildId: buildId,
},
})
if err != nil {
return nil, nil, err
}
}
return newTaskQueueIDWithVersionSet(c.taskQueueID, versionSet), userDataChanged, nil
}

func (c *taskQueueManagerImpl) recordUnknownBuildPoll(buildId string) {
c.logger.Warn("unknown build id in poll", tag.BuildId(buildId))
c.taggedMetricsHandler.Counter(metrics.UnknownBuildPollsCounter.GetMetricName()).Record(1)
}

func (c *taskQueueManagerImpl) recordUnknownBuildTask(buildId string) {
c.logger.Warn("unknown build id in task", tag.BuildId(buildId))
c.taggedMetricsHandler.Counter(metrics.UnknownBuildTasksCounter.GetMetricName()).Record(1)
}

func (c *taskQueueManagerImpl) callerInfoContext(ctx context.Context) context.Context {
ns, _ := c.namespaceRegistry.GetNamespaceName(c.taskQueueID.namespaceID)
return headers.SetCallerInfo(ctx, headers.NewBackgroundCallerInfo(ns.String()))
Expand Down

0 comments on commit 6d711a6

Please sign in to comment.