Skip to content

Commit

Permalink
Split integration_tests.go (#1548)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ardagan committed May 19, 2021
1 parent 71f073b commit b3f10a7
Show file tree
Hide file tree
Showing 20 changed files with 4,744 additions and 4,193 deletions.
File renamed without changes.
443 changes: 443 additions & 0 deletions host/child_workflow_test.go

Large diffs are not rendered by default.

File renamed without changes.
707 changes: 707 additions & 0 deletions host/cron_test.go

Large diffs are not rendered by default.

285 changes: 285 additions & 0 deletions host/describe_test.go
@@ -0,0 +1,285 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package host

import (
"bytes"
"encoding/binary"
"time"

"github.com/pborman/uuid"
commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/server/common/convert"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/payloads"
"go.temporal.io/server/common/primitives/timestamp"
)

func (s *integrationSuite) TestDescribeWorkflowExecution() {
id := "integration-describe-wfe-test"
wt := "integration-describe-wfe-test-type"
tl := "integration-describe-wfe-test-taskqueue"
identity := "worker1"

// Start workflow execution
request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: &commonpb.WorkflowType{Name: wt},
TaskQueue: &taskqueuepb.TaskQueue{Name: tl},
Input: nil,
WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second),
WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),
Identity: identity,
}

we, err0 := s.engine.StartWorkflowExecution(NewContext(), request)
s.NoError(err0)

s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))

describeWorkflowExecution := func() (*workflowservice.DescribeWorkflowExecutionResponse, error) {
return s.engine.DescribeWorkflowExecution(NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: s.namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we.RunId,
},
})
}
dweResponse, err := describeWorkflowExecution()
s.NoError(err)
s.Nil(dweResponse.WorkflowExecutionInfo.CloseTime)
s.Equal(int64(2), dweResponse.WorkflowExecutionInfo.HistoryLength) // WorkflowStarted, WorkflowTaskScheduled
s.Equal(dweResponse.WorkflowExecutionInfo.GetStartTime(), dweResponse.WorkflowExecutionInfo.GetExecutionTime())

// workflow logic
workflowComplete := false
signalSent := false
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
if !signalSent {
signalSent = true

s.NoError(err)
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: "1",
ActivityType: &commonpb.ActivityType{Name: "test-activity-type"},
TaskQueue: &taskqueuepb.TaskQueue{Name: tl},
Input: payloads.EncodeString("test-input"),
ScheduleToCloseTimeout: timestamp.DurationPtr(100 * time.Second),
ScheduleToStartTimeout: timestamp.DurationPtr(2 * time.Second),
StartToCloseTimeout: timestamp.DurationPtr(50 * time.Second),
HeartbeatTimeout: timestamp.DurationPtr(5 * time.Second),
}},
}}, nil
}

workflowComplete = true
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("Done"),
}},
}}, nil
}

atHandler := func(execution *commonpb.WorkflowExecution, activityType *commonpb.ActivityType,
activityID string, input *commonpb.Payloads, taskToken []byte) (*commonpb.Payloads, bool, error) {
return payloads.EncodeString("Activity Result"), false, nil
}

poller := &TaskPoller{
Engine: s.engine,
Namespace: s.namespace,
TaskQueue: &taskqueuepb.TaskQueue{Name: tl},
Identity: identity,
WorkflowTaskHandler: wtHandler,
ActivityTaskHandler: atHandler,
Logger: s.Logger,
T: s.T(),
}

// first workflow task to schedule new activity
_, err = poller.PollAndProcessWorkflowTask(false, false)
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
s.NoError(err)

dweResponse, err = describeWorkflowExecution()
s.NoError(err)
s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, dweResponse.WorkflowExecutionInfo.GetStatus())
s.Equal(int64(5), dweResponse.WorkflowExecutionInfo.HistoryLength) // WorkflowTaskStarted, WorkflowTaskCompleted, ActivityScheduled
s.Equal(1, len(dweResponse.PendingActivities))
s.Equal("test-activity-type", dweResponse.PendingActivities[0].ActivityType.GetName())
s.True(timestamp.TimeValue(dweResponse.PendingActivities[0].GetLastHeartbeatTime()).IsZero())

// process activity task
err = poller.PollAndProcessActivityTask(false)

dweResponse, err = describeWorkflowExecution()
s.NoError(err)
s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, dweResponse.WorkflowExecutionInfo.GetStatus())
s.Equal(int64(8), dweResponse.WorkflowExecutionInfo.HistoryLength) // ActivityTaskStarted, ActivityTaskCompleted, WorkflowTaskScheduled
s.Equal(0, len(dweResponse.PendingActivities))

// Process signal in workflow
_, err = poller.PollAndProcessWorkflowTask(true, false)
s.NoError(err)
s.True(workflowComplete)

dweResponse, err = describeWorkflowExecution()
s.NoError(err)
s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, dweResponse.WorkflowExecutionInfo.GetStatus())
s.Equal(int64(11), dweResponse.WorkflowExecutionInfo.HistoryLength) // WorkflowTaskStarted, WorkflowTaskCompleted, WorkflowCompleted
}

