-
Notifications
You must be signed in to change notification settings - Fork 153
/
narrow_transformation.go
60 lines (50 loc) · 1.59 KB
/
narrow_transformation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package execute
import (
"github.com/apache/arrow/go/v7/arrow/memory"
"github.com/influxdata/flux/execute/table"
)
// NarrowTransformation implements a transformation that processes
// a table.Chunk and does not modify its group key.
type NarrowTransformation interface {
// Process will process the table.Chunk and send any output to the TransportDataset.
Process(chunk table.Chunk, d *TransportDataset, mem memory.Allocator) error
Closer
}
var _ Transport = (*narrowTransformation)(nil)
type narrowTransformation struct {
t NarrowTransformation
d *TransportDataset
}
// NewNarrowTransformation constructs a Transformation and Dataset
// using the NarrowTransformation implementation.
func NewNarrowTransformation(id DatasetID, t NarrowTransformation, mem memory.Allocator) (Transformation, Dataset, error) {
tr := &narrowTransformation{
t: t,
d: NewTransportDataset(id, mem),
}
return NewTransformationFromTransport(tr), tr.d, nil
}
// ProcessMessage will process the incoming message.
func (n *narrowTransformation) ProcessMessage(m Message) error {
defer m.Ack()
switch m := m.(type) {
case FinishMsg:
n.Finish(m.SrcDatasetID(), m.Error())
return nil
case ProcessChunkMsg:
return n.t.Process(m.TableChunk(), n.d, n.d.mem)
case FlushKeyMsg:
return n.d.FlushKey(m.Key())
case ProcessMsg:
panic("unreachable")
}
return nil
}
// Finish is implemented to remain compatible with legacy upstreams.
func (n *narrowTransformation) Finish(id DatasetID, err error) {
err = Close(err, n.t)
n.d.Finish(err)
}
func (n *narrowTransformation) OperationType() string {
return OperationType(n.t)
}