Skip to content

Commit

Permalink
Merge 323ad5f into 791d5a2
Browse files Browse the repository at this point in the history
  • Loading branch information
sivakku committed Jul 5, 2017
2 parents 791d5a2 + 323ad5f commit 0d787b4
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 24 deletions.
12 changes: 9 additions & 3 deletions activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,14 @@ func GetActivityLogger(ctx context.Context) *zap.Logger {
// details - the details that you provided here can be seen in the worflow when it receives TimeoutError, you
// can check error TimeOutType()/Details().
func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) {
data, err := getHostEnvironment().encodeArgs(details)
if err != nil {
panic(err)
var data []byte
var err error
// We would like to be a able to pass in "nil" as part of details(that is no progress to report to)
if len(details) != 1 || details[0] != nil {
data, err = getHostEnvironment().encodeArgs(details)
if err != nil {
panic(err)
}
}
env := getActivityEnv(ctx)
err = env.serviceInvoker.Heartbeat(data)
Expand All @@ -106,6 +111,7 @@ func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) {
type ServiceInvoker interface {
// Returns ActivityTaskCanceledError if activity is cancelled
Heartbeat(details []byte) error
Close()
}

// WithActivityTask adds activity specific information into context.
Expand Down
93 changes: 89 additions & 4 deletions activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
func TestActivityHeartbeat(t *testing.T) {
service := new(mocks.TChanWorkflowService)
ctx, cancel := context.WithCancel(context.Background())
invoker := newServiceInvoker([]byte("task-token"), "identity", service, cancel)
invoker := newServiceInvoker([]byte("task-token"), "identity", service, cancel, 1)
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{serviceInvoker: invoker})

service.On("RecordActivityTaskHeartbeat", mock.Anything, mock.Anything).
Expand All @@ -52,7 +52,7 @@ func TestActivityHeartbeat_InternalError(t *testing.T) {

service := new(mocks.TChanWorkflowService)
ctx, cancel := context.WithCancel(context.Background())
invoker := newServiceInvoker([]byte("task-token"), "identity", service, cancel)
invoker := newServiceInvoker([]byte("task-token"), "identity", service, cancel, 1)
invoker.(*cadenceInvoker).retryPolicy = p
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
serviceInvoker: invoker,
Expand All @@ -67,7 +67,7 @@ func TestActivityHeartbeat_InternalError(t *testing.T) {
func TestActivityHeartbeat_CancelRequested(t *testing.T) {
service := new(mocks.TChanWorkflowService)
ctx, cancel := context.WithCancel(context.Background())
invoker := newServiceInvoker([]byte("task-token"), "identity", service, cancel)
invoker := newServiceInvoker([]byte("task-token"), "identity", service, cancel, 1)
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
serviceInvoker: invoker,
logger: getLogger()})
Expand All @@ -83,7 +83,7 @@ func TestActivityHeartbeat_CancelRequested(t *testing.T) {
func TestActivityHeartbeat_EntityNotExist(t *testing.T) {
service := new(mocks.TChanWorkflowService)
ctx, cancel := context.WithCancel(context.Background())
invoker := newServiceInvoker([]byte("task-token"), "identity", service, cancel)
invoker := newServiceInvoker([]byte("task-token"), "identity", service, cancel, 1)
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
serviceInvoker: invoker,
logger: getLogger()})
Expand All @@ -95,3 +95,88 @@ func TestActivityHeartbeat_EntityNotExist(t *testing.T) {
<-ctx.Done()
require.Equal(t, ctx.Err(), context.Canceled)
}

func TestActivityHeartbeat_SuppressContinousInvokes(t *testing.T) {
service := new(mocks.TChanWorkflowService)
ctx, cancel := context.WithCancel(context.Background())
invoker := newServiceInvoker([]byte("task-token"), "identity", service, cancel, 2)
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
serviceInvoker: invoker,
logger: getLogger()})

// Multiple calls but only one call is made.
service.On("RecordActivityTaskHeartbeat", mock.Anything, mock.Anything).
Return(&s.RecordActivityTaskHeartbeatResponse{}, nil).Once()
RecordActivityHeartbeat(ctx, "testDetails")
RecordActivityHeartbeat(ctx, "testDetails")
RecordActivityHeartbeat(ctx, "testDetails")
invoker.Close()
service.AssertExpectations(t)

// No HB timeout configured.
service2 := new(mocks.TChanWorkflowService)
invoker2 := newServiceInvoker([]byte("task-token"), "identity", service2, cancel, 0)
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
serviceInvoker: invoker2,
logger: getLogger()})
service2.On("RecordActivityTaskHeartbeat", mock.Anything, mock.Anything).
Return(&s.RecordActivityTaskHeartbeatResponse{}, nil).Once()
RecordActivityHeartbeat(ctx, "testDetails")
RecordActivityHeartbeat(ctx, "testDetails")
invoker2.Close()
service2.AssertExpectations(t)

