Improve tasklist and implement history scavenger for SQL #4059

Merged
merged 27 commits into from
Apr 12, 2021
fed1274
Fix DB query
kraney Feb 25, 2021
1954871
Apply correct patch
kraney Feb 25, 2021
b16c8fa
A workflow with infinite timeout effectively renders the tasklist sca…
kraney Mar 17, 2021
706e245
Add scavenger for orphaned tasks
kraney Mar 17, 2021
3c386ef
Make scanner settings configurable via dynamicconfig
kraney Mar 17, 2021
02d90db
Implement GetAllHistoryShards for SQL so that the history scanner can…
kraney Mar 17, 2021
6553ccf
Fix tests
kraney Mar 17, 2021
aaf61ef
Address review comments
kraney Mar 19, 2021
c0e9b68
run buildkite test
longquanzheng Mar 19, 2021
abff777
Add requested comments
kraney Mar 19, 2021
af36dcc
Rename dynamic config item for clarity
kraney Mar 19, 2021
94ad34a
Renaming for clarity
kraney Mar 19, 2021
54daab1
Update description
kraney Mar 22, 2021
33c9c31
Shift to config function / default value scheme more like other areas
kraney Mar 22, 2021
90b40cf
Avoid using string minUUID
kraney Mar 22, 2021
abdbc96
Add a test for GetOrphanTasks; fix segfault for testscavengeractivity
kraney Mar 22, 2021
c96abbc
Bring back expiration check
kraney Mar 23, 2021
d832cd5
done
longquanzheng Apr 11, 2021
893624b
fix
longquanzheng Apr 11, 2021
ac0f362
fix terst
longquanzheng Apr 11, 2021
fcd70d7
more fixt
longquanzheng Apr 11, 2021
c3baece
fix matching tests
longquanzheng Apr 11, 2021
7ab960f
fix persistence tests
longquanzheng Apr 11, 2021
9db4e87
fix bug for test
longquanzheng Apr 12, 2021
559ad9c
add more test case
longquanzheng Apr 12, 2021
3a43707
remove debug code
longquanzheng Apr 12, 2021
efcc85d
same fix for postres
longquanzheng Apr 12, 2021
2 changes: 1 addition & 1 deletion common/config/log.go
Expand Up @@ -30,7 +30,7 @@ import (

const fileMode = os.FileMode(0644)

// NewZapLogger builds and returns a new
// NewZapLogger builds and returns a new
// Zap logger for this logging configuration
func (cfg *Logger) NewZapLogger() (*zap.Logger, error) {
levelKey := cfg.LevelKey
Expand Down
6 changes: 6 additions & 0 deletions common/constants.go
Expand Up @@ -138,6 +138,12 @@ const (
DefaultTransactionSizeLimit = 14 * 1024 * 1024
)

const (
DefaultScannerGetOrphanTasksPageSize = 1000
DefaultScannerBatchSizeForCompleteTasksLessThanAckLevel = 16
DefaultScannerMaxTasksProcessedPerTasklistJob = 256
)

const (
// ArchivalEnabled is the status for enabling archival
ArchivalEnabled = "enabled"
Expand Down
15 changes: 15 additions & 0 deletions common/dynamicconfig/constants.go
Expand Up @@ -313,6 +313,10 @@ var keys = map[Key]string{
WorkerTimeLimitPerArchivalIteration: "worker.TimeLimitPerArchivalIteration",
WorkerThrottledLogRPS: "worker.throttledLogRPS",
ScannerPersistenceMaxQPS: "worker.scannerPersistenceMaxQPS",
ScannerGetOrphanTasksPageSize: "worker.scannerGetOrphanTasksPageSize",
ScannerBatchSizeForTasklistHandler: "worker.scannerBatchSizeForTasklistHandler",
EnableCleaningOrphanTaskInTasklistScavenger: "worker.enableCleaningOrphanTaskInTasklistScavenger",
ScannerMaxTasksProcessedPerTasklistJob: "worker.scannerMaxTasksProcessedPerTasklistJob",
TaskListScannerEnabled: "worker.taskListScannerEnabled",
HistoryScannerEnabled: "worker.historyScannerEnabled",
ConcreteExecutionsScannerEnabled: "worker.executionsScannerEnabled",
Expand Down Expand Up @@ -828,6 +832,17 @@ const (
WorkerThrottledLogRPS
// ScannerPersistenceMaxQPS is the maximum rate of persistence calls from worker.Scanner
ScannerPersistenceMaxQPS
// ScannerGetOrphanTasksPageSize is the maximum number of orphans to delete in one batch
ScannerGetOrphanTasksPageSize
// ScannerBatchSizeForTasklistHandler is the maximum number of tasks to query per call
// (get tasks for a tasklist) in the scavenger handler. The scavenger uses the result
// to decide whether a tasklist can be deleted, so a relatively high value is more efficient.
ScannerBatchSizeForTasklistHandler
// EnableCleaningOrphanTaskInTasklistScavenger indicates whether the scanner should clean up orphan tasks
EnableCleaningOrphanTaskInTasklistScavenger
// ScannerMaxTasksProcessedPerTasklistJob is the number of tasks to process for a tasklist in each workflow run
ScannerMaxTasksProcessedPerTasklistJob
// TaskListScannerEnabled indicates if task list scanner should be started as part of worker.Scanner
TaskListScannerEnabled
// HistoryScannerEnabled indicates if history scanner should be started as part of worker.Scanner
Expand Down
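The new scanner keys pair with the defaults added in common/constants.go. As a rough illustration of how a key-to-name map and a per-call default lookup can fit together, here is a minimal sketch. The `client` type, its `overrides` map, and `intProperty` are hypothetical stand-ins for this example, not Cadence's dynamicconfig API.

```go
package main

import "fmt"

// Key identifies a dynamic config item (mirrors the enum style in the diff).
type Key int

const (
	ScannerGetOrphanTasksPageSize Key = iota
	ScannerMaxTasksProcessedPerTasklistJob
)

// keys maps each Key to the name used in the config source.
var keys = map[Key]string{
	ScannerGetOrphanTasksPageSize:          "worker.scannerGetOrphanTasksPageSize",
	ScannerMaxTasksProcessedPerTasklistJob: "worker.scannerMaxTasksProcessedPerTasklistJob",
}

// client is a hypothetical in-memory config source keyed by config name.
type client struct {
	overrides map[string]int
}

// intProperty returns a function that resolves the value on every call,
// falling back to defaultValue when no override is present.
func (c *client) intProperty(k Key, defaultValue int) func() int {
	return func() int {
		if v, ok := c.overrides[keys[k]]; ok {
			return v
		}
		return defaultValue
	}
}

func main() {
	c := &client{overrides: map[string]int{"worker.scannerGetOrphanTasksPageSize": 500}}
	pageSize := c.intProperty(ScannerGetOrphanTasksPageSize, 1000)
	maxTasks := c.intProperty(ScannerMaxTasksProcessedPerTasklistJob, 256)
	fmt.Println(pageSize(), maxTasks()) // prints "500 256"
}
```

Returning a function rather than a plain value means a changed override is picked up on the next call without restarting the scanner.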
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Expand Up @@ -198,6 +198,8 @@ const (
PersistenceCompleteTaskScope
// PersistenceCompleteTasksLessThanScope is the metric scope for persistence.TaskManager.PersistenceCompleteTasksLessThan API
PersistenceCompleteTasksLessThanScope
// PersistenceGetOrphanTasksScope is the metric scope for persistence.TaskManager.GetOrphanTasks API
PersistenceGetOrphanTasksScope
// PersistenceLeaseTaskListScope tracks LeaseTaskList calls made by service to persistence layer
PersistenceLeaseTaskListScope
// PersistenceUpdateTaskListScope tracks PersistenceUpdateTaskListScope calls made by service to persistence layer
Expand Down Expand Up @@ -1123,6 +1125,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceGetTasksScope: {operation: "GetTasks"},
PersistenceCompleteTaskScope: {operation: "CompleteTask"},
PersistenceCompleteTasksLessThanScope: {operation: "CompleteTasksLessThan"},
PersistenceGetOrphanTasksScope: {operation: "GetOrphanTasks"},
PersistenceLeaseTaskListScope: {operation: "LeaseTaskList"},
PersistenceUpdateTaskListScope: {operation: "UpdateTaskList"},
PersistenceListTaskListScope: {operation: "ListTaskList"},
Expand Down
25 changes: 24 additions & 1 deletion common/mocks/TaskManager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions common/persistence/cassandra/cassandraTaskPersistence.go
Expand Up @@ -183,6 +183,13 @@ func newTaskPersistence(
}, nil
}

func (d *cassandraTaskPersistence) GetOrphanTasks(ctx context.Context, request *p.GetOrphanTasksRequest) (*p.GetOrphanTasksResponse, error) {
// TODO: It's unclear whether this is necessary or useful for Cassandra
return nil, &types.InternalServiceError{
Message: "Unimplemented call to GetOrphanTasks for Cassandra",
}
}

// From TaskManager interface
func (d *cassandraTaskPersistence) LeaseTaskList(
ctx context.Context,
Expand Down
19 changes: 19 additions & 0 deletions common/persistence/dataInterfaces.go
Expand Up @@ -440,6 +440,14 @@ type (
CreatedTime time.Time
}

// TaskKey gives primary key info for a specific task
TaskKey struct {
DomainID string
TaskListName string
TaskType int
TaskID int64
}

// Task is the generic interface for workflow tasks
Task interface {
GetType() int
Expand Down Expand Up @@ -1167,6 +1175,16 @@ type (
Limit int // Limit on the max number of tasks that can be completed. Required param
}

// GetOrphanTasksRequest contains the request params needed to invoke the GetOrphanTasks API
GetOrphanTasksRequest struct {
Limit int
}

// GetOrphanTasksResponse is the response to GetOrphanTasksRequest
GetOrphanTasksResponse struct {
Tasks []*TaskKey
}

// GetTimerIndexTasksRequest is the request for GetTimerIndexTasks
// TODO: replace this with an iterator that can configure min and max index.
GetTimerIndexTasksRequest struct {
Expand Down Expand Up @@ -1587,6 +1605,7 @@ type (
GetTasks(ctx context.Context, request *GetTasksRequest) (*GetTasksResponse, error)
CompleteTask(ctx context.Context, request *CompleteTaskRequest) error
CompleteTasksLessThan(ctx context.Context, request *CompleteTasksLessThanRequest) (int, error)
GetOrphanTasks(ctx context.Context, request *GetOrphanTasksRequest) (*GetOrphanTasksResponse, error)
}

// HistoryManager is used to manage workflow history events
Expand Down
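To show how `TaskKey` and the `Limit` field might be consumed, here is a minimal sketch of a scavenger loop that fetches orphans one page at a time and deletes each. The `orphanStore` interface, `memStore`, and `cleanOrphans` are hypothetical simplifications made up for this example (no context, no error returns); the real TaskManager signatures are the ones shown in the diff.

```go
package main

import "fmt"

// TaskKey mirrors the struct added in the diff above.
type TaskKey struct {
	DomainID     string
	TaskListName string
	TaskType     int
	TaskID       int64
}

// orphanStore is a hypothetical, simplified stand-in for the task manager.
type orphanStore interface {
	GetOrphanTasks(limit int) []TaskKey
	CompleteTask(key TaskKey)
}

// memStore keeps orphans in a slice; purely illustrative.
type memStore struct {
	orphans []TaskKey
}

// GetOrphanTasks returns a copy of at most limit orphans.
func (m *memStore) GetOrphanTasks(limit int) []TaskKey {
	if limit <= 0 {
		return nil
	}
	if limit > len(m.orphans) {
		limit = len(m.orphans)
	}
	page := make([]TaskKey, limit)
	copy(page, m.orphans)
	return page
}

// CompleteTask removes the task with the given key, if present.
func (m *memStore) CompleteTask(key TaskKey) {
	for i, k := range m.orphans {
		if k == key {
			m.orphans = append(m.orphans[:i], m.orphans[i+1:]...)
			return
		}
	}
}

// cleanOrphans deletes orphans page by page and returns how many were removed.
func cleanOrphans(s orphanStore, pageSize int) int {
	deleted := 0
	for {
		page := s.GetOrphanTasks(pageSize)
		if len(page) == 0 {
			return deleted
		}
		for _, key := range page {
			s.CompleteTask(key)
			deleted++
		}
	}
}

func main() {
	s := &memStore{}
	for i := 0; i < 5; i++ {
		s.orphans = append(s.orphans, TaskKey{DomainID: "d", TaskListName: "gone", TaskID: int64(i)})
	}
	fmt.Println(cleanOrphans(s, 2), len(s.orphans)) // prints "5 0"
}
```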
Expand Up @@ -118,11 +118,6 @@ func (s *HistoryV2PersistenceSuite) TestScanAllTrees() {
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()

// TODO https://github.com/uber/cadence/issues/2458
if s.HistoryV2Mgr.GetName() != "cassandra" {
return
}

resp, err := s.HistoryV2Mgr.GetAllHistoryTreeBranches(ctx, &p.GetAllHistoryTreeBranchesRequest{
PageSize: 1,
})
Expand Down
Expand Up @@ -513,7 +513,7 @@ func (s *MatchingPersistenceSuite) TestListWithMultipleTaskList() {
listedNames := make(map[string]struct{})
var nextPageToken []byte
for {
resp, err := s.TaskMgr.ListTaskList(ctx, &p.ListTaskListRequest{PageSize: 10, PageToken: nextPageToken})
resp, err := s.TaskMgr.ListTaskList(ctx, &p.ListTaskListRequest{PageSize: 1, PageToken: nextPageToken})
s.NoError(err)
for _, it := range resp.Items {
s.Equal(domainID, it.DomainID)
Expand All @@ -530,9 +530,98 @@ func (s *MatchingPersistenceSuite) TestListWithMultipleTaskList() {
}
s.Equal(tlNames, listedNames, "list API returned wrong set of task list names")
}

// finally, test pagination again
total := 0
var nextPageToken []byte
for {
resp, err := s.TaskMgr.ListTaskList(ctx, &p.ListTaskListRequest{
PageSize: 6,
PageToken: nextPageToken,
})
s.NoError(err)
total += len(resp.Items)
if resp.NextPageToken == nil {
break
}
nextPageToken = resp.NextPageToken
}
s.Equal(10, total)

s.deleteAllTaskList()
resp, err := s.TaskMgr.ListTaskList(ctx, &p.ListTaskListRequest{PageSize: 10})
s.NoError(err)
s.Nil(resp.NextPageToken)
s.Equal(0, len(resp.Items))
}

func (s *MatchingPersistenceSuite) TestGetOrphanTasks() {
if s.TaskMgr.GetName() == "cassandra" {
// GetOrphanTasks API is currently not supported in Cassandra
return
}
s.deleteAllTaskList()

ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()

oresp, err := s.TaskMgr.GetOrphanTasks(ctx, &p.GetOrphanTasksRequest{Limit: 10})
s.NoError(err)
// existing orphans that were caused by other tests
existingOrphans := len(oresp.Tasks)

domainID := uuid.New()
name := "test-list-with-orphans"
resp, err := s.TaskMgr.LeaseTaskList(ctx, &p.LeaseTaskListRequest{
DomainID: domainID,
TaskList: name,
TaskType: p.TaskListTypeActivity,
TaskListKind: p.TaskListKindNormal,
})
s.NoError(err)

wid := uuid.New()
rid := uuid.New()
_, err = s.TaskMgr.CreateTasks(ctx, &p.CreateTasksRequest{
TaskListInfo: resp.TaskListInfo,
Tasks: []*p.CreateTaskInfo{
{
Execution: types.WorkflowExecution{WorkflowID: wid, RunID: rid},
Data: &p.TaskInfo{
DomainID: domainID,
WorkflowID: wid,
RunID: rid,
TaskID: 0,
ScheduleID: 0,
ScheduleToStartTimeout: 0,
Expiry: time.Now(),
CreatedTime: time.Now(),
},
TaskID: 0,
},
},
})
// fail fast if the task could not be created, rather than at the orphan count below
s.NoError(err)

oresp, err = s.TaskMgr.GetOrphanTasks(ctx, &p.GetOrphanTasksRequest{Limit: 10})
s.NoError(err)

s.Equal(existingOrphans, len(oresp.Tasks))

s.deleteAllTaskList()

oresp, err = s.TaskMgr.GetOrphanTasks(ctx, &p.GetOrphanTasksRequest{Limit: 10})
s.NoError(err)

s.Equal(existingOrphans+1, len(oresp.Tasks))
found := false
for _, it := range oresp.Tasks {
if it.DomainID != domainID {
continue
}
s.Equal(p.TaskListTypeActivity, it.TaskType)
s.Equal(int64(0), it.TaskID)
s.Equal(name, it.TaskListName)
found = true
}
s.True(found)
}
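The pagination check in TestListWithMultipleTaskList (page size 6 over 10 task lists, stopping on a nil NextPageToken) can be sketched without a database as follows. The offset-as-token encoding and the names `listPage` and `countAll` are assumptions made for this sketch; the SQL store's actual token format is not shown in this diff.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// listPage returns up to pageSize names starting at the offset encoded in token.
// A nil next token signals the last page.
func listPage(all []string, pageSize int, token []byte) ([]string, []byte, error) {
	if pageSize <= 0 {
		return nil, nil, errors.New("pageSize must be positive")
	}
	start := 0
	if token != nil {
		n, err := strconv.Atoi(string(token))
		if err != nil || n < 0 || n > len(all) {
			return nil, nil, errors.New("invalid page token")
		}
		start = n
	}
	end := start + pageSize
	if end >= len(all) {
		return all[start:], nil, nil
	}
	return all[start:end], []byte(strconv.Itoa(end)), nil
}

// countAll walks every page and returns the item count and the number of pages read.
func countAll(all []string, pageSize int) (int, int, error) {
	total, pages := 0, 0
	var token []byte
	for {
		items, next, err := listPage(all, pageSize, token)
		if err != nil {
			return 0, 0, err
		}
		total += len(items)
		pages++
		if next == nil {
			return total, pages, nil
		}
		token = next
	}
}

func main() {
	names := make([]string, 10)
	for i := range names {
		names[i] = fmt.Sprintf("tl-%d", i)
	}
	total, pages, err := countAll(names, 6)
	fmt.Println(total, pages, err) // prints "10 2 <nil>"
}
```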
25 changes: 25 additions & 0 deletions common/persistence/persistenceErrorInjectionClients.go
Expand Up @@ -1007,6 +1007,31 @@ func (p *taskErrorInjectionPersistenceClient) CompleteTasksLessThan(
return response, persistenceErr
}

func (p *taskErrorInjectionPersistenceClient) GetOrphanTasks(
ctx context.Context,
request *GetOrphanTasksRequest,
) (*GetOrphanTasksResponse, error) {
fakeErr := generateFakeError(p.errorRate)

var response *GetOrphanTasksResponse
var persistenceErr error
var forwardCall bool
if forwardCall = shouldForwardCallToPersistence(fakeErr); forwardCall {
response, persistenceErr = p.persistence.GetOrphanTasks(ctx, request)
}

if fakeErr != nil {
p.logger.Error(msgInjectedFakeErr,
tag.StoreOperationCompleteTask, // NOTE: reuses the CompleteTask operation tag although this call is GetOrphanTasks
tag.Error(fakeErr),
tag.Bool(forwardCall),
tag.StoreError(persistenceErr),
)
return nil, fakeErr
}
return response, persistenceErr
}

func (p *taskErrorInjectionPersistenceClient) LeaseTaskList(
ctx context.Context,
request *LeaseTaskListRequest,
Expand Down
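For readers unfamiliar with the error-injection wrappers, the general pattern can be sketched as below. This is a simplified illustration, not the code in this file: the `store` interface, `injectingStore`, and the injectable `roll` function are hypothetical, and unlike the wrapper in the diff (which may still forward the call when a fake error is generated), this sketch never forwards a failed call.

```go
package main

import (
	"errors"
	"fmt"
)

var errInjected = errors.New("injected fake error")

// store is a hypothetical single-method persistence interface.
type store interface {
	GetOrphanTasks(limit int) ([]string, error)
}

// backendStore counts how many calls actually reach it.
type backendStore struct{ calls int }

func (b *backendStore) GetOrphanTasks(limit int) ([]string, error) {
	b.calls++
	return []string{"task-0"}, nil
}

// injectingStore wraps a store and fails a fraction of calls.
// roll returns a value in [0,1); it is injectable so the sketch stays deterministic.
type injectingStore struct {
	next      store
	errorRate float64
	roll      func() float64
}

func (s *injectingStore) GetOrphanTasks(limit int) ([]string, error) {
	if s.roll() < s.errorRate {
		// In this sketch an injected failure never reaches the wrapped store.
		return nil, errInjected
	}
	return s.next.GetOrphanTasks(limit)
}

func main() {
	backend := &backendStore{}

	failing := &injectingStore{next: backend, errorRate: 0.5, roll: func() float64 { return 0.1 }}
	_, err := failing.GetOrphanTasks(10)
	fmt.Println(err, backend.calls)

	passing := &injectingStore{next: backend, errorRate: 0.5, roll: func() float64 { return 0.9 }}
	tasks, err := passing.GetOrphanTasks(10)
	fmt.Println(tasks, err, backend.calls)
}
```

In a real test run, `roll` could be backed by `math/rand`'s `Float64`.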
4 changes: 4 additions & 0 deletions common/persistence/persistenceInterface.go
Expand Up @@ -70,6 +70,10 @@ type (
// - number of rows actually deleted, if limit is honored
// - UnknownNumRowsDeleted, when all rows below value are deleted
CompleteTasksLessThan(ctx context.Context, request *CompleteTasksLessThanRequest) (int, error)
// GetOrphanTasks returns tasks that exist as records in the database but are part of task lists which
// _do not_ exist in the database. They are therefore unreachable and no longer represent valid items
// that can be legitimately acted upon.
GetOrphanTasks(ctx context.Context, request *GetOrphanTasksRequest) (*GetOrphanTasksResponse, error)
}

// MetadataStore is a lower level of MetadataManager
Expand Down
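The orphan definition in the GetOrphanTasks comment (a task record whose task list no longer exists) can be illustrated with an in-memory sketch. The types `taskListKey` and `task` and the function `findOrphans` are hypothetical and exist only for this example; the SQL implementation would express the same idea as a query.

```go
package main

import "fmt"

// taskListKey identifies a task list (hypothetical, for this sketch only).
type taskListKey struct {
	DomainID string
	Name     string
	TaskType int
}

// task is a task record that points at its owning task list.
type task struct {
	List   taskListKey
	TaskID int64
}

// findOrphans returns tasks whose task list is absent from lists, up to limit.
func findOrphans(tasks []task, lists map[taskListKey]bool, limit int) []task {
	var orphans []task
	for _, t := range tasks {
		if len(orphans) >= limit {
			break
		}
		if !lists[t.List] {
			orphans = append(orphans, t)
		}
	}
	return orphans
}

func main() {
	tl := taskListKey{DomainID: "domain-1", Name: "test-list-with-orphans", TaskType: 1}
	tasks := []task{{List: tl, TaskID: 0}}
	lists := map[taskListKey]bool{tl: true}

	// While the task list exists, its task is reachable and not an orphan.
	fmt.Println(len(findOrphans(tasks, lists, 10)))

	// Deleting only the task list leaves the task record behind as an orphan.
	delete(lists, tl)
	fmt.Println(len(findOrphans(tasks, lists, 10)))
}
```

This mirrors the shape of TestGetOrphanTasks: the orphan count is unchanged after creating a task and grows by one once the task list is deleted.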
11 changes: 11 additions & 0 deletions common/persistence/persistenceMetricClients.go
Expand Up @@ -796,6 +796,17 @@ func (p *taskPersistenceClient) CompleteTasksLessThan(
return result, err
}

func (p *taskPersistenceClient) GetOrphanTasks(ctx context.Context, request *GetOrphanTasksRequest) (*GetOrphanTasksResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceGetOrphanTasksScope, metrics.PersistenceRequests)
sw := p.metricClient.StartTimer(metrics.PersistenceGetOrphanTasksScope, metrics.PersistenceLatency)
result, err := p.persistence.GetOrphanTasks(ctx, request)
sw.Stop()
if err != nil {
p.updateErrorMetric(metrics.PersistenceGetOrphanTasksScope, err)
}
return result, err
}

func (p *taskPersistenceClient) LeaseTaskList(
ctx context.Context,
request *LeaseTaskListRequest,
Expand Down
7 changes: 7 additions & 0 deletions common/persistence/persistenceRateLimitedClients.go
Expand Up @@ -589,6 +589,13 @@ func (p *taskRateLimitedPersistenceClient) CompleteTasksLessThan(
return p.persistence.CompleteTasksLessThan(ctx, request)
}

func (p *taskRateLimitedPersistenceClient) GetOrphanTasks(ctx context.Context, request *GetOrphanTasksRequest) (*GetOrphanTasksResponse, error) {
if ok := p.rateLimiter.Allow(); !ok {
return nil, ErrPersistenceLimitExceeded
}
return p.persistence.GetOrphanTasks(ctx, request)
}

func (p *taskRateLimitedPersistenceClient) LeaseTaskList(
ctx context.Context,
request *LeaseTaskListRequest,
Expand Down
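The rate-limited wrapper rejects the call up front when the limiter has no capacity. A minimal sketch of that guard, assuming a hypothetical `countingLimiter` (a fixed call budget standing in for a token bucket) and a hypothetical `errLimitExceeded` in place of `ErrPersistenceLimitExceeded`:

```go
package main

import (
	"errors"
	"fmt"
)

var errLimitExceeded = errors.New("persistence rate limit exceeded")

// limiter matches the single method the wrapper in the diff relies on.
type limiter interface {
	Allow() bool
}

// countingLimiter allows a fixed number of calls; a stand-in for a token bucket.
type countingLimiter struct {
	remaining int
}

func (l *countingLimiter) Allow() bool {
	if l.remaining <= 0 {
		return false
	}
	l.remaining--
	return true
}

// getOrphanTasks calls fetch only when the limiter permits it.
func getOrphanTasks(l limiter, fetch func() ([]string, error)) ([]string, error) {
	if !l.Allow() {
		return nil, errLimitExceeded
	}
	return fetch()
}

func main() {
	l := &countingLimiter{remaining: 1}
	fetch := func() ([]string, error) { return []string{"task-0"}, nil }

	tasks, err := getOrphanTasks(l, fetch)
	fmt.Println(tasks, err)

	tasks, err = getOrphanTasks(l, fetch)
	fmt.Println(tasks, err)
}
```

Callers such as a scavenger would typically treat the limit error as retryable and try again on a later pass.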