Skip to content

Commit

Permalink
Utilize shard clock for workflow / activity task (#2744)
Browse files Browse the repository at this point in the history
* Add shard clock util
* Add shard clock to shard context
* Use shard clock for activity / workflow task
  • Loading branch information
wxing1292 committed Apr 21, 2022
1 parent 8426ac5 commit 58456a4
Show file tree
Hide file tree
Showing 14 changed files with 186 additions and 18 deletions.
5 changes: 5 additions & 0 deletions common/persistence/tests/task_queue_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/stretchr/testify/suite"
enumspb "go.temporal.io/api/enums/v1"

clockpb "go.temporal.io/server/api/clock/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/log"
p "go.temporal.io/server/common/persistence"
Expand Down Expand Up @@ -344,6 +345,10 @@ func (s *TaskQueueTaskSuite) randomTask(
ScheduleId: rand.Int63(),
CreateTime: now,
ExpiryTime: timestamp.TimePtr(now.Add(s.taskTTL)),
Clock: &clockpb.ShardClock{
Id: rand.Int31(),
Clock: rand.Int63(),
},
},
}
}
4 changes: 4 additions & 0 deletions service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
commonpb "go.temporal.io/api/common/v1"

"go.temporal.io/server/api/adminservice/v1"
clockpb "go.temporal.io/server/api/clock/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/clock"
Expand Down Expand Up @@ -66,6 +67,9 @@ type (
GetEngine() (Engine, error)
GetEngineWithContext(ctx context.Context) (Engine, error)

NewVectorClock() (*clockpb.ShardClock, error)
CurrentVectorClock() *clockpb.ShardClock

GenerateTaskID() (int64, error)
GenerateTaskIDs(number int) ([]int64, error)

Expand Down
21 changes: 21 additions & 0 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ import (
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

clockpb "go.temporal.io/server/api/clock/v1"
"go.temporal.io/server/client"
"go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/future"
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/service/history/vclock"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
Expand Down Expand Up @@ -193,6 +195,25 @@ func (s *ContextImpl) GetMaxTaskIDForCurrentRangeID() int64 {
return s.maxTaskSequenceNumber - 1
}

func (s *ContextImpl) NewVectorClock() (*clockpb.ShardClock, error) {
s.wLock()
defer s.wUnlock()

clock, err := s.generateTaskIDLocked()
if err != nil {
return nil, err
}
return vclock.NewShardClock(s.shardID, clock), nil
}

func (s *ContextImpl) CurrentVectorClock() *clockpb.ShardClock {
s.rLock()
defer s.rUnlock()

clock := s.taskSequenceNumber
return vclock.NewShardClock(s.shardID, clock)
}

func (s *ContextImpl) GenerateTaskID() (int64, error) {
s.wLock()
defer s.wUnlock()
Expand Down
42 changes: 36 additions & 6 deletions service/history/shard/context_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions service/history/timerQueueActiveTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/vclock"
"go.temporal.io/server/service/history/workflow"
)

Expand Down Expand Up @@ -471,6 +472,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityRetryTimerTask(
TaskQueue: taskQueue,
ScheduleId: task.EventID,
ScheduleToStartTimeout: timestamp.DurationPtr(scheduleToStartTimeout),
Clock: vclock.NewShardClock(t.shard.GetShardID(), task.TaskID),
})

return retError
Expand Down
2 changes: 2 additions & 0 deletions service/history/timerQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/tests"
"go.temporal.io/server/service/history/vclock"
"go.temporal.io/server/service/history/workflow"
)

Expand Down Expand Up @@ -1147,6 +1148,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestActivityRetryTimer_Fire() {
},
ScheduleId: activityInfo.ScheduleId,
ScheduleToStartTimeout: activityInfo.ScheduleToStartTimeout,
Clock: vclock.NewShardClock(s.mockShard.GetShardID(), timerTask.TaskID),
},
gomock.Any(),
).Return(&matchingservice.AddActivityTaskResponse{}, nil)
Expand Down
3 changes: 3 additions & 0 deletions service/history/timerQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common"
Expand All @@ -46,6 +47,7 @@ import (
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/vclock"
"go.temporal.io/server/service/history/workflow"
)