// simulate batch picks before expiry.
waitCh := make(chan struct{})
service3 := new(mocks.TChanWorkflowService)
invoker3 := newServiceInvoker([]byte("task-token"), "identity", service3, cancel, 2)
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
serviceInvoker: invoker3,
logger: getLogger()})
service3.On("RecordActivityTaskHeartbeat", mock.Anything, mock.Anything).
Return(&s.RecordActivityTaskHeartbeatResponse{}, nil).Once()
service3.On("RecordActivityTaskHeartbeat", mock.Anything, mock.Anything).
Return(&s.RecordActivityTaskHeartbeatResponse{}, nil).Run(func(arg mock.Arguments) {
request := arg.Get(1).(*s.RecordActivityTaskHeartbeatRequest)
ev := EncodedValues(request.GetDetails())
var progress string
err := ev.Get(&progress)
if err != nil {
panic(err)
}
require.Equal(t, "testDetails-expected", progress)
waitCh <- struct{}{}
}).Once()

RecordActivityHeartbeat(ctx, "testDetails")
RecordActivityHeartbeat(ctx, "testDetails2")
RecordActivityHeartbeat(ctx, "testDetails3")
RecordActivityHeartbeat(ctx, "testDetails-expected")
<-waitCh
invoker3.Close()
service3.AssertExpectations(t)

// simulate batch picks before expiry, with out any progress specified.
waitCh2 := make(chan struct{})
service4 := new(mocks.TChanWorkflowService)
invoker4 := newServiceInvoker([]byte("task-token"), "identity", service4, cancel, 2)
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
serviceInvoker: invoker4,
logger: getLogger()})
service4.On("RecordActivityTaskHeartbeat", mock.Anything, mock.Anything).
Return(&s.RecordActivityTaskHeartbeatResponse{}, nil).Once()
service4.On("RecordActivityTaskHeartbeat", mock.Anything, mock.Anything).
Return(&s.RecordActivityTaskHeartbeatResponse{}, nil).Run(func(arg mock.Arguments) {
request := arg.Get(1).(*s.RecordActivityTaskHeartbeatRequest)
require.Nil(t, request.GetDetails())
waitCh2 <- struct{}{}
}).Once()

RecordActivityHeartbeat(ctx, nil)
RecordActivityHeartbeat(ctx, nil)
RecordActivityHeartbeat(ctx, nil)
RecordActivityHeartbeat(ctx, nil)
<-waitCh2
invoker4.Close()
service4.AssertExpectations(t)
}
108 changes: 95 additions & 13 deletions internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"errors"
"fmt"
"reflect"
"sync"
"time"

"github.com/uber-go/tally"
Expand All @@ -40,6 +41,10 @@ import (
"go.uber.org/zap"
)

const (
defaultHeartBeatIntervalInSec = 10 * 60
)

