-
Notifications
You must be signed in to change notification settings - Fork 153
/
transformation.go
119 lines (101 loc) · 3.21 KB
/
transformation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package execute
import (
"context"
"fmt"
"github.com/influxdata/flux"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/plan"
)
// Transformation represents functions that stream a set of tables, performs
// data processing on them and produces an output stream of tables
type Transformation interface {
RetractTable(id DatasetID, key flux.GroupKey) error
// Process takes in one flux Table, performs data processing on it and
// writes that table to a DataCache
Process(id DatasetID, tbl flux.Table) error
UpdateWatermark(id DatasetID, t Time) error
UpdateProcessingTime(id DatasetID, t Time) error
// Finish indicates that the Transformation is done processing. It is
// the last method called on the Transformation
Finish(id DatasetID, err error)
}
// TransformationSet is a group of transformations.
type TransformationSet []Transformation
func (ts TransformationSet) RetractTable(id DatasetID, key flux.GroupKey) error {
for _, t := range ts {
if err := t.RetractTable(id, key); err != nil {
return err
}
}
return nil
}
func (ts TransformationSet) Process(id DatasetID, tbl flux.Table) error {
if len(ts) == 0 {
return nil
} else if len(ts) == 1 {
return ts[0].Process(id, tbl)
}
// There is more than one transformation so we need to
// copy the table for each transformation.
bufTable, err := CopyTable(tbl)
if err != nil {
return err
}
defer bufTable.Done()
for _, t := range ts {
if err := t.Process(id, bufTable.Copy()); err != nil {
return err
}
}
return nil
}
func (ts TransformationSet) UpdateWatermark(id DatasetID, time Time) error {
for _, t := range ts {
if err := t.UpdateWatermark(id, time); err != nil {
return err
}
}
return nil
}
func (ts TransformationSet) UpdateProcessingTime(id DatasetID, time Time) error {
for _, t := range ts {
if err := t.UpdateProcessingTime(id, time); err != nil {
return err
}
}
return nil
}
func (ts TransformationSet) Finish(id DatasetID, err error) {
for _, t := range ts {
t.Finish(id, err)
}
}
// StreamContext represents necessary context for a single stream of
// query data.
type StreamContext interface {
Bounds() *Bounds
}
type Administration interface {
Context() context.Context
ResolveTime(qt flux.Time) Time
StreamContext() StreamContext
Allocator() memory.Allocator
Parents() []DatasetID
ParallelOpts() ParallelOpts
}
type CreateTransformation func(id DatasetID, mode AccumulationMode, spec plan.ProcedureSpec, a Administration) (Transformation, Dataset, error)
var procedureToTransformation = make(map[plan.ProcedureKind]CreateTransformation)
// RegisterTransformation adds a new registration mapping of procedure kind to transformation.
func RegisterTransformation(k plan.ProcedureKind, c CreateTransformation) {
if procedureToTransformation[k] != nil {
panic(fmt.Errorf("duplicate registration for transformation with procedure kind %v", k))
}
procedureToTransformation[k] = c
}
// ReplaceTransformation changes an existing transformation registration.
func ReplaceTransformation(k plan.ProcedureKind, c CreateTransformation) {
if procedureToTransformation[k] == nil {
panic(fmt.Errorf("missing registration for transformation with procedure kind %v", k))
}
procedureToTransformation[k] = c
}