Skip to content

Commit

Permalink
Merge branch 'master' into termifrun
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu committed Apr 28, 2020
2 parents 7368508 + bd0817e commit c3a2573
Show file tree
Hide file tree
Showing 28 changed files with 732 additions and 433 deletions.
2 changes: 1 addition & 1 deletion LICENSE
@@ -1,4 +1,4 @@
// Copyright (c) 2017 Uber Technologies, Inc.
// Copyright (c) 2017-2020 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down
64 changes: 42 additions & 22 deletions internal/activity.go
@@ -1,4 +1,6 @@
// Copyright (c) 2017 Uber Technologies, Inc.
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -57,7 +59,8 @@ type (

// RegisterActivityOptions consists of options for registering an activity
RegisterActivityOptions struct {
Name string
Name string
DisableAlreadyRegisteredCheck bool
}

// ActivityOptions stores all activity-specific parameters that will be stored inside of a context.
Expand Down Expand Up @@ -119,42 +122,59 @@ type (
}
)

// RegisterActivity - register a activity function with the framework.
// A activity takes a context and input and returns a (result, error) or just error.
// RegisterActivity - register an activity function or a pointer to a structure with the framework.
// The public form is: activity.Register(...)
// An activity function takes a context and input and returns a (result, error) or just error.
//
// And activity struct is a structure with all its exported methods treated as activities. The default
// name of each activity is the <structure name>_<method name>. Use RegisterActivityWithOptions to override the
// "<structure name>_" prefix.
//
// Examples:
// func sampleActivity(ctx context.Context, input []byte) (result []byte, err error)
// func sampleActivity(ctx context.Context, arg1 int, arg2 string) (result *customerStruct, err error)
// func sampleActivity(ctx context.Context) (err error)
// func sampleActivity() (result string, err error)
// func sampleActivity(arg1 bool) (result int, err error)
// func sampleActivity(arg1 bool) (err error)
//
// type Activities struct {
// // fields
// }
// func (a *Activities) SampleActivity1(ctx context.Context, arg1 int, arg2 string) (result *customerStruct, err error) {
// ...
// }
//
// func (a *Activities) SampleActivity2(ctx context.Context, arg1 int, arg2 *customerStruct) (result string, err error) {
// ...
// }
//
// Serialization of all primitive types, structures is supported ... except channels, functions, variadic, unsafe pointer.
// This method calls panic if activityFunc doesn't comply with the expected format.
func RegisterActivity(activityFunc interface{}) {
RegisterActivityWithOptions(activityFunc, RegisterActivityOptions{})
}

// RegisterActivityWithOptions registers the activity function with options
// RegisterActivityWithOptions registers the activity function or struct pointer with options.
// The public form is: activity.RegisterWithOptions(...)
// The user can use options to provide an external name for the activity or leave it empty if no
// external name is required. This can be used as
// client.RegisterActivity(barActivity, RegisterActivityOptions{})
// client.RegisterActivity(barActivity, RegisterActivityOptions{Name: "barExternal"})
// An activity takes a context and input and returns a (result, error) or just error.
// Examples:
// func sampleActivity(ctx context.Context, input []byte) (result []byte, err error)
// func sampleActivity(ctx context.Context, arg1 int, arg2 string) (result *customerStruct, err error)
// func sampleActivity(ctx context.Context) (err error)
// func sampleActivity() (result string, err error)
// func sampleActivity(arg1 bool) (result int, err error)
// func sampleActivity(arg1 bool) (err error)
// Serialization of all primitive types, structures is supported ... except channels, functions, variadic, unsafe pointer.
// This method calls panic if activityFunc doesn't comply with the expected format.
// activity.RegisterWithOptions(barActivity, RegisterActivityOptions{})
// activity.RegisterWithOptions(barActivity, RegisterActivityOptions{Name: "barExternal"})
// When registering the structure that implements activities the name is used as a prefix that is
// prepended to the activity method name.
// activity.RegisterWithOptions(&Activities{ ... }, RegisterActivityOptions{Name: "MyActivities_"})
// To override each name of activities defined through a structure register the methods one by one:
// activities := &Activities{ ... }
// activity.RegisterWithOptions(activities.SampleActivity1, RegisterActivityOptions{Name: "Sample1"})
// activity.RegisterWithOptions(activities.SampleActivity2, RegisterActivityOptions{Name: "Sample2"})
// See RegisterActivity function for more info.
// The other use of options is to disable duplicated activity registration check
// which might be useful for integration tests.
// activity.RegisterWithOptions(barActivity, RegisterActivityOptions{DisableAlreadyRegisteredCheck: true})
func RegisterActivityWithOptions(activityFunc interface{}, opts RegisterActivityOptions) {
thImpl := getHostEnvironment()
err := thImpl.RegisterActivityWithOptions(activityFunc, opts)
if err != nil {
panic(err)
}
registry := getGlobalRegistry()
registry.RegisterActivityWithOptions(activityFunc, opts)
}

