Skip to content

Commit

Permalink
Fix error-handling in query_workflow_test
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Dec 20, 2022
1 parent c6a89c0 commit 5ba498a
Showing 1 changed file with 36 additions and 32 deletions.
68 changes: 36 additions & 32 deletions host/query_workflow_test.go
Expand Up @@ -35,6 +35,7 @@ import (
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"

"go.temporal.io/server/service/history/consts"

"go.temporal.io/server/common/log/tag"
Expand All @@ -46,9 +47,11 @@ func (s *clientIntegrationSuite) TestQueryWorkflow_Sticky() {
// every replay will start from here
atomic.AddInt32(&replayCount, 1)

workflow.SetQueryHandler(ctx, "test", func() (string, error) {
if err := workflow.SetQueryHandler(ctx, "test", func() (string, error) {
return "query works", nil
})
}); err != nil {
return "", err
}

signalCh := workflow.GetSignalChannel(ctx, "test")
var msg string
Expand Down Expand Up @@ -87,26 +90,20 @@ func (s *clientIntegrationSuite) TestQueryWorkflow_Sticky() {
}

func (s *clientIntegrationSuite) TestQueryWorkflow_Consistent_PiggybackQuery() {
workflowFn := func(ctx workflow.Context) (string, error) {
var receivedMsgs string
workflow.SetQueryHandler(ctx, "test", func() (string, error) {
return receivedMsgs, nil
})
workflowFn := func(ctx workflow.Context) error {
var msgs []string
if err := workflow.SetQueryHandler(ctx, "test", func() ([]string, error) {
return msgs, nil
}); err != nil {
return err
}

signalCh := workflow.GetSignalChannel(ctx, "test")
for {
var msg string
signalCh.Receive(ctx, &msg)
receivedMsgs += msg
if msg == "pause" {
// block workflow task for 3s.
workflow.ExecuteLocalActivity(ctx, func() {
time.Sleep(time.Second * 3)
}).Get(ctx, nil)
}
var msg string
for signalCh.Receive(ctx, &msg) {
msgs = append(msgs, msg)
}

return receivedMsgs, nil
return nil
}

s.worker.RegisterWorkflow(workflowFn)
Expand All @@ -127,29 +124,32 @@ func (s *clientIntegrationSuite) TestQueryWorkflow_Consistent_PiggybackQuery() {
s.NotNil(workflowRun)
s.True(workflowRun.GetRunID() != "")

err = s.sdkClient.SignalWorkflow(ctx, id, "", "test", "pause")
err = s.sdkClient.SignalWorkflow(ctx, id, "", "test", "abc")
s.NoError(err)

err = s.sdkClient.SignalWorkflow(ctx, id, "", "test", "abc")
err = s.sdkClient.SignalWorkflow(ctx, id, "", "test", "def")
s.NoError(err)

queryResult, err := s.sdkClient.QueryWorkflow(ctx, id, "", "test")
s.NoError(err)

var queryResultStr string
err = queryResult.Get(&queryResultStr)
var queryResults []string
err = queryResult.Get(&queryResults)
s.NoError(err)

// verify query sees all signals before it
s.Equal("pauseabc", queryResultStr)
s.ElementsMatch([]string{"abc", "def"}, queryResults)
}

func (s *clientIntegrationSuite) TestQueryWorkflow_QueryWhileBackoff() {
workflowFn := func(ctx workflow.Context) (string, error) {
workflow.SetQueryHandler(ctx, "test", func() (string, error) {
return "should-reach-here", nil
})
return "", temporal.NewApplicationError("retry-me", "test-error")
workflowFn := func(ctx workflow.Context) error {
if err := workflow.SetQueryHandler(ctx, "test", func() (string, error) {
s.Fail("query should not be executed")
return "", nil
}); err != nil {
return err
}
return temporal.NewApplicationError("retry-me", "test-error")
}

s.worker.RegisterWorkflow(workflowFn)
Expand Down Expand Up @@ -203,12 +203,16 @@ func (s *clientIntegrationSuite) TestQueryWorkflow_QueryBeforeStart() {

workflowFn := func(ctx workflow.Context) (string, error) {
status := "initialized"
workflow.SetQueryHandler(ctx, "test", func() (string, error) {
if err := workflow.SetQueryHandler(ctx, "test", func() (string, error) {
return status, nil
})
}); err != nil {
return "", err
}

status = "started"
workflow.Sleep(ctx, time.Hour)
if err := workflow.Sleep(ctx, time.Hour); err != nil {
return "", err
}
return "", nil
}

Expand Down

0 comments on commit 5ba498a

Please sign in to comment.