forked from apache/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cogbk.go
84 lines (71 loc) · 1.96 KB
/
cogbk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package primitives
import (
"fmt"
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
)
// genA is the first test source. It ignores its impulse payload and emits
// six keyed values in order: ("a",1), ("a",2), ("a",3), ("b",4), ("b",5),
// ("c",6).
func genA(_ []byte, emit func(string, int)) {
	records := []struct {
		key string
		val int
	}{
		{"a", 1},
		{"a", 2},
		{"a", 3},
		{"b", 4},
		{"b", 5},
		{"c", 6},
	}
	for _, r := range records {
		emit(r.key, r.val)
	}
}
// genB is the second test source. It ignores its impulse payload and emits
// three keyed values in order: ("a",7), ("b",8), ("d",9).
func genB(_ []byte, emit func(string, int)) {
	records := []struct {
		key string
		val int
	}{
		{"a", 7},
		{"b", 8},
		{"d", 9},
	}
	for _, r := range records {
		emit(r.key, r.val)
	}
}
// sum drains the iterator nums and returns the total of every value it
// yields. nums writes the next value through its pointer argument and
// reports false once it is exhausted. An iterator that yields nothing
// produces 0.
func sum(nums func(*int) bool) int {
	total := 0
	for {
		var next int
		if !nums(&next) {
			break
		}
		total += next
	}
	return total
}
// joinFn receives one CoGroupByKey result: a key plus iterators over that
// key's values from each of the two inputs. It emits the key paired with the
// combined total of both groups.
func joinFn(key string, as, bs func(*int) bool, emit func(string, int)) {
	total := sum(as)
	total += sum(bs)
	emit(key, total)
}
// splitFn routes v to the output emitter matching key: "a" goes to a, "b" to
// b, "c" to c and "d" to d. Any other key is unexpected in this test
// pipeline and triggers a panic.
func splitFn(key string, v int, a, b, c, d func(int)) {
	var out func(int)
	switch key {
	case "a":
		out = a
	case "b":
		out = b
	case "c":
		out = c
	case "d":
		out = d
	default:
		panic(fmt.Sprintf("bad key: %v", key))
	}
	out(v)
}
// CoGBK builds a test pipeline that exercises CoGroupByKey. The pipeline works as follows:
//   - Two sources, genA and genB, each fed by its own Impulse, produce keyed ints.
//   - CoGroupByKey groups those ints by key.
//   - joinFn totals each key's values across both inputs.
//   - splitFn fans the totals out to one PCollection per key.
//
// passert.Sum then checks that each per-key output holds one element equal to
// the expected total: a=13, b=17, c=6, d=9.
func CoGBK() *beam.Pipeline {
	pipeline, root := beam.NewPipelineWithRoot()

	left := beam.ParDo(root, genA, beam.Impulse(root))
	right := beam.ParDo(root, genB, beam.Impulse(root))

	totals := beam.ParDo(root, joinFn, beam.CoGroupByKey(root, left, right))
	outA, outB, outC, outD := beam.ParDo4(root, splitFn, totals)

	expectations := []struct {
		col   beam.PCollection
		name  string
		total int
	}{
		{outA, "a", 13},
		{outB, "b", 17},
		{outC, "c", 6},
		{outD, "d", 9},
	}
	for _, want := range expectations {
		passert.Sum(root, want.col, want.name, 1, want.total)
	}
	return pipeline
}