Skip to content

Commit

Permalink
Merge branch 'master' into fix
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu authored Mar 26, 2018
2 parents 9e1742d + 7aff539 commit 4bb4bb5
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 5 deletions.
30 changes: 26 additions & 4 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"unicode"

"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/encoded"
"go.uber.org/cadence/internal/common"
"go.uber.org/cadence/internal/common/metrics"
"go.uber.org/zap"
Expand Down Expand Up @@ -158,7 +159,7 @@ type (
workflowID string
childPolicy ChildWorkflowPolicy
waitForCancellation bool
signalChannels map[string]Channel
signalChannels map[string]SignalChannel
queryHandlers map[string]func([]byte) ([]byte, error)
workflowIDReusePolicy WorkflowIDReusePolicy
}
Expand Down Expand Up @@ -473,6 +474,27 @@ func getState(ctx Context) *coroutineState {
return state
}

func (c *channelImpl) ReceiveEncodedValue(ctx Context) (value encoded.Value, more bool) {
var blob []byte
more = c.Receive(ctx, &blob)
value = EncodedValue(blob)
return
}

func (c *channelImpl) ReceiveEncodedValueAsync() (value encoded.Value, ok bool) {
var blob []byte
ok = c.ReceiveAsync(&blob)
value = EncodedValue(blob)
return
}

func (c *channelImpl) ReceiveEncodedValueAsyncWithMoreFlag() (value encoded.Value, ok bool, more bool) {
var blob []byte
ok, more = c.ReceiveAsyncWithMoreFlag(&blob)
value = EncodedValue(blob)
return
}

