/
api.go
553 lines (509 loc) · 20.2 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package startworkflow
import (
"context"
"errors"
"github.com/google/uuid"
commonpb "go.temporal.io/api/common/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
tokenspb "go.temporal.io/server/api/token/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)
// eagerStartDeniedReason is the reason tag recorded on the eager-execution-denied counter. It explains why an
// eager workflow start request was served as a regular start.
type eagerStartDeniedReason metrics.ReasonString

// Reasons for denying an eager workflow start.
const (
	// Eager workflow start is disabled by dynamic config for the namespace.
	eagerStartDeniedReasonDynamicConfigDisabled = eagerStartDeniedReason("dynamic_config_disabled")
	// The request asks for a backoff before the first workflow task.
	eagerStartDeniedReasonFirstWorkflowTaskBackoff = eagerStartDeniedReason("first_workflow_task_backoff")
	// A retried request found that the first workflow task can no longer be handed out eagerly.
	eagerStartDeniedReasonTaskAlreadyDispatched = eagerStartDeniedReason("task_already_dispatched")
)
// Starter starts a new workflow execution.
//
// A Starter is built by NewStarter for a single request and driven by Invoke. Note that Invoke (via prepare)
// mutates request.StartRequest in place: request overrides are applied and RequestEagerExecution may be reset.
type Starter struct {
// shardCtx supplies config, metrics, the time source, cluster metadata and persistence access for this shard.
shardCtx shard.Context
// workflowConsistencyChecker provides the workflow cache and is used when updating the current run.
workflowConsistencyChecker api.WorkflowConsistencyChecker
// tokenSerializer serializes the task token returned with an eager workflow task.
tokenSerializer common.TaskTokenSerializer
// request is the start request being handled.
request *historyservice.StartWorkflowExecutionRequest
// namespace is the active namespace entry resolved from the request's namespace ID in NewStarter.
namespace *namespace.Namespace
}
// creationParams is a container for all information obtained from creating the uncommitted execution.
// The information is later used to create a new execution and handle conflicts.
type creationParams struct {
// workflowID and runID identify the new, not yet persisted, execution.
workflowID string
runID string
// workflowContext holds the new execution's workflow context and mutable state.
workflowContext api.WorkflowContext
// workflowTaskInfo is the in-flight workflow task reported by the mutable state at creation time. It is only
// guaranteed to be present when eager execution was requested (presumably nil otherwise; confirm).
workflowTaskInfo *workflow.WorkflowTaskInfo
// workflowSnapshot and workflowEventBatches result from closing the mutable state transaction as a snapshot.
// createNewMutableState guarantees exactly one event batch.
workflowSnapshot *persistence.WorkflowSnapshot
workflowEventBatches []*persistence.WorkflowEvents
}
// mutableStateInfo is a container for the relevant mutable state information to generate a start response with an eager
// workflow task.
type mutableStateInfo struct {
// branchToken is the current history branch token, used to read history from the DB.
branchToken []byte
// lastEventID is the ID of the last event in the mutable state (next event ID - 1).
lastEventID int64
// workflowTaskInfo is a copy of the in-flight workflow task. It is never nil: when hasInflight is false it is the
// zero value.
workflowTaskInfo *workflow.WorkflowTaskInfo
// hasInflight reports whether the mutable state had an in-flight workflow task.
hasInflight bool
}
// NewStarter builds a Starter for request.
//
// It resolves the request's namespace ID to an active namespace entry up front. If that lookup fails, its error is
// returned unchanged and no Starter is created.
func NewStarter(
	shardCtx shard.Context,
	workflowConsistencyChecker api.WorkflowConsistencyChecker,
	tokenSerializer common.TaskTokenSerializer,
	request *historyservice.StartWorkflowExecutionRequest,
) (*Starter, error) {
	nsID := namespace.ID(request.GetNamespaceId())
	activeNamespace, lookupErr := api.GetActiveNamespace(shardCtx, nsID)
	if lookupErr != nil {
		return nil, lookupErr
	}

	starter := &Starter{
		namespace:                  activeNamespace,
		request:                    request,
		tokenSerializer:            tokenSerializer,
		workflowConsistencyChecker: workflowConsistencyChecker,
		shardCtx:                   shardCtx,
	}
	return starter, nil
}
// prepare applies request overrides, validates the request, and records eager execution metrics.
//
// Only requests that actually ask for eager execution are counted in the eager execution metric. Such a request is
// downgraded to a regular start (RequestEagerExecution = false) when either of these holds:
//   - eager start is disabled by dynamic config for the namespace;
//   - the request asks for a first-workflow-task backoff.
//
// Each downgrade records exactly one denial reason, with the first matching condition winning. Requests that never
// asked for eager execution are not counted as denied.
func (s *Starter) prepare(ctx context.Context) error {
	request := s.request.StartRequest
	metricsHandler := s.shardCtx.GetMetricsHandler()

	api.OverrideStartWorkflowExecutionRequest(request, metrics.HistoryStartWorkflowExecutionScope, s.shardCtx, metricsHandler)

	err := api.ValidateStartWorkflowExecutionRequest(ctx, request, s.shardCtx, s.namespace, "StartWorkflowExecution")
	if err != nil {
		return err
	}

	if !request.RequestEagerExecution {
		// Nothing to count or deny for a regular start.
		return nil
	}

	metricsHandler.Counter(metrics.WorkflowEagerExecutionCounter.GetMetricName()).Record(
		1,
		metrics.NamespaceTag(s.namespace.Name().String()),
		metrics.TaskQueueTag(request.TaskQueue.Name),
		metrics.WorkflowTypeTag(request.WorkflowType.Name),
	)

	// Override to false to avoid having to look up the dynamic config throughout the different code paths.
	switch {
	case !s.shardCtx.GetConfig().EnableEagerWorkflowStart(s.namespace.Name().String()):
		s.recordEagerDenied(eagerStartDeniedReasonDynamicConfigDisabled)
		request.RequestEagerExecution = false
	case s.request.FirstWorkflowTaskBackoff != nil && *s.request.FirstWorkflowTaskBackoff > 0:
		s.recordEagerDenied(eagerStartDeniedReasonFirstWorkflowTaskBackoff)
		request.RequestEagerExecution = false
	}
	return nil
}
// recordEagerDenied increments the eager-execution-denied counter by one. The increment is tagged with the
// namespace, task queue, workflow type and the given denial reason.
func (s *Starter) recordEagerDenied(reason eagerStartDeniedReason) {
	startRequest := s.request.StartRequest
	deniedCounter := s.shardCtx.GetMetricsHandler().Counter(metrics.WorkflowEagerExecutionDeniedCounter.GetMetricName())
	deniedCounter.Record(
		1,
		metrics.NamespaceTag(s.namespace.Name().String()),
		metrics.TaskQueueTag(startRequest.TaskQueue.Name),
		metrics.WorkflowTypeTag(startRequest.WorkflowType.Name),
		metrics.ReasonTag(metrics.ReasonString(reason)),
	)
}
// requestEagerStart reports whether the start request currently asks for eager execution. After prepare has run,
// this reflects any downgrade prepare applied.
func (s *Starter) requestEagerStart() bool {
	startRequest := s.request.StartRequest
	return startRequest.GetRequestEagerExecution()
}
// Invoke starts a new workflow execution
func (s *Starter) Invoke(
ctx context.Context,
) (resp *historyservice.StartWorkflowExecutionResponse, retError error) {
request := s.request.StartRequest
if err := s.prepare(ctx); err != nil {
return nil, err
}
runID := uuid.NewString()
creationParams, err := s.createNewMutableState(ctx, request.GetWorkflowId(), runID)
if err != nil {
return nil, err
}
err = s.createBrandNew(ctx, creationParams)
if err == nil {
return s.generateResponse(creationParams.runID, creationParams.workflowTaskInfo, extractHistoryEvents(creationParams.workflowEventBatches))
}
var currentWorkflowConditionFailedError *persistence.CurrentWorkflowConditionFailedError
if !errors.As(err, ¤tWorkflowConditionFailedError) {
return nil, err
}
// The history and mutable state we generated above should be deleted by a background process.
return s.handleConflict(ctx, creationParams, currentWorkflowConditionFailedError)
}
// createNewMutableState builds an uncommitted execution for workflowID/runID and closes its mutable state transaction
// as a snapshot.
//
// The returned creationParams carry everything needed to later insert the execution into the executions table. It
// returns an internal error in two cases:
//   - eager execution was requested but the mutable state has no in-flight workflow task;
//   - closing the transaction did not yield exactly one event batch.
func (s *Starter) createNewMutableState(ctx context.Context, workflowID string, runID string) (*creationParams, error) {
	newWorkflow, err := api.NewWorkflowWithSignal(ctx, s.shardCtx, s.namespace, workflowID, runID, s.request, nil)
	if err != nil {
		return nil, err
	}

	now := s.shardCtx.GetTimeSource().Now()
	ms := newWorkflow.GetMutableState()

	// An eager start hands the first workflow task straight back to the caller, so it must already be in flight.
	taskInfo, inflight := ms.GetInFlightWorkflowTask()
	if !inflight && s.requestEagerStart() {
		return nil, serviceerror.NewInternal("unexpected error: mutable state did not have an inflight workflow task")
	}

	snapshot, batches, err := ms.CloseTransactionAsSnapshot(now, workflow.TransactionPolicyActive)
	if err != nil {
		return nil, err
	}
	if len(batches) != 1 {
		return nil, serviceerror.NewInternal("unable to create 1st event batch")
	}

	params := &creationParams{
		workflowID:           workflowID,
		runID:                runID,
		workflowContext:      newWorkflow,
		workflowTaskInfo:     taskInfo,
		workflowSnapshot:     snapshot,
		workflowEventBatches: batches,
	}
	return params, nil
}
// createBrandNew persists the uncommitted execution in creationParams with CreateWorkflowModeBrandNew, that is,
// without a previous run to replace. Any persistence error, including a current-workflow conflict, is returned to
// the caller.
func (s *Starter) createBrandNew(ctx context.Context, creationParams *creationParams) error {
	const (
		noPrevRunID            = ""
		noPrevLastWriteVersion = 0
	)
	now := s.shardCtx.GetTimeSource().Now()
	wfCtx := creationParams.workflowContext
	return wfCtx.GetContext().CreateWorkflowExecution(
		ctx,
		now,
		persistence.CreateWorkflowModeBrandNew,
		noPrevRunID,
		noPrevLastWriteVersion,
		wfCtx.GetMutableState(),
		creationParams.workflowSnapshot,
		creationParams.workflowEventBatches,
	)
}
// handleConflict resolves a CurrentWorkflowConditionFailedError, raised when a run with the same workflow ID already
// exists. It handles three cases:
//   - The current run carries this request's request ID: the request is a retry, so it is answered from the
//     existing run.
//   - The namespace is not active for the current run's last write version: an error is returned.
//   - Otherwise the workflow ID reuse policy is applied. If the policy did not already create the new run, the new
//     run is persisted as the current one.
func (s *Starter) handleConflict(
	ctx context.Context,
	creationParams *creationParams,
	currentWorkflowConditionFailed *persistence.CurrentWorkflowConditionFailedError,
) (*historyservice.StartWorkflowExecutionResponse, error) {
	if currentWorkflowConditionFailed.RequestID == s.request.StartRequest.GetRequestId() {
		return s.respondToRetriedRequest(ctx, currentWorkflowConditionFailed.RunID)
	}

	if err := s.verifyNamespaceActive(creationParams, currentWorkflowConditionFailed); err != nil {
		return nil, err
	}

	policyResponse, err := s.applyWorkflowIDReusePolicy(ctx, currentWorkflowConditionFailed, creationParams)
	switch {
	case err != nil:
		return nil, err
	case policyResponse != nil:
		// The reuse policy already created the new run as part of its own transaction.
		return policyResponse, nil
	}

	if err := s.createAsCurrent(ctx, creationParams, currentWorkflowConditionFailed); err != nil {
		return nil, err
	}
	events := extractHistoryEvents(creationParams.workflowEventBatches)
	return s.generateResponse(creationParams.runID, creationParams.workflowTaskInfo, events)
}
// createAsCurrent persists the uncommitted execution in creationParams with CreateWorkflowModeUpdateCurrent. It
// names the conflicting run (and its last write version) from currentWorkflowConditionFailed as the previous
// current run, so the new execution becomes the current one.
func (s *Starter) createAsCurrent(
	ctx context.Context,
	creationParams *creationParams,
	currentWorkflowConditionFailed *persistence.CurrentWorkflowConditionFailedError,
) error {
	now := s.shardCtx.GetTimeSource().Now()
	wfCtx := creationParams.workflowContext
	prevRunID := currentWorkflowConditionFailed.RunID
	prevLastWriteVersion := currentWorkflowConditionFailed.LastWriteVersion
	return wfCtx.GetContext().CreateWorkflowExecution(
		ctx,
		now,
		persistence.CreateWorkflowModeUpdateCurrent,
		prevRunID,
		prevLastWriteVersion,
		wfCtx.GetMutableState(),
		creationParams.workflowSnapshot,
		creationParams.workflowEventBatches,
	)
}
// verifyNamespaceActive returns nil unless the conflicting current run was last written with a version higher than
// the new mutable state's current version. In that case it returns a NamespaceNotActive error that carries:
//   - the namespace name;
//   - this cluster's name;
//   - the cluster name that cluster metadata resolves for that last write version.
func (s *Starter) verifyNamespaceActive(creationParams *creationParams, currentWorkflowConditionFailed *persistence.CurrentWorkflowConditionFailedError) error {
	lastWriteVersion := currentWorkflowConditionFailed.LastWriteVersion
	if creationParams.workflowContext.GetMutableState().GetCurrentVersion() >= lastWriteVersion {
		return nil
	}

	clusterMetadata := s.shardCtx.GetClusterMetadata()
	versionCluster := clusterMetadata.ClusterNameForFailoverVersion(s.namespace.IsGlobalNamespace(), lastWriteVersion)
	return serviceerror.NewNamespaceNotActive(
		s.namespace.Name().String(),
		clusterMetadata.GetCurrentClusterName(),
		versionCluster,
	)
}
// applyWorkflowIDReusePolicy applies the workflow ID reuse policy when a workflow start request fails with a
// duplicate execution.
//
// At the time of this writing, the only possible action here is to terminate the current execution in case the start
// request's ID reuse policy is TERMINATE_IF_RUNNING. The previous execution is updated and the new execution created
// in one transaction.
//
// Returns:
//   - a non-nil response when an action was required and completed, resulting in a newly created execution;
//   - (nil, nil) when no action was required, or the previous workflow had already completed. The caller then
//     creates the new execution itself;
//   - a non-nil error when the policy rejects the start or the update fails.
func (s *Starter) applyWorkflowIDReusePolicy(
	ctx context.Context,
	currentWorkflowConditionFailed *persistence.CurrentWorkflowConditionFailedError,
	creationParams *creationParams,
) (*historyservice.StartWorkflowExecutionResponse, error) {
	workflowID := s.request.StartRequest.WorkflowId
	prevExecutionUpdateAction, err := api.ApplyWorkflowIDReusePolicy(
		currentWorkflowConditionFailed.RequestID,
		currentWorkflowConditionFailed.RunID,
		currentWorkflowConditionFailed.State,
		currentWorkflowConditionFailed.Status,
		workflowID,
		creationParams.runID,
		s.request.StartRequest.GetWorkflowIdReusePolicy(),
	)
	if err != nil {
		return nil, err
	}
	if prevExecutionUpdateAction == nil {
		return nil, nil
	}

	// Captured by the new-workflow callback so that an eager response can be built after the transaction commits.
	var mutableStateInfo *mutableStateInfo
	// update prev execution and create new execution in one transaction
	err = api.GetAndUpdateWorkflowWithNew(
		ctx,
		nil,
		api.BypassMutableStateConsistencyPredicate,
		definition.NewWorkflowKey(
			s.namespace.ID().String(),
			workflowID,
			currentWorkflowConditionFailed.RunID,
		),
		prevExecutionUpdateAction,
		func() (workflow.Context, workflow.MutableState, error) {
			workflowContext, err := api.NewWorkflowWithSignal(
				ctx,
				s.shardCtx,
				s.namespace,
				workflowID,
				creationParams.runID,
				s.request,
				nil)
			if err != nil {
				return nil, nil, err
			}
			mutableState := workflowContext.GetMutableState()
			mutableStateInfo, err = extractMutableStateInfo(mutableState)
			if err != nil {
				return nil, nil, err
			}
			return workflowContext.GetContext(), mutableState, nil
		},
		s.shardCtx,
		s.workflowConsistencyChecker,
	)
	// Use errors.Is rather than identity comparison so a wrapped sentinel is still recognized.
	if errors.Is(err, consts.ErrWorkflowCompleted) {
		// previous workflow already closed
		// fall through to the caller's logic for only creating the new workflow
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	if !s.requestEagerStart() {
		return &historyservice.StartWorkflowExecutionResponse{
			RunId: creationParams.runID,
		}, nil
	}
	events, err := s.getWorkflowHistory(ctx, mutableStateInfo)
	if err != nil {
		return nil, err
	}
	return s.generateResponse(creationParams.runID, mutableStateInfo.workflowTaskInfo, events)
}
// respondToRetriedRequest builds the response for a start request whose request ID matches the existing run runID,
// i.e. a retry of an earlier attempt.
//
// A non-eager request gets only the run ID. An eager request also receives the first workflow task, but only if that
// task is still in flight with started event ID 3 and attempt <= 1. Otherwise the denial is recorded and only the run
// ID is returned.
func (s *Starter) respondToRetriedRequest(
	ctx context.Context,
	runID string,
) (*historyservice.StartWorkflowExecutionResponse, error) {
	if !s.requestEagerStart() {
		return &historyservice.StartWorkflowExecutionResponse{RunId: runID}, nil
	}

	// Building an eager poll response requires the task info and the history events.
	msInfo, err := s.getMutableStateInfo(ctx, runID)
	if err != nil {
		return nil, err
	}

	// The task is only eligible for eager dispatch if all of these hold:
	//   - it is still in flight;
	//   - it is the first workflow task;
	//   - it has not exceeded its first attempt.
	// Otherwise it already fell back to matching-based dispatch.
	taskInfo := msInfo.workflowTaskInfo
	eligible := msInfo.hasInflight && taskInfo.StartedEventID == 3 && taskInfo.Attempt <= 1
	if !eligible {
		s.recordEagerDenied(eagerStartDeniedReasonTaskAlreadyDispatched)
		return &historyservice.StartWorkflowExecutionResponse{RunId: runID}, nil
	}

	history, err := s.getWorkflowHistory(ctx, msInfo)
	if err != nil {
		return nil, err
	}
	return s.generateResponse(runID, taskInfo, history)
}
// getMutableStateInfo gets the relevant mutable state information while getting the state for the given run from the
// workflow cache and managing the cache lease.
//
// The lease is always released on return. Any error from loading the mutable state is passed to the release function,
// presumably so the cache can discard a failed load (confirm against the cache implementation), and is also returned
// to the caller.
func (s *Starter) getMutableStateInfo(ctx context.Context, runID string) (*mutableStateInfo, error) {
	// We technically never want to create a new execution but in practice this should not happen.
	workflowContext, releaseFn, err := s.workflowConsistencyChecker.GetWorkflowCache().GetOrCreateWorkflowExecution(
		ctx,
		s.namespace.ID(),
		commonpb.WorkflowExecution{WorkflowId: s.request.StartRequest.WorkflowId, RunId: runID},
		workflow.LockPriorityHigh,
	)
	if err != nil {
		return nil, err
	}

	var releaseErr error
	defer func() {
		releaseFn(releaseErr)
	}()

	var mutableState workflow.MutableState
	mutableState, releaseErr = workflowContext.LoadMutableState(ctx)
	if releaseErr != nil {
		// Do not touch a mutable state that failed to load; it may be nil.
		return nil, releaseErr
	}
	return extractMutableStateInfo(mutableState)
}
// extractMutableStateInfo captures what is needed to build a start response with an eager workflow task:
//   - the current branch token;
//   - the last event ID (next event ID - 1);
//   - a copy of the in-flight workflow task, or a zero value when none is in flight.
//
// It fails only if the branch token cannot be obtained.
func extractMutableStateInfo(mutableState workflow.MutableState) (*mutableStateInfo, error) {
	branchToken, err := mutableState.GetCurrentBranchToken()
	if err != nil {
		return nil, err
	}

	// Future work for the request retry path: extend the task timeout (by failing / timing out the current task).
	source, hasInflight := mutableState.GetInFlightWorkflowTask()

	// The task info from the mutable state is generated on the fly and technically doesn't need copying; it is
	// copied defensively in case that changes. The result is never nil.
	taskInfo := new(workflow.WorkflowTaskInfo)
	if hasInflight {
		*taskInfo = *source
	}

	return &mutableStateInfo{
		branchToken:      branchToken,
		lastEventID:      mutableState.GetNextEventID() - 1,
		workflowTaskInfo: taskInfo,
		hasInflight:      hasInflight,
	}, nil
}
// getWorkflowHistory loads the workflow history based on given mutable state information from the DB.
//
// It reads events 1 through mutableState.lastEventID (inclusive) and follows the persistence page token until the
// last page.
func (s *Starter) getWorkflowHistory(ctx context.Context, mutableState *mutableStateInfo) ([]*historypb.HistoryEvent, error) {
	var events []*historypb.HistoryEvent
	var nextPageToken []byte
	// Future optimization: generate the task from mutable state to save the extra DB read.
	// NOTE: While unlikely that there'll be more than one page, it's safer to make less assumptions.
	// TODO: Frontend also supports returning raw history and it's controlled by a feature flag (yycptt thinks).
	for {
		response, err := s.shardCtx.GetExecutionManager().ReadHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
			ShardID:     s.shardCtx.GetShardID(),
			BranchToken: mutableState.branchToken,
			MinEventID:  1,
			// MaxEventID is exclusive. Add one so that the last event is included, matching the full event batch
			// returned on the brand-new start path.
			MaxEventID: mutableState.lastEventID + 1,
			PageSize:   1024,
			// Without the token every iteration would re-read the first page forever.
			NextPageToken: nextPageToken,
		})
		if err != nil {
			return nil, err
		}
		events = append(events, response.HistoryEvents...)
		nextPageToken = response.NextPageToken
		if len(nextPageToken) == 0 {
			return events, nil
		}
	}
}
// extractHistoryEvents flattens the event batches sent to persistence into one slice of history events.
//
// With exactly one batch, that batch's slice is returned as-is, without copying. With no batches the result is nil.
// Multiple batches are unlikely, but they are concatenated in order.
func extractHistoryEvents(persistenceEvents []*persistence.WorkflowEvents) []*historypb.HistoryEvent {
	switch len(persistenceEvents) {
	case 1:
		return persistenceEvents[0].Events
	default:
		var flattened []*historypb.HistoryEvent
		for i := range persistenceEvents {
			flattened = append(flattened, persistenceEvents[i].Events...)
		}
		return flattened
	}
}
// generateResponse builds the StartWorkflowExecutionResponse for runID.
//
// A non-eager request gets only the run ID. An eager request additionally gets the following, built from
// workflowTaskInfo and historyEvents:
//   - a fresh vector clock;
//   - a serialized task token;
//   - a PollWorkflowTaskQueueResponse for the eager workflow task.
func (s *Starter) generateResponse(
	runID string,
	workflowTaskInfo *workflow.WorkflowTaskInfo,
	historyEvents []*historypb.HistoryEvent,
) (*historyservice.StartWorkflowExecutionResponse, error) {
	startRequest := s.request.StartRequest
	workflowID := startRequest.WorkflowId

	if !s.requestEagerStart() {
		return &historyservice.StartWorkflowExecutionResponse{RunId: runID}, nil
	}

	vectorClock, err := s.shardCtx.NewVectorClock()
	if err != nil {
		return nil, err
	}

	token, err := s.tokenSerializer.Serialize(&tokenspb.Task{
		NamespaceId:      s.namespace.ID().String(),
		WorkflowId:       workflowID,
		RunId:            runID,
		ScheduledEventId: workflowTaskInfo.ScheduledEventID,
		Attempt:          workflowTaskInfo.Attempt,
		Clock:            vectorClock,
	})
	if err != nil {
		return nil, err
	}

	eagerTask := &workflowservice.PollWorkflowTaskQueueResponse{
		TaskToken:         token,
		WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: workflowID, RunId: runID},
		WorkflowType:      startRequest.GetWorkflowType(),
		// TODO: consider getting the ID from mutable state, this was not done to avoid adding more complexity to
		// the code to plumb that value through.
		PreviousStartedEventId:     0,
		StartedEventId:             workflowTaskInfo.StartedEventID,
		Attempt:                    workflowTaskInfo.Attempt,
		History:                    &historypb.History{Events: historyEvents},
		NextPageToken:              nil,
		WorkflowExecutionTaskQueue: workflowTaskInfo.TaskQueue,
		ScheduledTime:              workflowTaskInfo.ScheduledTime,
		StartedTime:                workflowTaskInfo.StartedTime,
	}

	return &historyservice.StartWorkflowExecutionResponse{
		RunId:             runID,
		Clock:             vectorClock,
		EagerWorkflowTask: eagerTask,
	}, nil
}