// interfaces
type (
// workflowExecutionEventHandler process a single event.
Expand Down Expand Up @@ -724,45 +729,121 @@ func newActivityTaskHandler(activities []activity,
}

type cadenceInvoker struct {
identity string
service m.TChanWorkflowService
taskToken []byte
cancelHandler func()
retryPolicy backoff.RetryPolicy
sync.Mutex
identity string
service m.TChanWorkflowService
taskToken []byte
cancelHandler func()
retryPolicy backoff.RetryPolicy
heartBeatTimeoutInSec int32 // The heart beat interval configured for this activity.
hbBatchEndTimer *time.Timer // Whether we started a batch of operations that need to be reported in the cycle. This gets started on a user call.
lastDetailsToReport *[]byte
closeCh chan struct{}
}

func (i *cadenceInvoker) Heartbeat(details []byte) error {
i.Lock()
defer i.Unlock()

if i.hbBatchEndTimer != nil {
// If we have started batching window, keep track of last reported progress.
i.lastDetailsToReport = &details
return nil
}

isActivityCancelled, err := i.internalHeartBeat(details)

// If the activity is cancelled, the activity can ignore the cancellation and do its work
// and complete. Our cancellation is co-operative, so we will try to heartbeat.
if err == nil || isActivityCancelled {
// We have successfully sent heartbeat, start next batching window.
i.lastDetailsToReport = nil

// Create timer to fire before the threshold to report.
deadlineToTrigger := i.heartBeatTimeoutInSec
if deadlineToTrigger <= 0 {
// If we don't have any heartbeat timeout configured.
deadlineToTrigger = defaultHeartBeatIntervalInSec
}

// We set a deadline at 80% of the timeout.
duration := time.Duration(0.8*float32(deadlineToTrigger)) * time.Second
i.hbBatchEndTimer = time.NewTimer(duration)

go func() {
select {
case <-i.hbBatchEndTimer.C:
// We are close to deadline.
case <-i.closeCh:
// We got closed.
return
}

// We close the batch and report the progress.
var progressToReport *[]byte

i.Lock()
progressToReport = i.lastDetailsToReport
i.hbBatchEndTimer.Stop()
i.hbBatchEndTimer = nil
i.Unlock()

if progressToReport != nil {
i.Heartbeat(*progressToReport)
}
}()
}

return err
}

func (i *cadenceInvoker) internalHeartBeat(details []byte) (bool, error) {
isActivityCancelled := false
err := recordActivityHeartbeat(i.service, i.identity, i.taskToken, details, i.retryPolicy)

switch err.(type) {
case *CanceledError:
// We are asked to cancel. inform the activity about cancellation through context.
// We are asked to cancel. inform the activity about cancellation through context.
i.cancelHandler()
isActivityCancelled = true

case *s.EntityNotExistsError:
// We will pass these through as cancellation for now but something we can change
// later when we have setter on cancel handler.
i.cancelHandler()
isActivityCancelled = true
}

// We don't want to bubble temporary errors to the user.
// This error won't be return to user check RecordActivityHeartbeat().
return err
return isActivityCancelled, err
}

func (i *cadenceInvoker) Close() {
i.Lock()
defer i.Unlock()

close(i.closeCh)
if i.hbBatchEndTimer != nil {
i.hbBatchEndTimer.Stop()
}
}

func newServiceInvoker(
taskToken []byte,
identity string,
service m.TChanWorkflowService,
cancelHandler func(),
heartBeatTimeoutInSec int32,
) ServiceInvoker {
return &cadenceInvoker{
taskToken: taskToken,
identity: identity,
service: service,
cancelHandler: cancelHandler,
retryPolicy: serviceOperationRetryPolicy,
taskToken: taskToken,
identity: identity,
service: service,
cancelHandler: cancelHandler,
retryPolicy: serviceOperationRetryPolicy,
heartBeatTimeoutInSec: heartBeatTimeoutInSec,
closeCh: make(chan struct{}),
}
}

Expand All @@ -786,7 +867,8 @@ func (ath *activityTaskHandlerImpl) Execute(t *s.PollForActivityTaskResponse) (r
rootCtx = context.Background()
}
canCtx, cancel := context.WithCancel(rootCtx)
invoker := newServiceInvoker(t.TaskToken, ath.identity, ath.service, cancel)
invoker := newServiceInvoker(t.TaskToken, ath.identity, ath.service, cancel, t.GetHeartbeatTimeoutSeconds())
defer invoker.Close()
ctx := WithActivityTask(canCtx, t, invoker, ath.logger)
activityType := *t.GetActivityType()
activityImplementation, ok := ath.implementations[flowActivityTypeFrom(activityType)]
Expand Down
9 changes: 5 additions & 4 deletions internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithError() {
nil,
"Test_Cadence_Invoker",
mockService,
func() {})
func() {},
0)

heartbeatErr := cadenceInvoker.Heartbeat(nil)
t.NotNil(heartbeatErr)
Expand Down Expand Up @@ -325,9 +326,9 @@ type deadlineTest struct {
}

var deadlineTests = []deadlineTest{
{[]activity{&testActivityDeadline{}}, time.Now(), 1, time.Now(), 1, nil},
{[]activity{&testActivityDeadline{}}, time.Now(), 2, time.Now(), 1, nil},
{[]activity{&testActivityDeadline{}}, time.Now(), 1, time.Now(), 2, nil},
{[]activity{&testActivityDeadline{}}, time.Now(), 3, time.Now(), 3, nil},
{[]activity{&testActivityDeadline{}}, time.Now(), 4, time.Now(), 3, nil},
{[]activity{&testActivityDeadline{}}, time.Now(), 3, time.Now(), 4, nil},
{[]activity{&testActivityDeadline{}}, time.Now().Add(-1 * time.Second), 1, time.Now(), 1, context.DeadlineExceeded},
{[]activity{&testActivityDeadline{}}, time.Now(), 1, time.Now().Add(-1 * time.Second), 1, context.DeadlineExceeded},
{[]activity{&testActivityDeadline{}}, time.Now().Add(-1 * time.Second), 1, time.Now().Add(-1 * time.Second), 1, context.DeadlineExceeded},
Expand Down

0 comments on commit 0d787b4

Please sign in to comment.