Skip to content

Commit

Permalink
Revert "Merge workflow interceptor changes"
Browse files Browse the repository at this point in the history
  • Loading branch information
meiliang86 committed Jun 2, 2020
1 parent 41c7b31 commit 1ff0f73
Show file tree
Hide file tree
Showing 38 changed files with 821 additions and 1,883 deletions.
6 changes: 1 addition & 5 deletions activity/activity.go
@@ -1,4 +1,4 @@
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -46,15 +46,11 @@ type (
// that could report the activity completed event to cadence server via Client.CompleteActivity() API.
var ErrResultPending = internal.ErrActivityResultPending

// Deprecated: Global activity registration methods are replaced by equivalent Worker instance methods.
// This method is kept to maintain backward compatibility and should not be used.
// Register - calls RegisterWithOptions with default registration options.
func Register(activityFunc interface{}) {
internal.RegisterActivity(activityFunc)
}

// Deprecated: Global activity registration methods are replaced by equivalent Worker instance methods.
// This method is kept to maintain backward compatibility and should not be used.
// RegisterWithOptions registers the activity function with options
// The user can use options to provide an external name for the activity or leave it empty if no
// external name is required. This can be used as
Expand Down
47 changes: 19 additions & 28 deletions evictiontest/workflow_cache_eviction_test.go
@@ -1,5 +1,4 @@
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -30,9 +29,7 @@
package evictiontest

import (
"strconv"
"testing"
"time"

"github.com/golang/mock/gomock"
log "github.com/sirupsen/logrus"
Expand All @@ -45,8 +42,21 @@ import (
"go.uber.org/cadence/worker"
"go.uber.org/yarpc"
"golang.org/x/net/context"
"strconv"
"time"
)

func init() {
// this is an arbitrary workflow we use for this test
// NOTE: a simple helloworld that doesn't execute an activity
// won't work because the workflow will simply just complete
// and won't stay in the cache.
// for this test, we need a workflow that "blocks" either by
// running an activity or waiting on a timer so that its execution
// context sticks around in the cache.
internal.RegisterWorkflow(testReplayWorkflow)
}

func testReplayWorkflow(ctx internal.Context) error {
ao := internal.ActivityOptions{
ScheduleToStartTimeout: time.Second,
Expand Down Expand Up @@ -90,19 +100,14 @@ func TestWorkersTestSuite(t *testing.T) {
var callOptions = []interface{}{gomock.Any(), gomock.Any(), gomock.Any()}

func createTestEventWorkflowExecutionStarted(eventID int64, attr *m.WorkflowExecutionStartedEventAttributes) *m.HistoryEvent {
return &m.HistoryEvent{
EventId: common.Int64Ptr(eventID),
EventType: common.EventTypePtr(m.EventTypeWorkflowExecutionStarted),
WorkflowExecutionStartedEventAttributes: attr,
}
return &m.HistoryEvent{EventId: common.Int64Ptr(eventID), EventType: common.EventTypePtr(m.EventTypeWorkflowExecutionStarted), WorkflowExecutionStartedEventAttributes: attr}
}

func createTestEventDecisionTaskScheduled(eventID int64, attr *m.DecisionTaskScheduledEventAttributes) *m.HistoryEvent {
return &m.HistoryEvent{
EventId: common.Int64Ptr(eventID),
EventType: common.EventTypePtr(m.EventTypeDecisionTaskScheduled),
DecisionTaskScheduledEventAttributes: attr,
}
DecisionTaskScheduledEventAttributes: attr}
}

func (s *CacheEvictionSuite) TestResetStickyOnEviction() {
Expand All @@ -115,10 +120,7 @@ func (s *CacheEvictionSuite) TestResetStickyOnEviction() {

var taskCounter atomic.Int32 // lambda variable to keep count
// mock that manufactures unique decision tasks
mockPollForDecisionTask := func(
ctx context.Context,
_PollRequest *m.PollForDecisionTaskRequest,
opts ...yarpc.CallOption,
mockPollForDecisionTask := func(ctx context.Context, _PollRequest *m.PollForDecisionTaskRequest, opts ...yarpc.CallOption,
) (success *m.PollForDecisionTaskResponse, err error) {
taskID := taskCounter.Inc()
workflowID := common.StringPtr("testID" + strconv.Itoa(int(taskID)))
Expand All @@ -130,17 +132,14 @@ func (s *CacheEvictionSuite) TestResetStickyOnEviction() {
ret := &m.PollForDecisionTaskResponse{
TaskToken: make([]byte, 5),
WorkflowExecution: &m.WorkflowExecution{WorkflowId: workflowID, RunId: runID},
WorkflowType: &m.WorkflowType{Name: common.StringPtr("testReplayWorkflow")},
WorkflowType: &m.WorkflowType{Name: common.StringPtr("go.uber.org/cadence/evictiontest.testReplayWorkflow")},
History: &m.History{Events: testEvents},
PreviousStartedEventId: common.Int64Ptr(5)}
return ret, nil
}

resetStickyAPICalled := make(chan struct{})
mockResetStickyTaskList := func(
ctx context.Context,
_ResetRequest *m.ResetStickyTaskListRequest,
opts ...yarpc.CallOption,
mockResetStickyTaskList := func(ctx context.Context, _ResetRequest *m.ResetStickyTaskListRequest, opts ...yarpc.CallOption,
) (success *m.ResetStickyTaskListResponse, err error) {
resetStickyAPICalled <- struct{}{}
return &m.ResetStickyTaskListResponse{}, nil
Expand Down Expand Up @@ -168,14 +167,6 @@ func (s *CacheEvictionSuite) TestResetStickyOnEviction() {
s.service.EXPECT().ResetStickyTaskList(gomock.Any(), gomock.Any(), callOptions...).DoAndReturn(mockResetStickyTaskList).Times(1)

workflowWorker := internal.NewWorker(s.service, "test-domain", "tasklist", worker.Options{DisableActivityWorker: true})
// this is an arbitrary workflow we use for this test
// NOTE: a simple helloworld that doesn't execute an activity
// won't work because the workflow will simply just complete
// and won't stay in the cache.
// for this test, we need a workflow that "blocks" either by
// running an activity or waiting on a timer so that its execution
// context sticks around in the cache.
workflowWorker.RegisterWorkflow(testReplayWorkflow)

workflowWorker.Start()

Expand Down
45 changes: 0 additions & 45 deletions interceptors/workflow_interceptor.go

This file was deleted.

111 changes: 107 additions & 4 deletions internal/activity.go
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/uber-go/tally"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/internal/common"
"go.uber.org/cadence/internal/common/backoff"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -120,8 +122,6 @@ type (
}
)

// Deprecated: Global activity registration methods are replaced by equivalent Worker instance methods.
// This method is kept to maintain backward compatibility and should not be used.
// RegisterActivity - register an activity function or a pointer to a structure with the framework.
// The public form is: activity.Register(...)
// An activity function takes a context and input and returns a (result, error) or just error.
Expand Down Expand Up @@ -155,8 +155,6 @@ func RegisterActivity(activityFunc interface{}) {
RegisterActivityWithOptions(activityFunc, RegisterActivityOptions{})
}

// Deprecated: Global activity registration methods are replaced by equivalent Worker instance methods.
// This method is kept to maintain backward compatibility and should not be used.
// RegisterActivityWithOptions registers the activity function or struct pointer with options.
// The public form is: activity.RegisterWithOptions(...)
// The user can use options to provide an external name for the activity or leave it empty if no
Expand Down Expand Up @@ -342,3 +340,108 @@ func WithActivityTask(
tracer: tracer,
})
}

// WithActivityOptions adds all options to the copy of the context.
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
// subjected to change in the future.
func WithActivityOptions(ctx Context, options ActivityOptions) Context {
ctx1 := setActivityParametersIfNotExist(ctx)
eap := getActivityOptions(ctx1)

eap.TaskListName = options.TaskList
eap.ScheduleToCloseTimeoutSeconds = common.Int32Ceil(options.ScheduleToCloseTimeout.Seconds())
eap.StartToCloseTimeoutSeconds = common.Int32Ceil(options.StartToCloseTimeout.Seconds())
eap.ScheduleToStartTimeoutSeconds = common.Int32Ceil(options.ScheduleToStartTimeout.Seconds())
eap.HeartbeatTimeoutSeconds = common.Int32Ceil(options.HeartbeatTimeout.Seconds())
eap.WaitForCancellation = options.WaitForCancellation
eap.ActivityID = common.StringPtr(options.ActivityID)
eap.RetryPolicy = convertRetryPolicy(options.RetryPolicy)
return ctx1
}

// WithLocalActivityOptions adds local activity options to the copy of the context.
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
// subjected to change in the future.
func WithLocalActivityOptions(ctx Context, options LocalActivityOptions) Context {
ctx1 := setLocalActivityParametersIfNotExist(ctx)
opts := getLocalActivityOptions(ctx1)

opts.ScheduleToCloseTimeoutSeconds = common.Int32Ceil(options.ScheduleToCloseTimeout.Seconds())
opts.RetryPolicy = options.RetryPolicy
return ctx1
}

// WithTaskList adds a task list to the copy of the context.
func WithTaskList(ctx Context, name string) Context {
ctx1 := setActivityParametersIfNotExist(ctx)
getActivityOptions(ctx1).TaskListName = name
return ctx1
}

// WithScheduleToCloseTimeout adds a timeout to the copy of the context.
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
// subjected to change in the future.
func WithScheduleToCloseTimeout(ctx Context, d time.Duration) Context {
ctx1 := setActivityParametersIfNotExist(ctx)
getActivityOptions(ctx1).ScheduleToCloseTimeoutSeconds = common.Int32Ceil(d.Seconds())
return ctx1
}

// WithScheduleToStartTimeout adds a timeout to the copy of the context.
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
// subjected to change in the future.
func WithScheduleToStartTimeout(ctx Context, d time.Duration) Context {
ctx1 := setActivityParametersIfNotExist(ctx)
getActivityOptions(ctx1).ScheduleToStartTimeoutSeconds = common.Int32Ceil(d.Seconds())
return ctx1
}

// WithStartToCloseTimeout adds a timeout to the copy of the context.
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
// subjected to change in the future.
func WithStartToCloseTimeout(ctx Context, d time.Duration) Context {
ctx1 := setActivityParametersIfNotExist(ctx)
getActivityOptions(ctx1).StartToCloseTimeoutSeconds = common.Int32Ceil(d.Seconds())
return ctx1
}

// WithHeartbeatTimeout adds a timeout to the copy of the context.
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
// subjected to change in the future.
func WithHeartbeatTimeout(ctx Context, d time.Duration) Context {
ctx1 := setActivityParametersIfNotExist(ctx)
getActivityOptions(ctx1).HeartbeatTimeoutSeconds = common.Int32Ceil(d.Seconds())
return ctx1
}

// WithWaitForCancellation adds wait for the cacellation to the copy of the context.
func WithWaitForCancellation(ctx Context, wait bool) Context {
ctx1 := setActivityParametersIfNotExist(ctx)
getActivityOptions(ctx1).WaitForCancellation = wait
return ctx1
}

// WithRetryPolicy adds retry policy to the copy of the context
func WithRetryPolicy(ctx Context, retryPolicy RetryPolicy) Context {
ctx1 := setActivityParametersIfNotExist(ctx)
getActivityOptions(ctx1).RetryPolicy = convertRetryPolicy(&retryPolicy)
return ctx1
}

func convertRetryPolicy(retryPolicy *RetryPolicy) *shared.RetryPolicy {
if retryPolicy == nil {
return nil
}
if retryPolicy.BackoffCoefficient == 0 {
retryPolicy.BackoffCoefficient = backoff.DefaultBackoffCoefficient
}
thriftRetryPolicy := shared.RetryPolicy{
InitialIntervalInSeconds: common.Int32Ptr(common.Int32Ceil(retryPolicy.InitialInterval.Seconds())),
MaximumIntervalInSeconds: common.Int32Ptr(common.Int32Ceil(retryPolicy.MaximumInterval.Seconds())),
BackoffCoefficient: &retryPolicy.BackoffCoefficient,
MaximumAttempts: &retryPolicy.MaximumAttempts,
NonRetriableErrorReasons: retryPolicy.NonRetriableErrorReasons,
ExpirationIntervalInSeconds: common.Int32Ptr(common.Int32Ceil(retryPolicy.ExpirationInterval.Seconds())),
}
return &thriftRetryPolicy
}
2 changes: 1 addition & 1 deletion internal/client.go
Expand Up @@ -519,7 +519,7 @@ func NewClient(service workflowserviceclient.Interface, domain string, options *
return &workflowClient{
workflowService: metrics.NewWorkflowServiceWrapper(service, metricScope),
domain: domain,
registry: newRegistry(),
registry: newRegistry(getGlobalRegistry()),
metricsScope: metrics.NewTaggedScope(metricScope),
identity: identity,
dataConverter: dataConverter,
Expand Down

0 comments on commit 1ff0f73

Please sign in to comment.