forked from apache/beam
/
plan.go
234 lines (205 loc) · 6.83 KB
/
plan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package exec contains runtime plan representation and execution. A pipeline
// must be translated to a runtime plan to be executed.
package exec
import (
"context"
"fmt"
"strings"
"sync"
"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)
// Plan represents the bundle execution plan. It will generally be constructed
// from a part of a pipeline. A plan can be used to process multiple bundles
// serially.
//
// NewPlan creates a Plan in the Initializing state. The first Execute brings
// every unit Up. Each Execute sets the plan Active while a bundle runs and back
// to Up on success. Any failure leaves the plan Broken, and Down leaves it Down.
type Plan struct {
	// id is the plan identifier returned by ID.
	id string
	// roots holds the units that implement Root. Execute drives each of them
	// through StartBundle, Process and FinishBundle.
	roots []Root
	// units holds every unit in the plan. Each unit is brought Up on the first
	// Execute and taken Down by Down.
	units []Unit
	// parDoIDs holds GetPID() of every unit that implements hasPID.
	parDoIDs []string
	// status is the current lifecycle state of the plan.
	status Status
	// While the store is threadsafe, the reference to it
	// is not, so we need to protect the store field to be
	// able to asynchronously provide tentative metrics.
	storeMu sync.Mutex
	store   *metrics.Store
	// source is the plan's DataSource, or nil if there is none. When several
	// units are DataSources, NewPlan keeps the last one.
	// TODO: there can be more than 1 DataSource in a bundle.
	source *DataSource
}
// hasPID provides a common interface for extracting PTransformIDs
// from Units. NewPlan records the GetPID result of every unit that
// implements it.
type hasPID interface {
	// GetPID returns the PTransform ID associated with the unit.
	GetPID() string
}
// NewPlan builds a bundle execution plan with the given id over units.
//
// Each unit is classified as follows:
//   - every unit implementing Root becomes an entry point for Execute;
//   - a *DataSource becomes the plan's source (if several are present, the
//     last one in units wins);
//   - every unit implementing hasPID contributes its GetPID() value.
//
// It returns an error if any unit is nil or if no unit implements Root.
// The returned plan starts in the Initializing state.
func NewPlan(id string, units []Unit) (*Plan, error) {
	var (
		roots   []Root
		ids     []string
		dataSrc *DataSource
	)
	for _, unit := range units {
		if unit == nil {
			return nil, errors.Errorf("no <nil> units")
		}
		if root, isRoot := unit.(Root); isRoot {
			roots = append(roots, root)
		}
		if ds, isDS := unit.(*DataSource); isDS {
			dataSrc = ds
		}
		if withPID, ok := unit.(hasPID); ok {
			ids = append(ids, withPID.GetPID())
		}
	}
	if len(roots) == 0 {
		return nil, errors.Errorf("no root units")
	}

	plan := &Plan{
		id:       id,
		status:   Initializing,
		roots:    roots,
		units:    units,
		parDoIDs: ids,
		source:   dataSrc,
	}
	return plan, nil
}
// ID returns the plan identifier (the id passed to NewPlan).
func (p *Plan) ID() string {
	return p.id
}
// SourcePTransformID returns the ID of the data's origin PTransform, or the
// empty string if the plan has no DataSource.
//
// A plan without a source is a legitimate state: NewPlan only requires a Root,
// and Progress and Split both check for a nil source. This accessor does the
// same instead of panicking on a nil pointer dereference.
func (p *Plan) SourcePTransformID() string {
	if p.source == nil {
		return ""
	}
	return p.source.SID.PtransformID
}
// Execute processes one bundle, identified by id, using manager as the data
// context. Blocking.
//
// On the first call (plan still Initializing) every unit is brought Up. Each
// call then runs three phases in order over all roots: StartBundle, Process
// and FinishBundle. Unit callbacks are routed through callNoPanic, which
// presumably recovers panics into errors.
//
// Any failure marks the plan Broken, and later calls are rejected with an
// "invalid status" error. On success the plan returns to Up and can run the
// next bundle.
func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) error {
	ctx = metrics.SetBundleID(ctx, p.id)
	// Swap the store reference under the lock so Store() can read it while the
	// bundle is still running.
	p.storeMu.Lock()
	p.store = metrics.GetStore(ctx)
	p.storeMu.Unlock()

	if p.status == Initializing {
		for _, unit := range p.units {
			if err := callNoPanic(ctx, unit.Up); err != nil {
				p.status = Broken
				return errors.Wrapf(err, "while executing Up for %v", p)
			}
		}
		p.status = Up
	}

	if p.source != nil {
		p.source.InitSplittable()
	}

	if p.status != Up {
		return errors.Errorf("invalid status for plan %v: %v", p.id, p.status)
	}

	// runPhase applies one per-root step to every root in order. step(root)
	// yields the callback to run. On the first failure it marks the plan
	// Broken and returns the wrapped error.
	runPhase := func(phase string, step func(Root) func(context.Context) error) error {
		for _, root := range p.roots {
			if err := callNoPanic(ctx, step(root)); err != nil {
				p.status = Broken
				return errors.Wrapf(err, "while executing %v for %v", phase, p)
			}
		}
		return nil
	}

	// Process bundle. If there are any kinds of failures, we bail and mark the plan broken.
	p.status = Active
	if err := runPhase("StartBundle", func(r Root) func(context.Context) error {
		return func(ctx context.Context) error { return r.StartBundle(ctx, id, manager) }
	}); err != nil {
		return err
	}
	if err := runPhase("Process", func(r Root) func(context.Context) error { return r.Process }); err != nil {
		return err
	}
	if err := runPhase("FinishBundle", func(r Root) func(context.Context) error { return r.FinishBundle }); err != nil {
		return err
	}

	p.status = Up
	return nil
}
// Down marks the plan Down and calls Down on every unit. It keeps going after
// a unit fails and collects every failure. Calling Down on a plan that is
// already Down is a no-op. Unit callbacks are routed through callNoPanic,
// which presumably recovers panics into errors.
//
// It returns nil if every unit succeeded, the single failure wrapped with the
// plan id if exactly one failed, or a combined error listing all failures
// otherwise.
func (p *Plan) Down(ctx context.Context) error {
	if p.status == Down {
		return nil // ok: already down
	}
	p.status = Down

	var failures []error
	for _, unit := range p.units {
		if err := callNoPanic(ctx, unit.Down); err != nil {
			failures = append(failures, err)
		}
	}

	if len(failures) == 0 {
		return nil
	}
	if len(failures) == 1 {
		return errors.Wrapf(failures[0], "plan %v failed", p.id)
	}
	return errors.Errorf("plan %v failed with multiple errors: %v", p.id, failures)
}
// String renders the plan as a "Plan[<id>]:" header line followed by one
// "<unit id>: <unit>" line per unit, in plan order.
func (p *Plan) String() string {
	lines := make([]string, len(p.units))
	for i, u := range p.units {
		lines[i] = fmt.Sprintf("%v: %v", u.ID(), u)
	}
	return fmt.Sprintf("Plan[%v]:\n%v", p.ID(), strings.Join(lines, "\n"))
}
// Progress returns a snapshot of the input progress of the plan's DataSource.
// The boolean is false, with a zero snapshot, when the plan has no DataSource.
func (p *Plan) Progress() (ProgressReportSnapshot, bool) {
	if p.source == nil {
		return ProgressReportSnapshot{}, false
	}
	return p.source.Progress(), true
}
// Store returns the metric store installed by the most recent Execute call.
// It is nil if Execute has not run yet, because NewPlan leaves the field unset.
// The reference is read under storeMu, so Store may be called while a bundle
// is executing.
func (p *Plan) Store() *metrics.Store {
	p.storeMu.Lock()
	current := p.store
	p.storeMu.Unlock()
	return current
}
// SplitPoints captures the split requested by the Runner. Plan.Split forwards
// all three fields unchanged to the DataSource's Split method.
type SplitPoints struct {
	// Splits is a list of desired split indices.
	Splits []int64
	// Frac is the requested split fraction.
	// NOTE(review): presumably the fraction of remaining work to split off;
	// confirm against DataSource.Split.
	Frac float64
	// Estimated total number of elements (including unsent) for the source.
	// A zero value means the total is unknown; in that case the locally known
	// size is used instead.
	BufSize int64
}
// SplitResult contains the result of performing a split on a Plan.
type SplitResult struct {
	// Channel indices, included for both channel and sub-element splits.
	// PI is the primary index (last element of the primary) and RI is the
	// residual index (first element of the residual).
	PI, RI int64

	// Sub-element split data. A sub-element split occurred if and only if
	// these are present. PS holds the encoded primaries and RS the encoded
	// residuals of the element that was split.
	PS, RS [][]byte

	// TId is the ID of the transform receiving the split elements. InId is
	// the ID of the input the split elements are received from.
	TId, InId string
}
// Split asks the plan's DataSource to split at the given SplitPoints and
// returns the resulting SplitResult. It returns an error if the plan has no
// DataSource, or whatever error the DataSource's Split reports.
func (p *Plan) Split(s SplitPoints) (SplitResult, error) {
	// TODO: When bundles with multiple sources are supported, perform splits
	// on all sources.
	if p.source == nil {
		return SplitResult{}, fmt.Errorf("failed to split at requested splits: {%v}, Source not initialized", s)
	}
	return p.source.Split(s.Splits, s.Frac, s.BufSize)
}