/
current_execution.go
211 lines (187 loc) · 8.58 KB
/
current_execution.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
// The MIT License (MIT)
//
// Copyright (c) 2017-2020 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package executions
import (
"context"
"strconv"
"time"
cclient "go.uber.org/cadence/client"
"go.uber.org/cadence/workflow"
"go.uber.org/zap"
"github.com/uber/cadence/common/blobstore"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/pagination"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/reconciliation/invariant"
"github.com/uber/cadence/common/reconciliation/store"
"github.com/uber/cadence/service/worker/scanner/shardscanner"
)
const (
// currentExecutionsScannerWFID is the current execution scanner workflow ID
currentExecutionsScannerWFID = "cadence-sys-current-executions-scanner"
// CurrentExecutionsScannerWFTypeName is the current execution scanner workflow type
CurrentExecutionsScannerWFTypeName = "cadence-sys-current-executions-scanner-workflow"
// CurrentExecutionsScannerTaskListName is the current execution scanner workflow tasklist
CurrentExecutionsScannerTaskListName = "cadence-sys-current-executions-scanner-tasklist-0"
// CurrentExecutionsFixerWFTypeName is the current execution fixer workflow ID
CurrentExecutionsFixerWFTypeName = "cadence-sys-current-executions-fixer-workflow"
currentExecutionsFixerWFID = "cadence-sys-current-executions-fixer"
// CurrentExecutionsFixerTaskListName is the current execution fixer workflow tasklist
CurrentExecutionsFixerTaskListName = "cadence-sys-current-executions-fixer-tasklist-0"
)
/*
!!!!!!!!!!!!!!
NOTE: Current execution fixers have never been run.
Beware drawing any conclusions from current-execution scanner/fixer code.
!!!!!!!!!!!!!!
While this code appears structurally complete, the wrong fixer manager is being
used, and we have apparently never fully enabled it in our production clusters.
It likely needs further checks and possibly a rewrite before attempting to use.
*/
// CurrentScannerWorkflow is the workflow that scans over all current executions
func CurrentScannerWorkflow(
ctx workflow.Context,
params shardscanner.ScannerWorkflowParams,
) error {
logger := workflow.GetLogger(ctx)
logger.Info("Starting CurrentScannerWorkflow", zap.Any("Params", params))
wf, err := shardscanner.NewScannerWorkflow(ctx, CurrentExecutionsScannerWFTypeName, params)
if err != nil {
logger.Error("Failed to start new scanner workflow", zap.Error(err))
return err
}
err = wf.Start(ctx)
if err != nil {
logger.Error("Failed to execute scanner workflow", zap.Error(err))
}
return err
}
// currentExecutionScannerHooks provides hooks for current executions scanner.
func currentExecutionScannerHooks() *shardscanner.ScannerHooks {
wf, err := shardscanner.NewScannerHooks(currentExecutionScannerManager, currentExecutionScannerIterator, currentExecutionCustomScannerConfig)
if err != nil {
return nil
}
return wf
}
// currentExecutionScannerManager is the current execution scanner manager
func currentExecutionScannerManager(
ctx context.Context,
pr persistence.Retryer,
params shardscanner.ScanShardActivityParams,
domainCache cache.DomainCache,
) invariant.Manager {
logger := zap.L()
logger.Info("Creating invariant manager for current execution scanner", zap.Any("Params", params))
var ivs []invariant.Invariant
collections := ParseCollections(params.ScannerConfig)
for _, fn := range CurrentExecutionType.ToInvariants(collections, zap.NewNop()) {
ivs = append(ivs, fn(pr, domainCache))
}
return invariant.NewInvariantManager(ivs)
}
// CurrentFixerWorkflow starts current executions fixer.
func CurrentFixerWorkflow(
ctx workflow.Context,
params shardscanner.FixerWorkflowParams,
) error {
wf, err := shardscanner.NewFixerWorkflow(ctx, CurrentExecutionsFixerWFTypeName, params)
if err != nil {
return err
}
return wf.Start(ctx)
}
// currentExecutionCustomScannerConfig resolves dynamic config for current executions scanner.
func currentExecutionCustomScannerConfig(ctx shardscanner.ScannerContext) shardscanner.CustomScannerConfig {
res := shardscanner.CustomScannerConfig{}
if ctx.Config.DynamicCollection.GetBoolProperty(dynamicconfig.CurrentExecutionsScannerInvariantCollectionHistory)() {
res[invariant.CollectionHistory.String()] = strconv.FormatBool(true)
}
if ctx.Config.DynamicCollection.GetBoolProperty(dynamicconfig.CurrentExecutionsScannerInvariantCollectionMutableState)() {
res[invariant.CollectionMutableState.String()] = strconv.FormatBool(true)
}
return res
}
// currentExecutionFixerHooks provides hooks for current executions fixer.
func currentExecutionFixerHooks() *shardscanner.FixerHooks {
noCustomConfig := func(fixer shardscanner.FixerContext) shardscanner.CustomScannerConfig {
return nil
}
// TODO: yes, this DOES incorrectly use the concrete execution fixer manager, which does not work.
// It is retained for now to avoid making a lot of mostly-unrelated changes / fixes / cleanup.
h, err := shardscanner.NewFixerHooks(concreteExecutionFixerManager, currentExecutionFixerIterator, noCustomConfig)
if err != nil {
return nil
}
return h
}
// CurrentExecutionConfig configures current execution scanner
func CurrentExecutionConfig(dc *dynamicconfig.Collection) *shardscanner.ScannerConfig {
return &shardscanner.ScannerConfig{
ScannerWFTypeName: CurrentExecutionsScannerWFTypeName,
FixerWFTypeName: CurrentExecutionsFixerWFTypeName,
DynamicCollection: dc,
DynamicParams: shardscanner.DynamicParams{
ScannerEnabled: dc.GetBoolProperty(dynamicconfig.CurrentExecutionsScannerEnabled),
FixerEnabled: dc.GetBoolProperty(dynamicconfig.CurrentExecutionFixerEnabled),
Concurrency: dc.GetIntProperty(dynamicconfig.CurrentExecutionsScannerConcurrency),
PageSize: dc.GetIntProperty(dynamicconfig.CurrentExecutionsScannerPersistencePageSize),
BlobstoreFlushThreshold: dc.GetIntProperty(dynamicconfig.CurrentExecutionsScannerBlobstoreFlushThreshold),
ActivityBatchSize: dc.GetIntProperty(dynamicconfig.CurrentExecutionsScannerActivityBatchSize),
AllowDomain: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.CurrentExecutionFixerDomainAllow),
},
ScannerHooks: currentExecutionScannerHooks,
FixerHooks: currentExecutionFixerHooks,
StartWorkflowOptions: cclient.StartWorkflowOptions{
ID: currentExecutionsScannerWFID,
TaskList: CurrentExecutionsScannerTaskListName,
ExecutionStartToCloseTimeout: 20 * 365 * 24 * time.Hour,
WorkflowIDReusePolicy: cclient.WorkflowIDReusePolicyAllowDuplicate,
CronSchedule: "* * * * *",
},
StartFixerOptions: cclient.StartWorkflowOptions{
ID: currentExecutionsFixerWFID,
TaskList: CurrentExecutionsFixerTaskListName,
ExecutionStartToCloseTimeout: 20 * 365 * 24 * time.Hour,
WorkflowIDReusePolicy: cclient.WorkflowIDReusePolicyAllowDuplicate,
CronSchedule: "* * * * *",
},
}
}
// currentExecutionScannerIterator is the iterator of current executions
func currentExecutionScannerIterator(
ctx context.Context,
pr persistence.Retryer,
params shardscanner.ScanShardActivityParams,
) pagination.Iterator {
return CurrentExecutionType.ToIterator()(ctx, pr, params.PageSize)
}
// currentExecutionFixerIterator is the iterator of fixer execution
func currentExecutionFixerIterator(
ctx context.Context,
client blobstore.Client,
keys store.Keys,
_ shardscanner.FixShardActivityParams,
) store.ScanOutputIterator {
return store.NewBlobstoreIterator(ctx, client, keys, CurrentExecutionType.ToBlobstoreEntity())
}