// GetActivityInfo returns information about currently executing activity.
Expand Down
6 changes: 4 additions & 2 deletions internal/client.go
@@ -1,4 +1,5 @@
// Copyright (c) 2017 Uber Technologies, Inc.
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -183,7 +184,7 @@ type (
CompleteActivity(ctx context.Context, taskToken []byte, result interface{}, err error) error

// CompleteActivityById reports activity completed.
// Similar to CompleteActivity, but may save cadence user from keeping taskToken info.
// Similar to CompleteActivity, but may save user from keeping taskToken info.
// activity Execute method can return activity.ErrResultPending to
// indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivityById() method
// should be called when that activity is completed with the actual result and error. If err is nil, activity task
Expand Down Expand Up @@ -518,6 +519,7 @@ func NewClient(service workflowserviceclient.Interface, domain string, options *
return &workflowClient{
workflowService: metrics.NewWorkflowServiceWrapper(service, metricScope),
domain: domain,
registry: newRegistry(getGlobalRegistry()),
metricsScope: metrics.NewTaggedScope(metricScope),
identity: identity,
dataConverter: dataConverter,
Expand Down
6 changes: 4 additions & 2 deletions internal/error.go
@@ -1,4 +1,5 @@
// Copyright (c) 2017 Uber Technologies, Inc.
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -219,7 +220,8 @@ func NewContinueAsNewError(ctx Context, wfn interface{}, args ...interface{}) *C
if options == nil {
panic("context is missing required options for continue as new")
}
workflowType, input, err := getValidatedWorkflowFunction(wfn, args, options.dataConverter)
env := getWorkflowEnvironment(ctx)
workflowType, input, err := getValidatedWorkflowFunction(wfn, args, options.dataConverter, env.GetRegistry())
if err != nil {
panic(err)
}
Expand Down
15 changes: 8 additions & 7 deletions internal/internal_activity.go
@@ -1,4 +1,5 @@
// Copyright (c) 2017 Uber Technologies, Inc.
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -282,7 +283,7 @@ func validateFunctionArgs(f interface{}, args []interface{}, isWorkflow bool) er
return nil
}

func getValidatedActivityFunction(f interface{}, args []interface{}, dataConverter DataConverter) (*ActivityType, []byte, error) {
func getValidatedActivityFunction(f interface{}, args []interface{}, dataConverter DataConverter, registry *registry) (*ActivityType, []byte, error) {
fnName := ""
fType := reflect.TypeOf(f)
switch getKind(fType) {
Expand All @@ -294,13 +295,13 @@ func getValidatedActivityFunction(f interface{}, args []interface{}, dataConvert
return nil, nil, err
}
fnName = getFunctionName(f)
if alias, ok := getHostEnvironment().getActivityAlias(fnName); ok {
if alias, ok := registry.getActivityAlias(fnName); ok {
fnName = alias
}

default:
return nil, nil, fmt.Errorf(
"Invalid type 'f' parameter provided, it can be either activity function or name of the activity: %v", f)
"invalid type 'f' parameter provided, it can be either activity function or name of the activity: %v", f)
}

input, err := encodeArgs(dataConverter, args)
Expand Down Expand Up @@ -328,7 +329,7 @@ func validateFunctionAndGetResults(f interface{}, values []reflect.Value, dataCo

if resultSize < 1 || resultSize > 2 {
return nil, fmt.Errorf(
"The function: %v signature returns %d results, it is expecting to return either error or (result, error)",
"the function: %v signature returns %d results, it is expecting to return either error or (result, error)",
fnName, resultSize)
}

Expand Down Expand Up @@ -380,7 +381,7 @@ func deSerializeFnResultFromFnType(fnType reflect.Type, result []byte, to interf
return nil
}

func deSerializeFunctionResult(f interface{}, result []byte, to interface{}, dataConverter DataConverter) error {
func deSerializeFunctionResult(f interface{}, result []byte, to interface{}, dataConverter DataConverter, registry *registry) error {
fType := reflect.TypeOf(f)
if dataConverter == nil {
dataConverter = getDefaultDataConverter()
Expand All @@ -394,7 +395,7 @@ func deSerializeFunctionResult(f interface{}, result []byte, to interface{}, dat
case reflect.String:
// If we know about this function through registration then we will try to return corresponding result type.
fnName := reflect.ValueOf(f).String()
if fnRegistered, ok := getHostEnvironment().getActivityFn(fnName); ok {
if fnRegistered, ok := registry.getActivityFn(fnName); ok {
return deSerializeFnResultFromFnType(reflect.TypeOf(fnRegistered), result, to, dataConverter)
}
}
Expand Down
15 changes: 10 additions & 5 deletions internal/internal_event_handlers.go
@@ -1,4 +1,5 @@
// Copyright (c) 2017 Uber Technologies, Inc.
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -112,7 +113,7 @@ type (
enableLoggingInReplay bool // flag to indicate if workflow should enable logging in replay mode

metricsScope tally.Scope
hostEnv *hostEnvImpl
registry *registry
dataConverter DataConverter
contextPropagators []ContextPropagator
tracer opentracing.Tracer
Expand Down Expand Up @@ -175,7 +176,7 @@ func newWorkflowExecutionEventHandler(
logger *zap.Logger,
enableLoggingInReplay bool,
scope tally.Scope,
hostEnv *hostEnvImpl,
registry *registry,
dataConverter DataConverter,
contextPropagators []ContextPropagator,
tracer opentracing.Tracer,
Expand All @@ -191,7 +192,7 @@ func newWorkflowExecutionEventHandler(
openSessions: make(map[string]*SessionInfo),
completeHandler: completeHandler,
enableLoggingInReplay: enableLoggingInReplay,
hostEnv: hostEnv,
registry: registry,
dataConverter: dataConverter,
contextPropagators: contextPropagators,
tracer: tracer,
Expand Down Expand Up @@ -734,6 +735,10 @@ func (wc *workflowEnvironmentImpl) getOpenSessions() []*SessionInfo {
return openSessions
}

func (wc *workflowEnvironmentImpl) GetRegistry() *registry {
return wc.registry
}

func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
event *m.HistoryEvent,
isReplay bool,
Expand Down Expand Up @@ -944,7 +949,7 @@ func (weh *workflowExecutionEventHandlerImpl) Close() {

func (weh *workflowExecutionEventHandlerImpl) handleWorkflowExecutionStarted(
attributes *m.WorkflowExecutionStartedEventAttributes) (err error) {
weh.workflowDefinition, err = weh.hostEnv.getWorkflowDefinition(
weh.workflowDefinition, err = weh.registry.getWorkflowDefinition(
weh.workflowInfo.WorkflowType,
)
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions internal/internal_pressure_points.go
@@ -1,4 +1,5 @@
// Copyright (c) 2017 Uber Technologies, Inc.
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -61,14 +62,14 @@ func newWorkflowWorkerWithPressurePoints(
domain string,
params workerExecutionParameters,
pressurePoints map[string]map[string]string,
hostEnv *hostEnvImpl,
) (worker Worker) {
registry *registry,
) (worker *workflowWorker) {
return newWorkflowWorker(
service,
domain,
params,
&pressurePointMgrImpl{config: pressurePoints, logger: params.Logger},
hostEnv,
registry,
)
}

Expand Down
25 changes: 13 additions & 12 deletions internal/internal_task_handlers.go
@@ -1,4 +1,5 @@
// Copyright (c) 2017 Uber Technologies, Inc.
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -118,7 +119,7 @@ type (
identity string
enableLoggingInReplay bool
disableStickyExecution bool
hostEnv *hostEnvImpl
registry *registry
laTunnel *localActivityTunnel
nonDeterministicWorkflowPolicy NonDeterministicWorkflowPolicy
dataConverter DataConverter
Expand All @@ -136,7 +137,7 @@ type (
metricsScope *metrics.TaggedScope
logger *zap.Logger
userContext context.Context
hostEnv *hostEnvImpl
registry *registry
activityProvider activityProvider
dataConverter DataConverter
workerStopCh <-chan struct{}
Expand Down Expand Up @@ -369,7 +370,7 @@ func newWorkflowTaskHandler(
domain string,
params workerExecutionParameters,
ppMgr pressurePointMgr,
hostEnv *hostEnvImpl,
registry *registry,
) WorkflowTaskHandler {
ensureRequiredParams(&params)
return &workflowTaskHandlerImpl{
Expand All @@ -380,7 +381,7 @@ func newWorkflowTaskHandler(
identity: params.Identity,
enableLoggingInReplay: params.EnableLoggingInReplay,
disableStickyExecution: params.DisableStickyExecution,
hostEnv: hostEnv,
registry: registry,
nonDeterministicWorkflowPolicy: params.NonDeterministicWorkflowPolicy,
dataConverter: params.DataConverter,
contextPropagators: params.ContextPropagators,
Expand Down Expand Up @@ -567,7 +568,7 @@ func (w *workflowExecutionContextImpl) createEventHandler() {
w.wth.logger,
w.wth.enableLoggingInReplay,
w.wth.metricsScope,
w.wth.hostEnv,
w.wth.registry,
w.wth.dataConverter,
w.wth.contextPropagators,
w.wth.tracer,
Expand Down Expand Up @@ -1580,15 +1581,15 @@ func (wth *workflowTaskHandlerImpl) executeAnyPressurePoints(event *s.HistoryEve
func newActivityTaskHandler(
service workflowserviceclient.Interface,
params workerExecutionParameters,
env *hostEnvImpl,
registry *registry,
) ActivityTaskHandler {
return newActivityTaskHandlerWithCustomProvider(service, params, env, nil)
return newActivityTaskHandlerWithCustomProvider(service, params, registry, nil)
}

func newActivityTaskHandlerWithCustomProvider(
service workflowserviceclient.Interface,
params workerExecutionParameters,
env *hostEnvImpl,
registry *registry,
activityProvider activityProvider,
) ActivityTaskHandler {
return &activityTaskHandlerImpl{
Expand All @@ -1598,7 +1599,7 @@ func newActivityTaskHandlerWithCustomProvider(
logger: params.Logger,
metricsScope: metrics.NewTaggedScope(params.MetricsScope),
userContext: params.UserContext,
hostEnv: env,
registry: registry,
activityProvider: activityProvider,
dataConverter: params.DataConverter,
workerStopCh: params.WorkerStopChannel,
Expand Down Expand Up @@ -1828,15 +1829,15 @@ func (ath *activityTaskHandlerImpl) getActivity(name string) activity {
return ath.activityProvider(name)
}

if a, ok := ath.hostEnv.getActivity(name); ok {
if a, ok := ath.registry.getActivity(name); ok {
return a
}

return nil
}

func (ath *activityTaskHandlerImpl) getRegisteredActivityNames() (activityNames []string) {
for _, a := range ath.hostEnv.getRegisteredActivities() {
for _, a := range ath.registry.getRegisteredActivities() {
activityNames = append(activityNames, a.ActivityType().Name)
}
return
Expand Down

0 comments on commit c3a2573

Please sign in to comment.