-
Notifications
You must be signed in to change notification settings - Fork 152
/
source.go
65 lines (54 loc) · 1.62 KB
/
source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package executetest
import (
	"context"
	"fmt"

	"github.com/influxdata/flux/execute"
	"github.com/influxdata/flux/plan"
	uuid "github.com/satori/go.uuid"
)
// FromTestKind is the procedure kind reported by FromProcedureSpec.Kind.
const FromTestKind = "from-test"
// FromProcedureSpec doubles as a plan procedure spec and as the execution
// node built from it. It stands in for a basic physical scan operation by
// replaying a fixed set of tables.
type FromProcedureSpec struct {
	// data holds the tables that Run delivers to every transformation.
	data []*Table
	// ts lists the downstream transformations, in registration order.
	ts []execute.Transformation
}
// NewFromProcedureSpec builds a from-test procedure spec backed by data.
//
// Normalize is called on every table before the spec is returned, so that
// nothing can read the tables in an un-normalized state. The slice is stored
// as given; it is not copied.
func NewFromProcedureSpec(data []*Table) *FromProcedureSpec {
	for i := range data {
		data[i].Normalize()
	}
	spec := &FromProcedureSpec{data: data}
	return spec
}
// Kind reports the procedure kind of this spec, which is always FromTestKind.
func (src *FromProcedureSpec) Kind() plan.ProcedureKind {
	return plan.ProcedureKind(FromTestKind)
}
// Copy returns the receiver itself rather than a duplicate, so the result
// shares its tables and registered transformations with src.
func (src *FromProcedureSpec) Copy() plan.ProcedureSpec {
	var same plan.ProcedureSpec = src
	return same
}
// Cost always returns an empty cost and empty statistics; inStats is ignored.
func (src *FromProcedureSpec) Cost(inStats []plan.Statistics) (plan.Cost, plan.Statistics) {
	cost, stats := plan.Cost{}, plan.Statistics{}
	return cost, stats
}
// AddTransformation registers t as a downstream consumer of this source.
// Run visits the registered transformations in the order they were added.
func (src *FromProcedureSpec) AddTransformation(t execute.Transformation) {
	src.ts = append(src.ts, t)
}
// Run replays the source tables to every registered transformation and then
// finishes it, all under one freshly generated dataset ID.
//
// For each transformation the tables are sent in order via Process. While
// doing so, the largest stop time found in the tables' group keys is tracked
// and reported through UpdateWatermark once every table has been delivered.
// Tables whose group key has no stop column leave the watermark unchanged.
//
// The first error returned by Process or UpdateWatermark ends delivery to
// that transformation and is handed to its Finish call instead of being
// discarded; the remaining transformations are still run. ctx is not
// consulted.
func (src *FromProcedureSpec) Run(ctx context.Context) {
	id := execute.DatasetID(uuid.NewV4())
	for _, t := range src.ts {
		var (
			err       error
			watermark execute.Time
		)
		for _, tbl := range src.data {
			if err = t.Process(id, tbl); err != nil {
				break
			}
			// Look the stop column up in the key's own columns: the index
			// is used to read a key value, and the key's column order need
			// not match the table's.
			key := tbl.Key()
			stopIdx := execute.ColIdx(execute.DefaultStopColLabel, key.Cols())
			if stopIdx < 0 {
				continue
			}
			if stop := key.ValueTime(stopIdx); stop > watermark {
				watermark = stop
			}
		}
		if err == nil {
			err = t.UpdateWatermark(id, watermark)
		}
		t.Finish(id, err)
	}
}
// CreateFromSource returns spec as an execution source. A *FromProcedureSpec
// is its own source, so no new value is constructed; id and a are unused.
//
// An error is returned, rather than panicking, when spec is not a
// *FromProcedureSpec.
func CreateFromSource(spec plan.ProcedureSpec, id execute.DatasetID, a execute.Administration) (execute.Source, error) {
	src, ok := spec.(*FromProcedureSpec)
	if !ok {
		return nil, fmt.Errorf("invalid spec type %T", spec)
	}
	return src, nil
}