-
Notifications
You must be signed in to change notification settings - Fork 0
/
bundle.go
111 lines (95 loc) · 3.9 KB
/
bundle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package worker
import (
"sync"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"github.com/lostluck/experimental/local/internal/engine"
"golang.org/x/exp/slog"
)
// B represents an extant ProcessBundle instruction sent to an SDK worker.
// Generally manipulated by another package to interact with a worker.
//
// Init must be called after OutputCount is set and before ProcessOn, since
// ProcessOn assumes the data wait group has already been configured.
type B struct {
InstID string // ID for the instruction processing this bundle. ProcessOn also uses it as the key when registering the bundle with the worker.
PBDID string // ID for the ProcessBundleDescriptor this bundle executes.
// InputTransformID is the ID of the transform that InputData is sent to in the SDK.
InputTransformID string
InputData [][]byte // Data specifically for this bundle. ProcessOn sends each entry as its own message, marking the final entry as last.
// TODO change to a single map[tid] -> map[input] -> map[window] -> struct { Iter data, MultiMap data } instead of all maps.
// IterableSideInputData is a map from transformID, to inputID, to window, to data.
IterableSideInputData map[string]map[string]map[typex.Window][][]byte
// MultiMapSideInputData is a map from transformID, to inputID, to window, to data key, to data values.
MultiMapSideInputData map[string]map[string]map[typex.Window]map[string][][]byte
// OutputCount is the number of data outputs this bundle has.
// We need to see this many closed data channels before the bundle is complete.
// Init adds this value to dataWait, so it must be set before Init is called.
OutputCount int
// dataWait is how we determine if a bundle is finished, by waiting for each of
// a Bundle's DataSinks to produce their last output.
// After this point we can "commit" the bundle's output for downstream use.
// ProcessOn blocks on this wait group; the matching Done calls are made
// outside this file section.
dataWait sync.WaitGroup
// OutputData presumably accumulates this bundle's not-yet-committed output;
// it is not touched by the methods in this file. TODO confirm against the engine package.
OutputData engine.TentativeData
// Resp presumably delivers the SDK's ProcessBundleResponse for this
// instruction; the sender and receiver are not visible here. TODO confirm.
Resp chan *fnpb.ProcessBundleResponse
// SinkToPCollection presumably maps a data sink's transform ID to the ID of
// the PCollection it writes; verify against the code that populates it.
SinkToPCollection map[string]string
// TODO: Metrics for this bundle, can be handled after the fact.
}
// Init prepares the bundle's data wait group ahead of processing.
//
// The wait group counter is raised by OutputCount, because one final-data
// signal is expected for every output possessed by the stage this bundle
// executes. OutputCount must therefore be set before Init is called.
func (b *B) Init() {
	expectedSignals := b.OutputCount
	b.dataWait.Add(expectedSignals)
}
// LogValue returns the structured logging representation of the bundle: a
// group holding the instruction ID under "ID" and the process bundle
// descriptor ID under "stage". Its signature matches slog.LogValuer, so the
// bundle can be passed directly as a log attribute value.
func (b *B) LogValue() slog.Value {
	instruction := slog.String("ID", b.InstID)
	descriptor := slog.String("stage", b.PBDID)
	return slog.GroupValue(instruction, descriptor)
}
// ProcessOn runs this bundle on the worker wk and blocks until the bundle's
// data wait group is released.
//
// Preconditions, neither of which is checked here:
//   - the bundle is initialized (all maps are non-nil, and Init has set up
//     the data wait group);
//   - the bundle descriptor named by PBDID is already registered.
//
// The method lives on B rather than W, even though it mostly manipulates the
// worker, so that local calls stay separate from the worker's public GRPC APIs.
//
// NOTE(review): when InputData is empty the send loop never runs, so no
// element flagged IsLast is emitted for this instruction. Confirm that callers
// always supply at least one input element.
func (b *B) ProcessOn(wk *W) {
	// Register the bundle under its instruction ID while holding the worker's mutex.
	wk.mu.Lock()
	wk.bundles[b.InstID] = b
	wk.mu.Unlock()

	slog.Debug("processing", "bundle", b, "worker", wk)

	// Tell the SDK to start processing the bundle.
	startReq := &fnpb.InstructionRequest{
		InstructionId: b.InstID,
		Request: &fnpb.InstructionRequest_ProcessBundle{
			ProcessBundle: &fnpb.ProcessBundleRequest{
				ProcessBundleDescriptorId: b.PBDID,
			},
		},
	}
	wk.InstReqs <- startReq

	// TODO: make batching decisions.
	// Each input element currently travels in its own Elements message; only
	// the element at the final index carries the IsLast marker.
	finalIdx := len(b.InputData) - 1
	for idx, payload := range b.InputData {
		elem := &fnpb.Elements_Data{
			InstructionId: b.InstID,
			TransformId:   b.InputTransformID,
			Data:          payload,
			IsLast:        idx == finalIdx,
		}
		wk.DataReqs <- &fnpb.Elements{Data: []*fnpb.Elements_Data{elem}}
	}

	slog.Debug("waiting on data", "bundle", b)
	// Wait until data is ready.
	b.dataWait.Wait()
}