Skip to content

Commit

Permalink
Merge branch 'main' into ppv/dlq-queueV2
Browse files Browse the repository at this point in the history
  • Loading branch information
prathyushpv committed Oct 19, 2023
2 parents 696a6ee + 068bd38 commit ac77565
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 96 deletions.
62 changes: 39 additions & 23 deletions common/metrics/metric_defs.go
Expand Up @@ -768,29 +768,45 @@ var (
"pending_tasks",
WithDescription("A histogram across history shards for the number of in-memory pending history tasks."),
)
TaskSchedulerThrottled = NewCounterDef("task_scheduler_throttled")
QueueScheduleLatency = NewTimerDef("queue_latency_schedule") // latency for scheduling 100 tasks in one task channel
QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count")
QueueSliceCountHistogram = NewDimensionlessHistogramDef("queue_slice_count")
QueueActionCounter = NewCounterDef("queue_actions")
QueueActionFailures = NewCounterDef("queue_action_errors")
ActivityE2ELatency = NewTimerDef("activity_end_to_end_latency")
AckLevelUpdateCounter = NewCounterDef("ack_level_update")
AckLevelUpdateFailedCounter = NewCounterDef("ack_level_update_failed")
CommandTypeScheduleActivityCounter = NewCounterDef("schedule_activity_command")
CommandTypeCompleteWorkflowCounter = NewCounterDef("complete_workflow_command")
CommandTypeFailWorkflowCounter = NewCounterDef("fail_workflow_command")
CommandTypeCancelWorkflowCounter = NewCounterDef("cancel_workflow_command")
CommandTypeStartTimerCounter = NewCounterDef("start_timer_command")
CommandTypeCancelActivityCounter = NewCounterDef("cancel_activity_command")
CommandTypeCancelTimerCounter = NewCounterDef("cancel_timer_command")
CommandTypeRecordMarkerCounter = NewCounterDef("record_marker_command")
CommandTypeCancelExternalWorkflowCounter = NewCounterDef("cancel_external_workflow_command")
CommandTypeContinueAsNewCounter = NewCounterDef("continue_as_new_command")
CommandTypeSignalExternalWorkflowCounter = NewCounterDef("signal_external_workflow_command")
CommandTypeUpsertWorkflowSearchAttributesCounter = NewCounterDef("upsert_workflow_search_attributes_command")
CommandTypeModifyWorkflowPropertiesCounter = NewCounterDef("modify_workflow_properties_command")
CommandTypeChildWorkflowCounter = NewCounterDef("child_workflow_command")
TaskSchedulerThrottled = NewCounterDef("task_scheduler_throttled")
QueueScheduleLatency = NewTimerDef("queue_latency_schedule") // latency for scheduling 100 tasks in one task channel
QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count")
QueueSliceCountHistogram = NewDimensionlessHistogramDef("queue_slice_count")
QueueActionCounter = NewCounterDef("queue_actions")
QueueActionFailures = NewCounterDef("queue_action_errors")
ActivityE2ELatency = NewTimerDef("activity_end_to_end_latency")
AckLevelUpdateCounter = NewCounterDef("ack_level_update")
AckLevelUpdateFailedCounter = NewCounterDef("ack_level_update_failed")
CommandCounter = NewCounterDef("command")
// Deprecated: replaced by CommandCounter and will be removed in a future release
CommandTypeScheduleActivityCounter = NewCounterDef("schedule_activity_command")
// Deprecated: replaced by CommandCounter and will be removed in a future release
CommandTypeCompleteWorkflowCounter = NewCounterDef("complete_workflow_command")
// Deprecated: replaced by CommandCounter and will be removed in a future release
CommandTypeFailWorkflowCounter = NewCounterDef("fail_workflow_command")
// Deprecated: replaced by CommandCounter and will be removed in a future release
CommandTypeCancelWorkflowCounter = NewCounterDef("cancel_workflow_command")
// Deprecated: replaced by CommandCounter and will be removed in a future release
CommandTypeStartTimerCounter = NewCounterDef("start_timer_command")
// Deprecated: replaced by CommandCounter and will be removed in a future release
CommandTypeCancelActivityCounter = NewCounterDef("cancel_activity_command")
// Deprecated: replaced by CommandCounter and will be removed in a future release
CommandTypeCancelTimerCounter = NewCounterDef("cancel_timer_command")
// Deprecated: replaced by CommandCounter and will be removed in a future release
CommandTypeRecordMarkerCounter = NewCounterDef("record_marker_command")
// Deprecated: replaced by CommandCounter and will be removed in a future release
CommandTypeCancelExternalWorkflowCounter = NewCounterDef("cancel_external_workflow_command")
// Deprecated: replaced by CommandCounter and will be removed in a future release
CommandTypeContinueAsNewCounter = NewCounterDef("continue_as_new_command")
// Deprecated: replaced by CommandCounter and will be removed in a future release
CommandTypeSignalExternalWorkflowCounter = NewCounterDef("signal_external_workflow_command")
// Deprecated: replaced by CommandCounter and will be removed in a future release
CommandTypeUpsertWorkflowSearchAttributesCounter = NewCounterDef("upsert_workflow_search_attributes_command")
// Deprecated: replaced by CommandCounter and will be removed in a future release
CommandTypeModifyWorkflowPropertiesCounter = NewCounterDef("modify_workflow_properties_command")
// Deprecated: replaced by CommandCounter and will be removed in a future release
CommandTypeChildWorkflowCounter = NewCounterDef("child_workflow_command")
// Deprecated: replaced by CommandCounter and will be removed in a future release
CommandTypeProtocolMessage = NewCounterDef("protocol_message_command")
MessageTypeRequestWorkflowExecutionUpdateCounter = NewCounterDef("request_workflow_update_message")
MessageTypeAcceptWorkflowExecutionUpdateCounter = NewCounterDef("accept_workflow_update_message")
Expand Down
79 changes: 54 additions & 25 deletions service/history/workflow/task_refresher.go
Expand Up @@ -157,6 +157,11 @@ func (r *TaskRefresherImpl) refreshTasksForWorkflowStart(
taskGenerator TaskGenerator,
) error {

executionState := mutableState.GetExecutionState()
if executionState.Status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
return nil
}

