Skip to content

Commit

Permalink
Worker versioning leftovers (#4400)
Browse files Browse the repository at this point in the history
Note: This commit came from a feature branch and is not expected to build.
  • Loading branch information
bergundy authored and dnr committed May 26, 2023
1 parent 8a325f8 commit 209e250
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 14 deletions.
8 changes: 4 additions & 4 deletions common/persistence/sql/sqlplugin/matching_task_queue.go
Expand Up @@ -64,13 +64,13 @@ type (
DataEncoding string
}

AddBuildIdToTaskQueueMapping struct {
AddBuildIdsToTaskQueueMapping struct {
NamespaceID []byte
TaskQueueName string
BuildIds []string
}

RemoveBuildIdToTaskQueueMapping struct {
RemoveBuildIdsToTaskQueueMapping struct {
NamespaceID []byte
TaskQueueName string
BuildIds []string
Expand Down Expand Up @@ -114,8 +114,8 @@ type (
LockTaskQueues(ctx context.Context, filter TaskQueuesFilter) (int64, error)
GetTaskQueueUserData(ctx context.Context, request *GetTaskQueueUserDataRequest) (*VersionedBlob, error)
UpdateTaskQueueUserData(ctx context.Context, request *UpdateTaskQueueDataRequest) error
AddBuildIdToTaskQueueMapping(ctx context.Context, request AddBuildIdToTaskQueueMapping) error
RemoveBuildIdToTaskQueueMapping(ctx context.Context, request RemoveBuildIdToTaskQueueMapping) error
AddBuildIdToTaskQueueMapping(ctx context.Context, request AddBuildIdsToTaskQueueMapping) error
RemoveBuildIdToTaskQueueMapping(ctx context.Context, request RemoveBuildIdsToTaskQueueMapping) error
ListTaskQueueUserDataEntries(ctx context.Context, request *ListTaskQueueUserDataEntriesRequest) ([]TaskQueueUserDataEntry, error)
GetTaskQueuesByBuildId(ctx context.Context, request *GetTaskQueuesByBuildIdRequest) ([]string, error)
CountTaskQueuesByBuildId(ctx context.Context, request *CountTaskQueuesByBuildIdRequest) (int, error)
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/sql/sqlplugin/mysql/task.go
Expand Up @@ -340,7 +340,7 @@ func (mdb *db) UpdateTaskQueueUserData(ctx context.Context, request *sqlplugin.U
return nil
}

func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddBuildIdToTaskQueueMapping) error {
func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddBuildIdsToTaskQueueMapping) error {
query := addBuildIdToTaskQueueMappingQry
var params []any
for idx, buildId := range request.BuildIds {
Expand All @@ -356,7 +356,7 @@ func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplug
return err
}

func (mdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveBuildIdToTaskQueueMapping) error {
func (mdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveBuildIdsToTaskQueueMapping) error {
// TODO(bergundy): implement when we support deletion
panic("not implemented")
}
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/sql/sqlplugin/postgresql/task.go
Expand Up @@ -340,7 +340,7 @@ func (pdb *db) UpdateTaskQueueUserData(ctx context.Context, request *sqlplugin.U
return nil
}

func (pdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddBuildIdToTaskQueueMapping) error {
func (pdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddBuildIdsToTaskQueueMapping) error {
query := addBuildIdToTaskQueueMappingQry
var params []any
for idx, buildId := range request.BuildIds {
Expand All @@ -355,7 +355,7 @@ func (pdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplug
return err
}

func (pdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveBuildIdToTaskQueueMapping) error {
func (pdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveBuildIdsToTaskQueueMapping) error {
// TODO(bergundy): implement when we support deletion
panic("not implemented")
}
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/sql/sqlplugin/sqlite/task.go
Expand Up @@ -346,7 +346,7 @@ func (mdb *db) UpdateTaskQueueUserData(ctx context.Context, request *sqlplugin.U
return nil
}

func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddBuildIdToTaskQueueMapping) error {
func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.AddBuildIdsToTaskQueueMapping) error {
query := addBuildIdToTaskQueueMappingQry
var params []any
for idx, buildId := range request.BuildIds {
Expand All @@ -362,7 +362,7 @@ func (mdb *db) AddBuildIdToTaskQueueMapping(ctx context.Context, request sqlplug
return err
}

func (mdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveBuildIdToTaskQueueMapping) error {
func (mdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlplugin.RemoveBuildIdsToTaskQueueMapping) error {
query := removeBuildIdToTaskQueueMappingQry
var params []any
for idx, buildId := range request.BuildIds {
Expand All @@ -376,7 +376,7 @@ func (mdb *db) RemoveBuildIdToTaskQueueMapping(ctx context.Context, request sqlp
_, err := mdb.conn.ExecContext(ctx, query, params...)
if err == nil {
// TODO(bergundy)
panic("this should be properly tested once we support deletion")
panic("Build id removal should be properly tested once we support deletion")
}
return err
}
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/sql/task.go
Expand Up @@ -518,7 +518,7 @@ func (m *sqlTaskManager) UpdateTaskQueueUserData(ctx context.Context, request *p
return err
}
if len(request.BuildIdsAdded) > 0 {
err = tx.AddBuildIdToTaskQueueMapping(ctx, sqlplugin.AddBuildIdToTaskQueueMapping{
err = tx.AddBuildIdToTaskQueueMapping(ctx, sqlplugin.AddBuildIdsToTaskQueueMapping{
NamespaceID: namespaceID,
TaskQueueName: request.TaskQueue,
BuildIds: request.BuildIdsAdded,
Expand All @@ -528,7 +528,7 @@ func (m *sqlTaskManager) UpdateTaskQueueUserData(ctx context.Context, request *p
}
}
if len(request.BuildIdsRemoved) > 0 {
err = tx.RemoveBuildIdToTaskQueueMapping(ctx, sqlplugin.RemoveBuildIdToTaskQueueMapping{
err = tx.RemoveBuildIdToTaskQueueMapping(ctx, sqlplugin.RemoveBuildIdsToTaskQueueMapping{
NamespaceID: namespaceID,
TaskQueueName: request.TaskQueue,
BuildIds: request.BuildIdsRemoved,
Expand Down
1 change: 1 addition & 0 deletions service/frontend/workflow_handler.go
Expand Up @@ -2775,6 +2775,7 @@ func (wh *WorkflowHandler) GetSystemInfo(ctx context.Context, request *workflows
UpsertMemo: true,
EagerWorkflowStart: true,
SdkMetadata: true,
BuildIdBasedVersioning: true,
},
}, nil
}
Expand Down
5 changes: 4 additions & 1 deletion service/history/workflow/mutable_state_impl.go
Expand Up @@ -2104,13 +2104,14 @@ func (ms *MutableStateImpl) trackBuildIdFromCompletion(
return err
}
anyAdded := false
added := false
if !version.GetUseVersioning() {
var added bool
// Make sure unversioned workflow tasks are easily locatable with just the prefix
buildIds, added = ms.addBuildIdToLoadedSearchAttribute(buildIds, common.UnversionedSearchAttribute, limits.MaxTrackedBuildIds)
anyAdded = anyAdded || added
}
if version.GetBuildId() != "" {
var added bool
ms.addResetPointFromCompletion(version.GetBuildId(), eventID, limits.MaxResetPoints)
buildIds, added = ms.addBuildIdToLoadedSearchAttribute(buildIds, common.VersionStampToBuildIdSearchAttribute(version), limits.MaxTrackedBuildIds)
anyAdded = anyAdded || added
Expand Down Expand Up @@ -2145,6 +2146,8 @@ func (ms *MutableStateImpl) loadBuildIds() ([]string, error) {
}
}

// Takes a list of loaded build IDs from a search attribute and added a new build ID to it while respecting provided limits.
// Returns a potentially modified list and a flag indicating whether it was modified.
func (ms *MutableStateImpl) addBuildIdToLoadedSearchAttribute(searchAttributeValues []string, searchAttributeValue string, maxTrackedBuildIds int) ([]string, bool) {
if maxTrackedBuildIds < 1 {
// Can't track this build ID
Expand Down
1 change: 1 addition & 0 deletions service/matching/matchingEngine.go
Expand Up @@ -1008,6 +1008,7 @@ func (e *matchingEngineImpl) ReplicateTaskQueueUserData(ctx context.Context, req
if e.namespaceReplicationQueue == nil {
return &matchingservice.ReplicateTaskQueueUserDataResponse{}, nil
}

locks := e.getNamespaceUpdateLocks(request.GetNamespaceId())
locks.replicationLock.Lock()
defer locks.replicationLock.Unlock()
Expand Down
10 changes: 10 additions & 0 deletions service/matching/taskQueueManager.go
Expand Up @@ -498,6 +498,16 @@ func (c *taskQueueManagerImpl) UpdateUserData(ctx context.Context, options UserD
if !options.Replicate {
return nil
}

// Only replicate if namespace is global and has at least 2 clusters registered.
ns, err := c.namespaceRegistry.GetNamespaceByID(c.db.namespaceID)
if err != nil {
return err
}
if !ns.IsGlobalNamespace() || len(ns.ClusterNames()) < 2 {
return nil
}

_, err = c.matchingClient.ReplicateTaskQueueUserData(ctx, &matchingservice.ReplicateTaskQueueUserDataRequest{
NamespaceId: c.db.namespaceID.String(),
TaskQueue: c.taskQueueID.BaseNameString(),
Expand Down

0 comments on commit 209e250

Please sign in to comment.