From d2a2abd179bd6e2d23e84c2def8268aecbf820b7 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Wed, 19 Aug 2020 00:23:27 -0700 Subject: [PATCH] Add integration test for SignalWorkflow. --- internal/internal_workflow.go | 4 ++-- test/integration_test.go | 21 +++++++++++++++++++++ test/workflow_test.go | 26 ++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index b20aa990e..510cdade7 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -797,7 +797,7 @@ func (c *channelImpl) assignValue(from interface{}, to interface{}) error { err := decodeAndAssignValue(c.dataConverter, from, to) // add to metrics if err != nil { - c.env.GetLogger().Error(fmt.Sprintf("Corrupt signal received on channel %s. Error deserializing", c.name), tagError, err) + c.env.GetLogger().Error(fmt.Sprintf("Deserialization error. Corrupted signal received on channel %s.", c.name), tagError, err) c.env.GetMetricsScope().Counter(metrics.CorruptedSignalsCounter).Inc(1) } return err @@ -1239,7 +1239,7 @@ func (w *WorkflowOptions) getSignalChannel(ctx Context, signalName string) Recei if ch, ok := w.signalChannels[signalName]; ok { return ch } - ch := NewBufferedChannel(ctx, defaultSignalChannelSize) + ch := NewNamedBufferedChannel(ctx, signalName, defaultSignalChannelSize) w.signalChannels[signalName] = ch return ch } diff --git a/test/integration_test.go b/test/integration_test.go index 70441b5ee..b03e5a958 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -36,6 +36,7 @@ import ( "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" workflowpb "go.temporal.io/api/workflow/v1" @@ -303,6 +304,26 @@ func (ts *IntegrationTestSuite) TestConsistentQuery() { ts.Equal("signal-input", queryResult) } +func (ts *IntegrationTestSuite) TestSignalWorkflow() { + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + wfOpts := ts.startWorkflowOptions("test-signal-workflow") + run, err := ts.client.ExecuteWorkflow(ctx, wfOpts, ts.workflows.SignalWorkflow) + ts.Nil(err) + err = ts.client.SignalWorkflow(ctx, "test-signal-workflow", run.GetRunID(), "string-signal", "string-value") + ts.NoError(err) + + wt := &commonpb.WorkflowType{Name: "workflow-type"} + err = ts.client.SignalWorkflow(ctx, "test-signal-workflow", run.GetRunID(), "proto-signal", wt) + ts.NoError(err) + + var protoValue *commonpb.WorkflowType + err = run.Get(ctx, &protoValue) + ts.NoError(err) + ts.Equal(commonpb.WorkflowType{Name: "string-value"}, *protoValue) +} + func (ts *IntegrationTestSuite) TestWorkflowIDReuseRejectDuplicate() { var result string err := ts.executeWorkflow( diff --git a/test/workflow_test.go b/test/workflow_test.go index 02be67671..6f588238b 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -31,6 +31,7 @@ import ( "strconv" "time" + commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/converter" @@ -543,6 +544,30 @@ func (w *Workflows) ConsistentQueryWorkflow(ctx workflow.Context, delay time.Dur return nil } +func (w *Workflows) SignalWorkflow(ctx workflow.Context) (*commonpb.WorkflowType, error) { + s := workflow.NewSelector(ctx) + + stringSignalChan := workflow.GetSignalChannel(ctx, "string-signal") + var stringSignalValue string + s.AddReceive(stringSignalChan, func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, &stringSignalValue) + workflow.GetLogger(ctx).Info("Received signal", "signal", "string-signal", "value", stringSignalValue) + }) + s.Select(ctx) + + protoSignalChan := workflow.GetSignalChannel(ctx, "proto-signal") + var protoSignalValue *commonpb.WorkflowType + s.AddReceive(protoSignalChan, func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, &protoSignalValue) + workflow.GetLogger(ctx).Info("Received signal", "signal", "proto-signal", "value", protoSignalValue) + }) + s.Select(ctx) + + protoSignalValue.Name = stringSignalValue + + return protoSignalValue, nil +} + func (w *Workflows) RetryTimeoutStableErrorWorkflow(ctx workflow.Context) ([]string, error) { ao := workflow.ActivityOptions{ ScheduleToStartTimeout: 1 * time.Second, @@ -865,6 +890,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.WorkflowWithParallelLocalActivities) worker.RegisterWorkflow(w.WorkflowWithParallelSideEffects) worker.RegisterWorkflow(w.WorkflowWithParallelMutableSideEffects) + worker.RegisterWorkflow(w.SignalWorkflow) worker.RegisterWorkflow(w.child) worker.RegisterWorkflow(w.childForMemoAndSearchAttr)