Skip to content

Commit

Permalink
Bring back expiration check
Browse files Browse the repository at this point in the history
  • Loading branch information
kraney committed Mar 24, 2021
1 parent f9c98ec commit 9dd7034
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 3 deletions.
20 changes: 18 additions & 2 deletions service/worker/scanner/tasklist/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ import (

var retryForeverPolicy = newRetryForeverPolicy()

func (s *Scavenger) completeTasks(info *p.TaskListInfo, limit int) (int, error) {
func (s *Scavenger) completeTasks(info *p.TaskListInfo, taskID int64, limit int) (int, error) {
var n int
var err error
err = s.retryForever(func() error {
n, err = s.db.CompleteTasksLessThan(s.ctx, &p.CompleteTasksLessThanRequest{
DomainID: info.DomainID,
TaskListName: info.Name,
TaskType: info.TaskType,
TaskID: info.AckLevel,
TaskID: taskID,
Limit: limit,
})
return err
Expand Down Expand Up @@ -70,6 +70,22 @@ func (s *Scavenger) completeTask(info *p.TaskListInfo, taskid int64) error {
return err
}

func (s *Scavenger) getTasks(info *p.TaskListInfo, batchSize int) (*p.GetTasksResponse, error) {
var err error
var resp *p.GetTasksResponse
err = s.retryForever(func() error {
resp, err = s.db.GetTasks(&p.GetTasksRequest{
DomainID: info.DomainID,
TaskList: info.Name,
TaskType: info.TaskType,
ReadLevel: -1, // get the first N tasks sorted by taskID
BatchSize: batchSize,
})
return err
})
return resp, err
}

func (s *Scavenger) listTaskList(pageSize int, pageToken []byte) (*p.ListTaskListResponse, error) {
var err error
var resp *p.ListTaskListResponse
Expand Down
41 changes: 40 additions & 1 deletion service/worker/scanner/tasklist/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ func (s *Scavenger) deleteHandler(info *p.TaskListInfo) handlerStatus {
defer func() { s.deleteHandlerLog(info, nProcessed, nDeleted, err) }()

for nProcessed < max {
nTasks, err := s.completeTasks(info, taskBatchSize)
// First we complete tasks up to the persisted acklevel regardless of the task expiration
nTasks, err := s.completeTasks(info, info.AckLevel, taskBatchSize)
if err == p.ErrPersistenceLimitExceeded {
s.logger.Info("scavenger.deleteHandler query was ratelimited; will retry")
return handlerStatusDefer
Expand All @@ -74,6 +75,44 @@ func (s *Scavenger) deleteHandler(info *p.TaskListInfo) handlerStatus {
nProcessed += nTasks
nDeleted += nTasks
if nTasks < taskBatchSize {
break
}
}
for nProcessed < max {
// if we finished completing tasks below the ack level, but still have budget, then focus on expired tasks
resp, err1 := s.getTasks(info, taskBatchSize)
if err1 != nil {
err = err1
s.logger.Error(fmt.Sprintf("Scavenger error getting tasks: %v", err))
return handlerStatusErr
}

nTasks := 0
unexpiredTaskFound := false
for _, task := range resp.Tasks {
nProcessed++
if !s.isTaskExpired(task) {
s.logger.Info(fmt.Sprintf("Scavenger stopping at an unexpired task. (Expires %v)", task.Expiry), tag.WorkflowTaskListName(info.Name), tag.TaskType(info.TaskType), tag.WorkflowDomainID(task.DomainID), tag.WorkflowID(task.WorkflowID), tag.WorkflowRunID(task.RunID), tag.TaskID(task.TaskID))
unexpiredTaskFound = true
break
}
nTasks++
}

if nTasks > 0 {
taskID := resp.Tasks[nTasks-1].TaskID
if _, err = s.completeTasks(info, taskID, nTasks); err != nil {
return handlerStatusErr
}
nDeleted += nTasks
}

if unexpiredTaskFound {
return handlerStatusDone
}

if nTasks < taskBatchSize {
// with no unexpired tasks left, it's safe to try and delete the task list itself
s.tryDeleteTaskList(info)
return handlerStatusDone
}
Expand Down

0 comments on commit 9dd7034

Please sign in to comment.