-
Notifications
You must be signed in to change notification settings - Fork 153
/
influxdb.go
127 lines (105 loc) · 3.37 KB
/
influxdb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package influxdb
import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/internal/execute/table"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/runtime"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/values"
)
const (
	// pkgpath is the Flux package path under which this package's
	// builtins are registered.
	pkgpath = "contrib/jsternberg/influxdb"

	// maskKind is the kind string shared by the _mask operation spec,
	// procedure spec, and transformation registrations.
	maskKind = pkgpath + "._mask"
)
// init wires the _mask builtin into the Flux runtime: it registers the
// function value under pkgpath, then registers the procedure spec factory
// and the transformation factory for maskKind.
func init() {
	// Resolve the builtin's declared type before constructing the function value.
	maskType := runtime.MustLookupBuiltinType(pkgpath, "_mask")
	maskFn := flux.MustValue(flux.FunctionValue("_mask", createMaskOpSpec, maskType))
	runtime.RegisterPackageValue(pkgpath, "_mask", maskFn)

	plan.RegisterProcedureSpec(maskKind, newMaskProcedure, maskKind)
	execute.RegisterTransformation(maskKind, createMaskTransformation)
}
// maskOpSpec is the operation spec produced by createMaskOpSpec for a
// _mask call.
type maskOpSpec struct {
	// Columns holds the strings read from the "columns" argument.
	Columns []string
}
// createMaskOpSpec builds the operation spec for a _mask call.
//
// It first adds the parent from the arguments via a.AddParentFromArgs, then
// reads the required "columns" argument as an array of strings and copies
// each element into the returned spec. An error is returned when the parent
// cannot be added, when GetRequiredArray fails, or when the value obtained
// for "columns" is a *flux.TableObject rather than a plain array.
func createMaskOpSpec(args flux.Arguments, a *flux.Administration) (flux.OperationSpec, error) {
	if err := a.AddParentFromArgs(args); err != nil {
		return nil, err
	}

	cols, err := args.GetRequiredArray("columns", semantic.String)
	if err != nil {
		return nil, err
	}

	// FIXME: needs a test
	// Guard against a table stream being supplied where an array is expected.
	if _, isStream := cols.(*flux.TableObject); isStream {
		return nil, errors.New(codes.Invalid, "got a table stream; expected an array")
	}

	// Copy every element out of the Flux array into a plain Go slice.
	names := make([]string, cols.Len())
	cols.Range(func(idx int, elem values.Value) {
		names[idx] = elem.Str()
	})
	return &maskOpSpec{Columns: names}, nil
}
// Kind returns maskKind as the operation kind of this spec.
func (a *maskOpSpec) Kind() flux.OperationKind {
	return maskKind
}
// maskProcedureSpec is the planner-level spec for _mask, created from a
// maskOpSpec by newMaskProcedure.
type maskProcedureSpec struct {
	// NOTE(review): presumably supplies the default cost implementation
	// required of procedure specs — confirm against the plan package.
	plan.DefaultCost
	// Columns holds the column names carried over from the operation spec.
	Columns []string
}
// newMaskProcedure converts a _mask operation spec into its procedure spec.
//
// qs must be a *maskOpSpec; any other type yields an Internal error. The
// returned spec shares the Columns slice with qs (it is not copied here).
// The pa argument is not used.
func newMaskProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
	if opSpec, isMask := qs.(*maskOpSpec); isMask {
		procSpec := new(maskProcedureSpec)
		procSpec.Columns = opSpec.Columns
		return procSpec, nil
	}
	return nil, errors.Newf(codes.Internal, "invalid spec type %T", qs)
}
// Kind returns maskKind as the procedure kind of this spec.
func (s *maskProcedureSpec) Kind() plan.ProcedureKind {
	return maskKind
}
// Copy returns a new maskProcedureSpec whose Columns slice is an independent
// copy of the receiver's, so changes to one do not affect the other.
func (s *maskProcedureSpec) Copy() plan.ProcedureSpec {
	// Allocate at full length so an empty input still yields a non-nil slice.
	cols := make([]string, len(s.Columns))
	copy(cols, s.Columns)
	return &maskProcedureSpec{Columns: cols}
}
// createMaskTransformation is the transformation factory registered for
// maskKind. It checks that spec is a *maskProcedureSpec and delegates to
// newMaskTransformation; any other spec type yields an Internal error.
// The mode and a arguments are not used.
func createMaskTransformation(id execute.DatasetID, mode execute.AccumulationMode, spec plan.ProcedureSpec, a execute.Administration) (execute.Transformation, execute.Dataset, error) {
	if maskSpec, isMask := spec.(*maskProcedureSpec); isMask {
		return newMaskTransformation(maskSpec, id)
	}
	return nil, nil, errors.Newf(codes.Internal, "invalid spec type %T", spec)
}
// maskTransformation applies table.Mask to each incoming table and forwards
// the result, along with all other signals, to a passthrough dataset.
type maskTransformation struct {
	execute.ExecutionNode
	// d is the passthrough dataset that receives the processed tables.
	d *execute.PassthroughDataset
	// spec supplies the column names passed to table.Mask.
	spec *maskProcedureSpec
}
// newMaskTransformation creates a maskTransformation for spec together with
// a fresh passthrough dataset identified by id. The same dataset is returned
// as the second result; the error result is always nil.
func newMaskTransformation(spec *maskProcedureSpec, id execute.DatasetID) (execute.Transformation, execute.Dataset, error) {
	dataset := execute.NewPassthroughDataset(id)
	return &maskTransformation{d: dataset, spec: spec}, dataset, nil
}
// RetractTable forwards the retraction for key to the passthrough dataset.
// The id argument is not used.
func (t *maskTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error {
	return t.d.RetractTable(key)
}
// Process passes tbl through table.Mask using the spec's column names and
// hands the resulting table to the passthrough dataset, returning whatever
// error the dataset reports. The id argument is not used.
func (t *maskTransformation) Process(id execute.DatasetID, tbl flux.Table) error {
	// NOTE(review): presumably table.Mask hides the listed columns — confirm
	// against the table package.
	return t.d.Process(table.Mask(tbl, t.spec.Columns))
}
// UpdateWatermark forwards mark to the passthrough dataset unchanged.
// The id argument is not used.
func (t *maskTransformation) UpdateWatermark(id execute.DatasetID, mark execute.Time) error {
	return t.d.UpdateWatermark(mark)
}
// UpdateProcessingTime forwards ts to the passthrough dataset unchanged.
// The id argument is not used.
func (t *maskTransformation) UpdateProcessingTime(id execute.DatasetID, ts execute.Time) error {
	return t.d.UpdateProcessingTime(ts)
}
// Finish forwards err to the passthrough dataset's Finish method.
// The id argument is not used.
func (t *maskTransformation) Finish(id execute.DatasetID, err error) {
	t.d.Finish(err)
}