Expand Down Expand Up @@ -596,6 +598,7 @@ func (t *timerQueueStandbyTaskExecutor) pushActivity(
},
ScheduleId: activityTask.EventID,
ScheduleToStartTimeout: activityScheduleToStartTimeout,
Clock: vclock.NewShardClock(t.shard.GetShardID(), activityTask.TaskID),
})
return err
}
Expand Down
29 changes: 17 additions & 12 deletions service/history/timerQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/adminservicemock/v1"
enumsspb "go.temporal.io/server/api/enums/v1"
Expand All @@ -61,6 +62,7 @@ import (
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/tests"
"go.temporal.io/server/service/history/vclock"
"go.temporal.io/server/service/history/workflow"
)

Expand Down Expand Up @@ -1455,6 +1457,19 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityRetryTimer_PushT
// Flush buffered events so real IDs get assigned
mutableState.FlushBufferedEvents()

timerTask := &tasks.ActivityRetryTimerTask{
WorkflowKey: definition.NewWorkflowKey(
s.namespaceID.String(),
execution.GetWorkflowId(),
execution.GetRunId(),
),
Attempt: 2,
Version: s.version,
TaskID: int64(100),
VisibilityTimestamp: task.(*tasks.ActivityTimeoutTask).VisibilityTimestamp,
EventID: scheduledEvent.GetEventId(),
}

persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.fetchHistoryDuration))
Expand All @@ -1470,21 +1485,11 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityRetryTimer_PushT
},
ScheduleId: scheduledEvent.EventId,
ScheduleToStartTimeout: &timerTimeout,
Clock: vclock.NewShardClock(s.mockShard.GetShardID(), timerTask.TaskID),
},
gomock.Any(),
).Return(&matchingservice.AddActivityTaskResponse{}, nil)
timerTask := &tasks.ActivityRetryTimerTask{
WorkflowKey: definition.NewWorkflowKey(
s.namespaceID.String(),
execution.GetWorkflowId(),
execution.GetRunId(),
),
Attempt: 2,
Version: s.version,
TaskID: int64(100),
VisibilityTimestamp: task.(*tasks.ActivityTimeoutTask).VisibilityTimestamp,
EventID: scheduledEvent.GetEventId(),
}

err = s.timerQueueStandbyTaskExecutor.execute(context.Background(), timerTask, true)
s.Nil(err)
}
Expand Down
3 changes: 3 additions & 0 deletions service/history/transferQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/vclock"

"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/service/history/consts"
Expand Down Expand Up @@ -2011,6 +2012,7 @@ func (s *transferQueueActiveTaskExecutorSuite) createAddActivityTaskRequest(
},
ScheduleId: task.ScheduleID,
ScheduleToStartTimeout: ai.ScheduleToStartTimeout,
Clock: vclock.NewShardClock(s.mockShard.GetShardID(), task.TaskID),
}
}

Expand Down Expand Up @@ -2039,6 +2041,7 @@ func (s *transferQueueActiveTaskExecutorSuite) createAddWorkflowTaskRequest(
TaskQueue: taskQueue,
ScheduleId: task.ScheduleID,
ScheduleToStartTimeout: &timeout,
Clock: vclock.NewShardClock(s.mockShard.GetShardID(), task.TaskID),
}
}

Expand Down
3 changes: 3 additions & 0 deletions service/history/transferQueueTaskExecutorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/vclock"
"go.temporal.io/server/service/history/workflow"
"go.temporal.io/server/service/worker/archiver"
)
Expand Down Expand Up @@ -122,6 +123,7 @@ func (t *transferQueueTaskExecutorBase) pushActivity(
},
ScheduleId: task.ScheduleID,
ScheduleToStartTimeout: activityScheduleToStartTimeout,
Clock: vclock.NewShardClock(t.shard.GetShardID(), task.TaskID),
})
if _, ok := err.(*serviceerror.NotFound); ok {
// NotFound error is not expected for AddTasks calls
Expand Down Expand Up @@ -150,6 +152,7 @@ func (t *transferQueueTaskExecutorBase) pushWorkflowTask(
TaskQueue: taskqueue,
ScheduleId: task.ScheduleID,
ScheduleToStartTimeout: workflowTaskScheduleToStartTimeout,
Clock: vclock.NewShardClock(t.shard.GetShardID(), task.TaskID),
})
if _, ok := err.(*serviceerror.NotFound); ok {
// NotFound error is not expected for AddTasks calls
Expand Down

0 comments on commit 58456a4

Please sign in to comment.