func (s *integrationSuite) TestDescribeTaskQueue() {
workflowID := "integration-get-poller-history"
wt := "integration-get-poller-history-type"
tl := "integration-get-poller-history-taskqueue"
identity := "worker1"
activityName := "activity_type1"

// Start workflow execution
request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: workflowID,
WorkflowType: &commonpb.WorkflowType{Name: wt},
TaskQueue: &taskqueuepb.TaskQueue{Name: tl},
Input: nil,
WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second),
WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),
Identity: identity,
}

we, err0 := s.engine.StartWorkflowExecution(NewContext(), request)
s.NoError(err0)

s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))

// workflow logic
activityScheduled := false
activityData := int32(1)
// var signalEvent *historypb.HistoryEvent
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {

if !activityScheduled {
activityScheduled = true
buf := new(bytes.Buffer)
s.Nil(binary.Write(buf, binary.LittleEndian, activityData))

return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: convert.Int32ToString(1),
ActivityType: &commonpb.ActivityType{Name: activityName},
TaskQueue: &taskqueuepb.TaskQueue{Name: tl},
Input: payloads.EncodeBytes(buf.Bytes()),
ScheduleToCloseTimeout: timestamp.DurationPtr(100 * time.Second),
ScheduleToStartTimeout: timestamp.DurationPtr(25 * time.Second),
StartToCloseTimeout: timestamp.DurationPtr(50 * time.Second),
HeartbeatTimeout: timestamp.DurationPtr(25 * time.Second),
}},
}}, nil
}

return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("Done"),
}},
}}, nil
}

atHandler := func(execution *commonpb.WorkflowExecution, activityType *commonpb.ActivityType,
activityID string, input *commonpb.Payloads, taskToken []byte) (*commonpb.Payloads, bool, error) {
return payloads.EncodeString("Activity Result"), false, nil
}

poller := &TaskPoller{
Engine: s.engine,
Namespace: s.namespace,
TaskQueue: &taskqueuepb.TaskQueue{Name: tl},
Identity: identity,
WorkflowTaskHandler: wtHandler,
ActivityTaskHandler: atHandler,
Logger: s.Logger,
T: s.T(),
}

// this function poll events from history side
testDescribeTaskQueue := func(namespace string, taskqueue *taskqueuepb.TaskQueue, taskqueueType enumspb.TaskQueueType) []*taskqueuepb.PollerInfo {
responseInner, errInner := s.engine.DescribeTaskQueue(NewContext(), &workflowservice.DescribeTaskQueueRequest{
Namespace: namespace,
TaskQueue: taskqueue,
TaskQueueType: taskqueueType,
})

s.NoError(errInner)
return responseInner.Pollers
}

before := time.Now().UTC()

// when no one polling on the taskqueue (activity or workflow), there shall be no poller information
pollerInfos := testDescribeTaskQueue(s.namespace, &taskqueuepb.TaskQueue{Name: tl}, enumspb.TASK_QUEUE_TYPE_ACTIVITY)
s.Empty(pollerInfos)
pollerInfos = testDescribeTaskQueue(s.namespace, &taskqueuepb.TaskQueue{Name: tl}, enumspb.TASK_QUEUE_TYPE_WORKFLOW)
s.Empty(pollerInfos)

_, errWorkflowTask := poller.PollAndProcessWorkflowTask(false, false)
s.NoError(errWorkflowTask)
pollerInfos = testDescribeTaskQueue(s.namespace, &taskqueuepb.TaskQueue{Name: tl}, enumspb.TASK_QUEUE_TYPE_ACTIVITY)
s.Empty(pollerInfos)
pollerInfos = testDescribeTaskQueue(s.namespace, &taskqueuepb.TaskQueue{Name: tl}, enumspb.TASK_QUEUE_TYPE_WORKFLOW)
s.Equal(1, len(pollerInfos))
s.Equal(identity, pollerInfos[0].GetIdentity())
s.True(pollerInfos[0].GetLastAccessTime().After(before))
s.NotEmpty(pollerInfos[0].GetLastAccessTime())

errActivity := poller.PollAndProcessActivityTask(false)
s.NoError(errActivity)
pollerInfos = testDescribeTaskQueue(s.namespace, &taskqueuepb.TaskQueue{Name: tl}, enumspb.TASK_QUEUE_TYPE_ACTIVITY)
s.Equal(1, len(pollerInfos))
s.Equal(identity, pollerInfos[0].GetIdentity())
s.True(pollerInfos[0].GetLastAccessTime().After(before))
s.NotEmpty(pollerInfos[0].GetLastAccessTime())
pollerInfos = testDescribeTaskQueue(s.namespace, &taskqueuepb.TaskQueue{Name: tl}, enumspb.TASK_QUEUE_TYPE_WORKFLOW)
s.Equal(1, len(pollerInfos))
s.Equal(identity, pollerInfos[0].GetIdentity())
s.True(pollerInfos[0].GetLastAccessTime().After(before))
s.NotEmpty(pollerInfos[0].GetLastAccessTime())
}

0 comments on commit b3f10a7

Please sign in to comment.