startEvent, err := mutableState.GetStartEvent(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -187,19 +192,18 @@ func (r *TaskRefresherImpl) refreshTasksForWorkflowClose(
) error {

executionState := mutableState.GetExecutionState()
if executionState.Status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
closeEventTime, err := mutableState.GetWorkflowCloseTime(ctx)
if err != nil {
return err
}

return taskGenerator.GenerateWorkflowCloseTasks(
closeEventTime,
false,
)
if executionState.Status == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
return nil
}
closeEventTime, err := mutableState.GetWorkflowCloseTime(ctx)
if err != nil {
return err
}

return nil
return taskGenerator.GenerateWorkflowCloseTasks(
closeEventTime,
false,
)
}

func (r *TaskRefresherImpl) refreshTasksForRecordWorkflowStarted(
Expand All @@ -209,26 +213,30 @@ func (r *TaskRefresherImpl) refreshTasksForRecordWorkflowStarted(
) error {

executionState := mutableState.GetExecutionState()
if executionState.Status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
return nil
}

if executionState.Status == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
startEvent, err := mutableState.GetStartEvent(ctx)
if err != nil {
return err
}

return taskGenerator.GenerateRecordWorkflowStartedTasks(
startEvent,
)
startEvent, err := mutableState.GetStartEvent(ctx)
if err != nil {
return err
}

return nil
return taskGenerator.GenerateRecordWorkflowStartedTasks(
startEvent,
)
}

func (r *TaskRefresherImpl) refreshWorkflowTaskTasks(
mutableState MutableState,
taskGenerator TaskGenerator,
) error {

executionState := mutableState.GetExecutionState()
if executionState.Status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
return nil
}

