// Package beam is an experimental mockup of an Apache Beam Go SDK API that
// leverages generics, and a more opinionated construction method. It exists
// to explore the ergonomics and feasibility of such an approach.
//
// This one in particular is a variant on allinone, which avoids the use of
// separate goroutines and channels to pass around elements.
package beam
import (
"context"
"fmt"
"reflect"
"strconv"
"sync/atomic"
"time"
"github.com/lostluck/experimental/altbeams/allinone2/beam/internal/beamopts"
"github.com/lostluck/experimental/altbeams/allinone2/beam/internal/extworker"
"github.com/lostluck/experimental/altbeams/allinone2/beam/internal/harness"
fnpb "github.com/lostluck/experimental/altbeams/allinone2/beam/internal/model/fnexecution_v1"
pipepb "github.com/lostluck/experimental/altbeams/allinone2/beam/internal/model/pipeline_v1"
"github.com/lostluck/experimental/altbeams/allinone2/beam/internal/runner/universal"
"google.golang.org/protobuf/proto"
)
// KV is a key-value pair element, as produced by keyed transforms and
// consumed by per-key operations such as GroupByKey.
type KV[K, V Element] struct {
	Key   K
	Value V
}
// Element is the constraint for types that may flow through a pipeline as
// PCollection elements. It is deliberately unrestricted.
type Element interface {
	any // Sadly, can't really restrict this without breaking iterators in GBK results.
}
// ElmC is the catch all context for the current element.
//
// This includes
// * Key (state and timers)
// * Windows
// * Timestamp
// * Pane
//
// Provides the downstream emission context, so it actually sends data to the next DoFn.
type ElmC struct {
	elmContext

	// pcollections are the downstream processors this element may be
	// emitted to, indexed by output position.
	pcollections []processor
}
// EventTime returns the event-time timestamp of the element currently
// being processed.
func (e *ElmC) EventTime() time.Time {
	return e.eventTime
}
// Process is the function type for handling a single element in a bundle.
//
// Typically a closure returned from a Transform's ProcessBundle method.
//
// Errors returned from Process functions abort bundle processing, and may
// cause pipeline termination. A runner may retry a bundle that has failed.
type Process[E Element] func(ElmC, E) error
// Transform is the only interface that needs to be implemented by most DoFns.
//
// ProcessBundle is invoked with the element context (DFC) for the bundle's
// input type E; any returned error aborts bundle processing.
type Transform[E Element] interface {
	ProcessBundle(ctx context.Context, dfc *DFC[E]) error
}
// Iter is a single-use stream of values of type V, such as the values for
// a key in a GroupByKey result. Use All to iterate.
type Iter[V Element] struct {
	source func() (V, bool) // source returns true if the element is valid.
}
// metatype marks Iter as a meta element type; see isMetaType.
func (Iter[V]) metatype() {}

// Compile-time assertion that Iter satisfies metatype.
var _ metatype = Iter[int]{}