func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) {
state := getState(ctx)
hasResult := false
Expand Down Expand Up @@ -1030,18 +1052,18 @@ func setWorkflowEnvOptionsIfNotExist(ctx Context) Context {
if options != nil {
newOptions = *options
} else {
newOptions.signalChannels = make(map[string]Channel)
newOptions.signalChannels = make(map[string]SignalChannel)
newOptions.queryHandlers = make(map[string]func([]byte) ([]byte, error))
}
return WithValue(ctx, workflowEnvOptionsContextKey, &newOptions)
}

// getSignalChannel finds the assosciated channel for the signal.
func (w *workflowOptions) getSignalChannel(ctx Context, signalName string) Channel {
func (w *workflowOptions) getSignalChannel(ctx Context, signalName string) SignalChannel {
if ch, ok := w.signalChannels[signalName]; ok {
return ch
}
ch := NewBufferedChannel(ctx, defaultSignalChannelSize)
ch := NewBufferedChannel(ctx, defaultSignalChannelSize).(SignalChannel)
w.signalChannels[signalName] = ch
return ch
}
Expand Down
9 changes: 9 additions & 0 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type (
metricsScope *metrics.TaggedScope
mockClock *clock.Mock
wallClock clock.Clock
startTime time.Time

callbackChannel chan testCallbackHandle
testTimeout time.Duration
Expand Down Expand Up @@ -205,6 +206,14 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite) *testWorkflowEnvironme
doneChannel: make(chan struct{}),
}

// move forward the mock clock to start time.
startTime := env.startTime
if startTime == time.Unix(0, 0) {
// if start time not set, use current clock time
startTime = env.wallClock.Now()
}
env.mockClock.Add(startTime.Sub(env.mockClock.Now()))

// put current workflow as a running workflow so child can send signal to parent
env.runningWorkflows[env.workflowInfo.WorkflowExecution.ID] = &testWorkflowHandle{env: env, callback: func(result []byte, err error) {}}

Expand Down
24 changes: 24 additions & 0 deletions internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1602,3 +1602,27 @@ func (s *WorkflowTestSuiteUnitTest) Test_Channel() {
_, ok := env.GetWorkflowError().(*ContinueAsNewError)
s.True(ok)
}

func (s *WorkflowTestSuiteUnitTest) Test_SignalChannel() {
workflowFn := func(ctx Context) error {
signalCh := GetSignalChannel(ctx, "test-signal")
encodedValue, _ := signalCh.ReceiveEncodedValue(ctx)

var signal string
err := encodedValue.Get(&signal)
return err
}

RegisterWorkflow(workflowFn)
env := s.NewTestWorkflowEnvironment()

env.RegisterDelayedCallback(func() {
env.SignalWorkflow("test-signal", 123)
}, time.Minute)

env.ExecuteWorkflow(workflowFn)

s.True(env.IsWorkflowCompleted())
s.Error(env.GetWorkflowError())
s.Contains(env.GetWorkflowError().Error(), "decode")
}
24 changes: 23 additions & 1 deletion internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,28 @@ type (
Close()
}

// SignalChannel extends from Channel. It adds the ability to deal with corrupted signal data. Signal is sent to
// Cadence server as binary blob. When workflow try to receive signal data as strongly typed value, the Channel will
// try to decode that binary blob into that strongly typed value pointer. If that data is corrupted and cannot be
// decoded, the Receive call will panic which will block the workflow. That might not be expected behavior. This
// SignalChannel adds new methods so that workflow could receive signal as encoded.Value, and then extract that strongly
// typed value from encoded.Value. If the decoding fails, the encoded.Value will return error instead of panic.
SignalChannel interface {
Channel

// ReceiveEncodedValue blocks until it receives a value, and then return that value as encoded.Value.
// Returns false when Channel is closed.
ReceiveEncodedValue(ctx Context) (value encoded.Value, more bool)

// ReceiveEncodedValueAsync try to receive from Channel without blocking. If there is data available from the
// Channel, it returns the data as encoded.Value and true. Otherwise, it returns nil and false immediately.
ReceiveEncodedValueAsync() (value encoded.Value, ok bool)

// ReceiveEncodedValueAsyncWithMoreFlag is same as ReceiveEncodedValueAsync with extra return value more to
// indicate if there could be more value from the Channel. The more is false when Channel is closed.
ReceiveEncodedValueAsyncWithMoreFlag() (value encoded.Value, ok bool, more bool)
}

// Selector must be used instead of native go select by workflow code.
// Use workflow.NewSelector(ctx) method to create a Selector instance.
Selector interface {
Expand Down Expand Up @@ -712,7 +734,7 @@ func WithWorkflowTaskStartToCloseTimeout(ctx Context, d time.Duration) Context {
}

// GetSignalChannel returns channel corresponding to the signal name.
func GetSignalChannel(ctx Context, signalName string) Channel {
func GetSignalChannel(ctx Context, signalName string) SignalChannel {
return getWorkflowEnvOptions(ctx).getSignalChannel(ctx, signalName)
}

Expand Down
6 changes: 6 additions & 0 deletions internal/workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ func (t *TestActivityEnvironment) SetWorkerOptions(options WorkerOptions) *TestA
return t
}

// SetStartTime sets the start time of the workflow. This is optional, default start time will be the wall clock time when
// workflow starts. Start time is the workflow.Now(ctx) time at the beginning of the workflow.
func (t *TestWorkflowEnvironment) SetStartTime(startTime time.Time) {
t.impl.startTime = startTime
}

// OnActivity setup a mock call for activity. Parameter activity must be activity function (func) or activity name (string).
// You must call Return() with appropriate parameters on the returned *MockCallWrapper instance. The supplied parameters to
// the Return() call should either be a function that has exact same signature as the mocked activity, or it should be
Expand Down
8 changes: 8 additions & 0 deletions workflow/deterministic_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ type (
// Use workflow.NewChannel(ctx) method to create Channel instance.
Channel = internal.Channel

// SignalChannel extends from Channel. It adds the ability to deal with corrupted signal data. Signal is sent to
// Cadence server as binary blob. When workflow try to receive signal data as strongly typed value, the Channel will
// try to decode that binary blob into that strongly typed value pointer. If that data is corrupted and cannot be
// decoded, the Receive call will panic which will block the workflow. That might not be expected behavior. This
// SignalChannel adds new methods so that workflow could receive signal as encoded.Value, and then extract that strongly
// typed value from encoded.Value. If the decoding fails, the encoded.Value will return error instead of panic.
SignalChannel = internal.SignalChannel

// Selector must be used instead of native go select by workflow code.
// Use workflow.NewSelector(ctx) method to create a Selector instance.
Selector = internal.Selector
Expand Down

0 comments on commit 4bb4bb5

Please sign in to comment.