if !mutableState.HasPendingWorkflowTask() {
// no workflow task at all
return nil
Expand Down Expand Up @@ -264,6 +272,11 @@ func (r *TaskRefresherImpl) refreshTasksForActivity(
taskGenerator TaskGenerator,
) error {

executionState := mutableState.GetExecutionState()
if executionState.Status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
return nil
}

pendingActivityInfos := mutableState.GetPendingActivityInfos()

Loop:
Expand Down Expand Up @@ -307,8 +320,12 @@ func (r *TaskRefresherImpl) refreshTasksForTimer(
mutableState MutableState,
) error {

pendingTimerInfos := mutableState.GetPendingTimerInfos()
executionState := mutableState.GetExecutionState()
if executionState.Status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
return nil
}

pendingTimerInfos := mutableState.GetPendingTimerInfos()
for _, timerInfo := range pendingTimerInfos {
// clear all timer task mask for later timer task re-generation
timerInfo.TaskStatus = TimerTaskStatusNone
Expand Down Expand Up @@ -364,6 +381,11 @@ func (r *TaskRefresherImpl) refreshTasksForRequestCancelExternalWorkflow(
taskGenerator TaskGenerator,
) error {

executionState := mutableState.GetExecutionState()
if executionState.Status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
return nil
}

pendingRequestCancelInfos := mutableState.GetPendingRequestCancelExternalInfos()

for _, requestCancelInfo := range pendingRequestCancelInfos {
Expand All @@ -388,6 +410,11 @@ func (r *TaskRefresherImpl) refreshTasksForSignalExternalWorkflow(
taskGenerator TaskGenerator,
) error {

executionState := mutableState.GetExecutionState()
if executionState.Status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
return nil
}

pendingSignalInfos := mutableState.GetPendingSignalExternalInfos()

for _, signalInfo := range pendingSignalInfos {
Expand All @@ -411,8 +438,10 @@ func (r *TaskRefresherImpl) refreshTasksForWorkflowSearchAttr(
mutableState MutableState,
taskGenerator TaskGenerator,
) error {
if mutableState.GetExecutionState().Status == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
return taskGenerator.GenerateUpsertVisibilityTask()
executionState := mutableState.GetExecutionState()
if executionState.Status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
return nil
}
return nil

return taskGenerator.GenerateUpsertVisibilityTask()
}
16 changes: 11 additions & 5 deletions service/history/workflow_task_handler.go
Expand Up @@ -129,7 +129,6 @@ func newWorkflowTaskHandler(
searchAttributesMapperProvider searchattribute.MapperProvider,
hasBufferedEvents bool,
) *workflowTaskHandlerImpl {

return &workflowTaskHandlerImpl{
identity: identity,
workflowTaskCompletedID: workflowTaskCompletedID,
Expand All @@ -152,10 +151,13 @@ func newWorkflowTaskHandler(

logger: logger,
namespaceRegistry: namespaceRegistry,
metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.HistoryRespondWorkflowTaskCompletedScope)),
config: config,
shard: shard,
tokenSerializer: common.NewProtoTaskTokenSerializer(),
metricsHandler: metricsHandler.WithTags(
metrics.OperationTag(metrics.HistoryRespondWorkflowTaskCompletedScope),
metrics.NamespaceTag(mutableState.GetNamespaceEntry().Name().String()),
),
config: config,
shard: shard,
tokenSerializer: common.NewProtoTaskTokenSerializer(),
}
}

Expand Down Expand Up @@ -215,6 +217,10 @@ func (handler *workflowTaskHandlerImpl) handleCommand(
command *commandpb.Command,
msgs *collection.IndexedTakeList[string, *protocolpb.Message],
) (*handleCommandResponse, error) {

handler.metricsHandler.Counter(metrics.CommandCounter.GetMetricName()).
Record(1, metrics.CommandTypeTag(command.GetCommandType().String()))

switch command.GetCommandType() {
case enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK:
return handler.handleCommandScheduleActivity(ctx, command.GetScheduleActivityTaskCommandAttributes())
Expand Down
2 changes: 2 additions & 0 deletions service/history/workflow_task_handler_test.go
Expand Up @@ -38,6 +38,7 @@ import (
protocolpb "go.temporal.io/api/protocol/v1"
"go.temporal.io/api/serviceerror"
updatepb "go.temporal.io/api/update/v1"
"go.temporal.io/server/service/history/tests"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/collection"
Expand Down Expand Up @@ -83,6 +84,7 @@ func TestCommandProtocolMessage(t *testing.T) {
out.conf = map[dynamicconfig.Key]any{}
out.ms = workflow.NewMockMutableState(gomock.NewController(t))
out.ms.EXPECT().VisitUpdates(gomock.Any()).Times(1)
out.ms.EXPECT().GetNamespaceEntry().Return(tests.LocalNamespaceEntry).Times(1)
out.updates = update.NewRegistry(func() update.Store { return out.ms })
var effects effect.Buffer
config := configs.NewConfig(
Expand Down
24 changes: 18 additions & 6 deletions service/worker/migration/activities.go
Expand Up @@ -64,6 +64,7 @@ type (
historyClient historyservice.HistoryServiceClient
frontendClient workflowservice.WorkflowServiceClient
clientFactory serverClient.Factory
clientBean serverClient.Bean
logger log.Logger
metricsHandler metrics.Handler
forceReplicationMetricsHandler metrics.Handler
Expand Down Expand Up @@ -746,13 +747,24 @@ const (

func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verifyReplicationTasksRequest) (verifyReplicationTasksResponse, error) {
ctx = headers.SetCallerInfo(ctx, headers.NewPreemptableCallerInfo(request.Namespace))
remoteClient := a.clientFactory.NewRemoteAdminClientWithTimeout(
request.TargetClusterEndpoint,
admin.DefaultTimeout,
admin.DefaultLargeTimeout,
)

var response verifyReplicationTasksResponse
var remoteClient adminservice.AdminServiceClient
var err error

if len(request.TargetClusterName) > 0 {
remoteClient, err = a.clientBean.GetRemoteAdminClient(request.TargetClusterName)
if err != nil {
return response, err
}
} else {
// TODO: remove once TargetClusterEndpoint is no longer used.
remoteClient = a.clientFactory.NewRemoteAdminClientWithTimeout(
request.TargetClusterEndpoint,
admin.DefaultTimeout,
admin.DefaultLargeTimeout,
)
}

var details replicationTasksHeartbeatDetails
if activity.HasHeartbeatDetails(ctx) {
if err := activity.GetHeartbeatDetails(ctx, &details); err != nil {
Expand Down

0 comments on commit ac77565

Please sign in to comment.