Skip to content

Commit

Permalink
Merge 95f4fd1 into 8e50080
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyassrivatsan committed May 1, 2019
2 parents 8e50080 + 95f4fd1 commit b66dbf4
Show file tree
Hide file tree
Showing 25 changed files with 2,615 additions and 84 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

1,747 changes: 1,717 additions & 30 deletions .gen/go/shared/types.go

Large diffs are not rendered by default.

53 changes: 53 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,7 @@ ignored = ["go.uber.org/cadence/.tmp"]
[[constraint]]
name = "github.com/robfig/cron"
version = "1.1.0"

[[constraint]]
name = "github.com/uber/jaeger-client-go"
version = "2.15.0"
54 changes: 54 additions & 0 deletions idl/github.com/uber/cadence/shared.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ exception RetryTaskError {
5: optional i64 (js.type = "Long") nextEventId
}

exception ClientVersionNotSupportedError {
1: required string featureVersion
2: required string clientImpl
3: required string supportedVersions
}

enum WorkflowIdReusePolicy {
/*
* allow start a workflow execution using the same workflow ID,
Expand Down Expand Up @@ -327,6 +333,7 @@ struct ScheduleActivityTaskDecisionAttributes {
55: optional i32 startToCloseTimeoutSeconds
60: optional i32 heartbeatTimeoutSeconds
70: optional RetryPolicy retryPolicy
80: optional Header header
}

struct RequestCancelActivityTaskDecisionAttributes {
Expand Down Expand Up @@ -391,6 +398,7 @@ struct ContinueAsNewWorkflowExecutionDecisionAttributes {
100: optional binary failureDetails
110: optional binary lastCompletionResult
120: optional string cronSchedule
130: optional Header header
}

struct StartChildWorkflowExecutionDecisionAttributes {
Expand All @@ -406,6 +414,7 @@ struct StartChildWorkflowExecutionDecisionAttributes {
100: optional WorkflowIdReusePolicy workflowIdReusePolicy
110: optional RetryPolicy retryPolicy
120: optional string cronSchedule
130: optional Header header
}

struct Decision {
Expand Down Expand Up @@ -446,6 +455,21 @@ struct WorkflowExecutionStartedEventAttributes {
100: optional string cronSchedule
110: optional i32 firstDecisionTaskBackoffSeconds
120: optional Memo memo
130: optional ResetPoints prevAutoResetPoints
140: optional Header header
}

struct ResetPoints{
10: optional list<ResetPointInfo> points
}

struct ResetPointInfo{
10: optional string binaryChecksum
20: optional string runId
30: optional i64 firstDecisionCompletedId
40: optional i64 (js.type = "Long") createdTimeNano
50: optional i64 (js.type = "Long") expiringTimeNano //the time that the run is deleted due to retention
60: optional bool resettable // false if the resset point has pending childWFs/reqCancels/signalExternals.
}

struct WorkflowExecutionCompletedEventAttributes {
Expand Down Expand Up @@ -482,6 +506,7 @@ struct WorkflowExecutionContinuedAsNewEventAttributes {
100: optional string failureReason
110: optional binary failureDetails
120: optional binary lastCompletionResult
130: optional Header header
}

struct DecisionTaskScheduledEventAttributes {
Expand Down Expand Up @@ -535,6 +560,7 @@ struct ActivityTaskScheduledEventAttributes {
60: optional i32 heartbeatTimeoutSeconds
90: optional i64 (js.type = "Long") decisionTaskCompletedEventId
110: optional RetryPolicy retryPolicy
120: optional Header header
}

struct ActivityTaskStartedEventAttributes {
Expand Down Expand Up @@ -704,6 +730,7 @@ struct StartChildWorkflowExecutionInitiatedEventAttributes {
110: optional WorkflowIdReusePolicy workflowIdReusePolicy
120: optional RetryPolicy retryPolicy
130: optional string cronSchedule
140: optional Header header
}

struct StartChildWorkflowExecutionFailedEventAttributes {
Expand All @@ -721,6 +748,7 @@ struct ChildWorkflowExecutionStartedEventAttributes {
20: optional i64 (js.type = "Long") initiatedEventId
30: optional WorkflowExecution workflowExecution
40: optional WorkflowType workflowType
50: optional Header header
}

struct ChildWorkflowExecutionCompletedEventAttributes {
Expand Down Expand Up @@ -851,6 +879,17 @@ struct DomainConfiguration {
40: optional i32 archivalRetentionPeriodInDays
50: optional ArchivalStatus archivalStatus
60: optional string archivalBucketOwner
70: optional BadBinaries badBinaries
}

struct BadBinaries{
10: optional map<string, BadBinaryInfo> binaries
}

struct BadBinaryInfo{
10: optional string reason
20: optional string operator
30: optional i64 (js.type = "Long") createdTimeNano
}

struct UpdateDomainInfo {
Expand Down Expand Up @@ -913,6 +952,7 @@ struct UpdateDomainRequest {
30: optional DomainConfiguration configuration
40: optional DomainReplicationConfiguration replicationConfiguration
50: optional string securityToken
60: optional string deleteBadBinary
}

struct UpdateDomainResponse {
Expand Down Expand Up @@ -943,6 +983,7 @@ struct StartWorkflowExecutionRequest {
120: optional RetryPolicy retryPolicy
130: optional string cronSchedule
140: optional Memo memo
150: optional Header header
}

struct StartWorkflowExecutionResponse {
Expand Down Expand Up @@ -1019,6 +1060,7 @@ struct PollForActivityTaskResponse {
140: optional binary heartbeatDetails
150: optional WorkflowType workflowType
160: optional string workflowDomain
170: optional Header header
}

struct RecordActivityTaskHeartbeatRequest {
Expand Down Expand Up @@ -1187,6 +1229,18 @@ struct ListClosedWorkflowExecutionsResponse {
20: optional binary nextPageToken
}

struct ListWorkflowExecutionsRequest {
10: optional string domain
20: optional i32 pageSize
30: optional binary nextPageToken
40: optional string query
}

struct ListWorkflowExecutionsResponse {
10: optional list<WorkflowExecutionInfo> executions
20: optional binary nextPageToken
}

struct QueryWorkflowRequest {
10: optional string domain
20: optional WorkflowExecution execution
Expand Down
6 changes: 4 additions & 2 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func WithActivityTask(
scope tally.Scope,
dataConverter encoded.DataConverter,
workerStopChannel <-chan struct{},
contextPropagators []ContextPropagator,
) context.Context {
var deadline time.Time
scheduled := time.Unix(0, task.GetScheduledTimestamp())
Expand Down Expand Up @@ -310,8 +311,9 @@ func WithActivityTask(
workflowType: &WorkflowType{
Name: *task.WorkflowType.Name,
},
workflowDomain: *task.WorkflowDomain,
workerStopChannel: workerStopChannel,
workflowDomain: *task.WorkflowDomain,
workerStopChannel: workerStopChannel,
contextPropagators: contextPropagators,
})
}

Expand Down
22 changes: 14 additions & 8 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,10 @@ type (

// ClientOptions are optional parameters for Client creation.
ClientOptions struct {
MetricsScope tally.Scope
Identity string
DataConverter encoded.DataConverter
MetricsScope tally.Scope
Identity string
DataConverter encoded.DataConverter
ContextPropagators []ContextPropagator
}

// StartWorkflowOptions configuration parameters for starting a workflow execution.
Expand Down Expand Up @@ -401,12 +402,17 @@ func NewClient(service workflowserviceclient.Interface, domain string, options *
} else {
dataConverter = getDefaultDataConverter()
}
var contextPropagators []ContextPropagator
if options != nil {
contextPropagators = options.ContextPropagators
}
return &workflowClient{
workflowService: metrics.NewWorkflowServiceWrapper(service, metricScope),
domain: domain,
metricsScope: metrics.NewTaggedScope(metricScope),
identity: identity,
dataConverter: dataConverter,
workflowService: metrics.NewWorkflowServiceWrapper(service, metricScope),
domain: domain,
metricsScope: metrics.NewTaggedScope(metricScope),
identity: identity,
dataConverter: dataConverter,
contextPropagators: contextPropagators,
}
}

Expand Down
19 changes: 19 additions & 0 deletions internal/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ import (
"fmt"
"time"

"github.com/opentracing/opentracing-go"
"go.uber.org/cadence/.gen/go/shared"
)

const activeSpanKey contextKey = "activeSpanKey"

// Context is a clone of context.Context with Done() returning Channel instead
// of native channel.
// A Context carries a deadline, a cancellation signal, and other values across
Expand Down Expand Up @@ -161,6 +164,10 @@ var (
todo = new(emptyCtx)
)

func Background() Context {
return background
}

// ErrCanceled is the error returned by Context.Err when the context is canceled.
var ErrCanceled = NewCanceledError()

Expand Down Expand Up @@ -419,3 +426,15 @@ func (c *valueCtx) Value(key interface{}) interface{} {
}
return c.Context.Value(key)
}

func spanFromContext(ctx Context) opentracing.Span {
val := ctx.Value(activeSpanKey)
if sp, ok := val.(opentracing.Span); ok {
return sp
}
return nil
}

func contextWithSpan(ctx Context, span opentracing.Span) Context {
return WithValue(ctx, activeSpanKey, span)
}
Loading

0 comments on commit b66dbf4

Please sign in to comment.