// metatype is implemented by element types that wrap streams of other
// elements, and are therefore unsuitable in certain positions.
type metatype interface {
	metatype()
}
// isMetaType reports whether a value's type has characteristics that make
// it unsuitable for some usage. eg. Iter types should not be used as side
// input values, since an iterator of iterators isn't a good idea.
func isMetaType(v any) bool {
	switch v.(type) {
	case metatype:
		return true
	default:
		return false
	}
}
// All allows a single iteration of its stream of values.
//
// Iteration stops when the source is exhausted, or when perElm returns
// false to request early termination.
func (it *Iter[V]) All() func(perElm func(elm V) bool) {
	return func(perElm func(elm V) bool) {
		for v, ok := it.source(); ok; v, ok = it.source() {
			if !perElm(v) {
				return
			}
		}
	}
}
// start kicks off processing of a root bundle stage: it starts the DFC,
// marks its metrics state, and feeds the single synthetic impulse element
// through the stage's per-element function.
//
// Fix: the original panicked on a perElm failure even though this function
// returns an error and its caller (executeSubgraph) already propagates
// errors to the harness; return the wrapped error instead so a failed
// bundle is reported rather than crashing the worker.
func start(ctx context.Context, dfc *DFC[[]byte]) error {
	if err := dfc.start(ctx); err != nil {
		return err
	}
	dfc.metrics.setState(1, dfc.edgeID)
	elm := ElmC{elmContext{
		eventTime: time.Now(),
	}, dfc.downstream}
	// The impulse payload bytes are arbitrary; downstream DoFns ignore them.
	if err := dfc.perElm(elm, []byte{1, 2, 3, 4, 5, 6, 7, 7}); err != nil {
		return fmt.Errorf("doFn id %v failed: %w", dfc.id, err)
	}
	return nil
}
// Scope is used for building pipeline graphs.
//
// Scope is a hierarchical grouping for composite transforms. Scopes can be
// enclosed in other scopes and form a tree structure. For pipeline updates,
// the scope chain forms a unique name. The scope chain can also be used for
// monitoring and visualization purposes.
type Scope struct {
	name   string
	parent *Scope // nil for the root scope.
	g      *graph // graph under construction, shared by all scopes in a pipeline.
}
// String returns the slash-separated scope chain from the root to this
// scope, e.g. "/outer/inner". A nil Scope renders as the empty string.
func (s *Scope) String() string {
	if s == nil {
		return ""
	}
	path := "/" + s.name
	for p := s.parent; p != nil; p = p.parent {
		path = "/" + p.name + path
	}
	return path
}
// Pipeline is a handle to a running or terminated pipeline for
// programmatic access to the given runner.
type Pipeline struct {
	// Counters maps user counter names to their final reported values.
	Counters map[string]int64
}
// Composite transforms allow structural re-use of sub pipelines.
//
// Expand is called with a freshly created child scope and returns the
// composite's output value O.
type Composite[O any] interface {
	Expand(s *Scope) O
}
// Expand applies the Composite transform comp under a new child scope of
// parent named name, returning the composite's output.
func Expand[I Composite[O], O any](parent *Scope, name string, comp I) O {
	// All the expected connections happen inside the composite's Expand.
	// Side inputs are attached at DoFn creation time via the values passed in.
	child := &Scope{name: name, parent: parent, g: parent.g}
	return comp.Expand(child)
}
// Run executes the pipeline built in the construction function.
//
// It constructs the graph by calling expand on a fresh root scope,
// serializes it to the portable pipeline proto, starts a loopback external
// worker environment in-process, and submits the pipeline to a universal
// runner (default endpoint "localhost:8073", overridable via opts). It
// blocks until the job terminates, then returns a Pipeline handle carrying
// the final user counter values.
func Run(ctx context.Context, expand func(*Scope) error, opts ...Options) (Pipeline, error) {
	opt := beamopts.Struct{
		Endpoint: "localhost:8073",
	}
	opt.Join(opts...)

	var g graph
	s := &Scope{parent: nil, g: &g}
	g.root = s

	if err := expand(s); err != nil {
		return Pipeline{}, fmt.Errorf("pipeline construction error:%w", err)
	}

	// At this point the graph is complete, and we need to turn serialize/deserialize it
	// into executing code.
	// typeReg collects the concrete element types referenced by the graph so
	// the worker side can reconstruct them when deserializing.
	typeReg := map[string]reflect.Type{}
	pipe := g.marshal(typeReg)

	// TODO(BEAM-10610): Allow user configuration of this port, rather than kernel selected.
	srv, err := extworker.StartLoopback(ctx, 0, executeSubgraph(typeReg))
	if err != nil {
		return Pipeline{}, err
	}
	defer srv.Stop(ctx) // NOTE(review): Stop's error is discarded; best-effort shutdown.

	// Advertise the loopback worker's endpoint to the runner as an external
	// environment, so bundles are sent back to this process for execution.
	serializedPayload, err := proto.Marshal(&pipepb.ExternalPayload{Endpoint: &pipepb.ApiServiceDescriptor{Url: srv.EnvironmentConfig(ctx)}})
	if err != nil {
		return Pipeline{}, err
	}
	env := &pipepb.Environment{
		Urn:          "beam:env:external:v1",
		Payload:      serializedPayload,
		Capabilities: nil, // TODO
	}
	pipe.Components.Environments["go"] = env

	// Submit and block until the job reaches a terminal state.
	handle, err := universal.Execute(ctx, pipe, opt)
	if err != nil {
		return Pipeline{}, err
	}
	r, err := handle.Metrics(ctx)
	if err != nil {
		return Pipeline{}, err
	}
	p := Pipeline{
		Counters: r.UserCounters(),
	}
	return p, nil
}
// executeSubgraph returns the harness ExecFunc that executes one bundle:
// it looks up (or builds) the execution graph for the bundle descriptor,
// wires up monitoring and split handlers, runs the bundle's root stages,
// and produces the ProcessBundleResponse.
//
// typeReg maps serialized type names back to their reflect.Types so the
// graph can be reconstructed from the proto descriptor.
func executeSubgraph(typeReg map[string]reflect.Type) harness.ExecFunc {
	// shortID issues process-wide unique short ids for monitoring info
	// payloads; shared across all bundles handled by this ExecFunc.
	var shortID atomic.Uint32
	return func(ctx context.Context, ctrl *harness.Control, dataCon harness.DataContext) (*fnpb.ProcessBundleResponse, error) {
		// 1. Provide translation function (unmarshalToGraph + types closure) to harness.
		//    * Harness then returns a graph, either getting a cached old version, or building a new one from proto.
		//    * Caches the proto in a weak map somewhere...
		g, err := ctrl.GetOrLookupPlan(dataCon, func(comps *fnpb.ProcessBundleDescriptor) any {
			return unmarshalToGraph(typeReg, comps)
		})
		if err != nil {
			return nil, err
		}
		newG := g.(*graph)

		// 2. Build a new runnable instance, get execution roots and metrics.
		roots, mets := newG.build(ctx, dataCon)

		// 3. Register the metrics handling function for this instruction with the harness.
		//    * This handles progress and tentative metrics
		ctrl.RegisterMonitor(dataCon, func() (map[string]*pipepb.MonitoringInfo, map[string][]byte) {
			mons := mets.MonitoringInfos(newG)
			pylds := map[string][]byte{}
			labels := map[string]*pipepb.MonitoringInfo{}
			for _, mon := range mons {
				// Compact base-36 key, unique per monitoring info emitted by
				// this process; payload and label metadata share the key.
				key := strconv.FormatInt(int64(shortID.Add(1)), 36)
				pylds[key] = mon.GetPayload()
				labels[key] = &pipepb.MonitoringInfo{
					Urn:    mon.GetUrn(),
					Type:   mon.GetType(),
					Labels: mon.GetLabels(),
				}
			}
			return labels, pylds
		})

		// 4. Register a split handler with the harness
		//    * This handles channel splits and SDF splits
		ctrl.RegisterSplitter(dataCon, func(splits map[string]*fnpb.ProcessBundleSplitRequest_DesiredSplit) (*fnpb.ProcessBundleSplitResponse, error) {
			ret := &fnpb.ProcessBundleSplitResponse{}
			for _, root := range roots {
				// Only roots named in the request participate in the split.
				split, ok := splits[root.transformID()]
				if !ok {
					continue
				}
				resp := root.split(split)
				proto.Merge(ret, resp)
			}
			return ret, nil
		})

		// TODO bundle finalization
		// The above would be cleaned up in the harness.

		// 5. Start DoFn sampling for this processing thread.
		// NOTE(review): presumably 10ms is the sample interval and 5m a lull/
		// timeout bound — confirm against startSampling's definition.
		mets.startSampling(ctx, 10*time.Millisecond, 5*time.Minute)
		defer mets.stopSampling()

		// 6. Process the bundle.
		for _, root := range roots {
			if err := start(ctx, root.(*DFC[[]byte])); err != nil {
				return nil, err
			}
		}
		// 7. Run finishes bundles.
		for _, root := range roots {
			if err := root.finish(); err != nil {
				return nil, err
			}
		}
		// 8. Respond.
		// Note, Metrics and Data would be handled outside.
		// Since we may have residuals for ProcessContinuations, or finalization
		// we return this here for those in the future.
		return &fnpb.ProcessBundleResponse{
			// ResidualRoots:        rRoots,
			// RequiresFinalization: requiresFinalization,
		}, nil
	}
}