-
Notifications
You must be signed in to change notification settings - Fork 152
/
group_transformation.go
92 lines (77 loc) · 2.25 KB
/
group_transformation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package execute
import (
"github.com/apache/arrow/go/arrow/memory"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute/table"
)
// GroupTransformation is a transformation that can modify the group key.
// Other than modifying the group key, it is identical to a NarrowTransformation.
//
// The main difference between this and NarrowTransformation is that
// NarrowTransformation will pass the FlushKeyMsg to the Dataset
// and GroupTransformation will swallow this Message.
type GroupTransformation interface {
// Process is called for every ProcessChunkMsg received by the wrapping
// transport. It is given the chunk carried by the message, the
// TransportDataset created for this transformation, and that dataset's
// memory allocator. A non-nil error is returned to the caller of
// ProcessMessage.
Process(chunk table.Chunk, d *TransportDataset, mem memory.Allocator) error
}
// Compile-time checks that *groupTransformation satisfies both the
// Transport and the Transformation interfaces.
var _ Transport = (*groupTransformation)(nil)
var _ Transformation = (*groupTransformation)(nil)
// groupTransformation wraps a GroupTransformation so that it can be used
// as a Transport and as a Transformation.
type groupTransformation struct {
// t is the wrapped transformation that receives each table chunk.
t GroupTransformation
// d is the dataset handed to t.Process and finished by Finish.
d *TransportDataset
}
// OperationType returns the operation type of the wrapped
// GroupTransformation, as computed by the package-level OperationType
// function.
func (g *groupTransformation) OperationType() string {
	opType := OperationType(g.t)
	return opType
}
// NewGroupTransformation wraps t in a Transformation backed by a new
// TransportDataset created from id and mem.
//
// It returns the wrapping Transformation, the dataset it writes to, and an
// error that is always nil.
func NewGroupTransformation(id DatasetID, t GroupTransformation, mem memory.Allocator) (Transformation, Dataset, error) {
	dataset := NewTransportDataset(id, mem)
	tr := &groupTransformation{t: t, d: dataset}
	return tr, dataset, nil
}
// ProcessMessage implements the Transport interface.
//
// The message is acknowledged when this method returns, on every path.
// Dispatch by message type:
//   - FinishMsg: finishes the dataset with the message's error.
//   - ProcessChunkMsg: passes the chunk to the wrapped GroupTransformation.
//   - FlushKeyMsg: swallowed; nothing is forwarded to the dataset.
//   - ProcessMsg: the table is fed through Process.
//
// Any other message type is ignored and nil is returned.
func (g *groupTransformation) ProcessMessage(m Message) error {
	defer m.Ack()

	switch msg := m.(type) {
	case FinishMsg:
		g.Finish(msg.SrcDatasetID(), msg.Error())
	case ProcessChunkMsg:
		return g.t.Process(msg.TableChunk(), g.d, g.d.mem)
	case FlushKeyMsg:
		// Intentionally dropped: group transformations do not flush keys.
	case ProcessMsg:
		return g.Process(msg.SrcDatasetID(), msg.Table())
	}
	return nil
}
// Process converts a whole table into the message-based protocol handled by
// ProcessMessage: each column reader of tbl becomes one processChunkMsg, and
// once the table has been fully read a flushKeyMsg for the table's key is
// sent.
//
// The first error from reading the table or from handling a chunk stops
// processing and is returned; in that case no flushKeyMsg is sent.
func (g *groupTransformation) Process(id DatasetID, tbl flux.Table) error {
	sendChunk := func(cr flux.ColReader) error {
		c := table.ChunkFromReader(cr)
		// NOTE(review): this Retain presumably pairs with a release that
		// happens when the message is acknowledged in ProcessMessage — confirm.
		c.Retain()
		return g.ProcessMessage(&processChunkMsg{
			srcMessage: srcMessage(id),
			chunk:      c,
		})
	}
	if err := tbl.Do(sendChunk); err != nil {
		return err
	}

	return g.ProcessMessage(&flushKeyMsg{
		srcMessage: srcMessage(id),
		key:        tbl.Key(),
	})
}
// Finish forwards err to the underlying dataset's Finish method.
// The source dataset id is not used.
func (g *groupTransformation) Finish(id DatasetID, err error) {
	g.d.Finish(err)
}
// RetractTable is a no-op for group transformations; it ignores its
// arguments and always returns nil.
func (g *groupTransformation) RetractTable(id DatasetID, key flux.GroupKey) error {
	return nil
}
// UpdateWatermark is a no-op for group transformations; it ignores its
// arguments and always returns nil.
func (g *groupTransformation) UpdateWatermark(id DatasetID, t Time) error {
	return nil
}
// UpdateProcessingTime is a no-op for group transformations; it ignores its
// arguments and always returns nil.
func (g *groupTransformation) UpdateProcessingTime(id DatasetID, t Time) error {
	return nil
}