-
Notifications
You must be signed in to change notification settings - Fork 3.5k
/
from.go
128 lines (108 loc) · 3.22 KB
/
from.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package influxdb
import (
"fmt"
"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/stdlib/influxdata/influxdb"
)
// FromKind is the kind identifier shared by the operation spec and the
// procedure spec declared in this file. It is the value returned by both
// Kind methods and the key used for every registration made in init.
const FromKind = "influxDBFrom"
// FromOpSpec is the operation spec produced by createFromOpSpec. It records
// which bucket to read from, taken from either the "bucket" or the
// "bucketID" argument; createFromOpSpec only returns a spec in which exactly
// one of the two fields is non-empty.
type FromOpSpec struct {
// Bucket holds the value of the "bucket" argument (presumably the bucket
// name; confirm against callers).
Bucket string `json:"bucket,omitempty"`
// BucketID holds the value of the "bucketID" argument.
BucketID string `json:"bucketID,omitempty"`
}
// init wires this package's from implementation into flux. It replaces the
// value registered under influxdb.FromKind in the "influxdata/influxdb"
// package with a function backed by createFromOpSpec, then registers the
// operation spec and procedure spec constructors under FromKind.
func init() {
	// Both parameters are declared optional here; createFromOpSpec is what
	// enforces that exactly one of them is supplied.
	params := map[string]semantic.PolyType{
		"bucket":   semantic.String,
		"bucketID": semantic.String,
	}
	fromSignature := semantic.FunctionPolySignature{
		Parameters: params,
		Required:   nil,
		Return:     flux.TableObjectType,
	}

	fromFn := flux.FunctionValue(FromKind, createFromOpSpec, fromSignature)
	flux.ReplacePackageValue("influxdata/influxdb", influxdb.FromKind, fromFn)

	flux.RegisterOpSpec(FromKind, newFromOp)
	plan.RegisterProcedureSpec(FromKind, newFromProcedure, FromKind)
}
// createFromOpSpec builds a FromOpSpec from the arguments passed to the
// function registered in init.
//
// It reads the optional string arguments "bucket" and "bucketID" from args.
// Exactly one of them must end up non-empty: a *flux.Error with code
// codes.Invalid is returned when neither is set and when both are set. An
// error raised while reading either argument is returned unchanged. The
// administration a is not used.
func createFromOpSpec(args flux.Arguments, a *flux.Administration) (flux.OperationSpec, error) {
	bucket, hasBucket, err := args.GetString("bucket")
	if err != nil {
		return nil, err
	}
	bucketID, hasBucketID, err := args.GetString("bucketID")
	if err != nil {
		return nil, err
	}

	spec := &FromOpSpec{}
	if hasBucket {
		spec.Bucket = bucket
	}
	if hasBucketID {
		spec.BucketID = bucketID
	}

	// An argument that is present but empty counts as "not specified".
	switch {
	case spec.Bucket == "" && spec.BucketID == "":
		return nil, &flux.Error{
			Code: codes.Invalid,
			Msg:  "must specify one of bucket or bucketID",
		}
	case spec.Bucket != "" && spec.BucketID != "":
		return nil, &flux.Error{
			Code: codes.Invalid,
			Msg:  "must specify only one of bucket or bucketID",
		}
	}
	return spec, nil
}
// newFromOp returns an empty FromOpSpec. It is the constructor registered
// for FromKind via flux.RegisterOpSpec in init.
func newFromOp() flux.OperationSpec {
	return &FromOpSpec{}
}
// Kind reports the operation kind of the spec, which is always FromKind.
func (s *FromOpSpec) Kind() flux.OperationKind {
	return flux.OperationKind(FromKind)
}
// FromProcedureSpec is the procedure spec built from a FromOpSpec by
// newFromProcedure. Bucket and BucketID are copied verbatim from the
// operation spec.
type FromProcedureSpec struct {
	Bucket, BucketID string
}
// newFromProcedure converts an operation spec into a FromProcedureSpec.
//
// qs must be a *FromOpSpec; its Bucket and BucketID are copied into the
// returned procedure spec. For any other type a *flux.Error with code
// codes.Internal is returned. The plan administration pa is not used.
func newFromProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
	switch spec := qs.(type) {
	case *FromOpSpec:
		return &FromProcedureSpec{
			Bucket:   spec.Bucket,
			BucketID: spec.BucketID,
		}, nil
	default:
		return nil, &flux.Error{
			Code: codes.Internal,
			Msg:  fmt.Sprintf("invalid spec type %T", qs),
		}
	}
}
// Kind reports the procedure kind of the spec, which is always FromKind.
func (s *FromProcedureSpec) Kind() plan.ProcedureKind {
	return plan.ProcedureKind(FromKind)
}
// Copy returns a newly allocated FromProcedureSpec carrying the same Bucket
// and BucketID as the receiver.
func (s *FromProcedureSpec) Copy() plan.ProcedureSpec {
	return &FromProcedureSpec{
		Bucket:   s.Bucket,
		BucketID: s.BucketID,
	}
}
// PostPhysicalValidate always fails with a *flux.Error of code
// codes.Invalid. The message names the bucket (Bucket when non-empty,
// otherwise BucketID) and suggests bounding the read with 'range'. The node
// id is not used.
func (s *FromProcedureSpec) PostPhysicalValidate(id plan.NodeID) error {
	// FromProcedureSpec is a logical operation standing for any read from
	// storage; it does not say how the data is to be read. Choosing the
	// read strategy, and replacing this logical operation with the matching
	// physical one, is the query planner's job.
	//
	// The query engine cannot execute logical operations. If this spec is
	// still present after physical planning, no 'range' could be pushed
	// down to storage. Storage does not support unbounded reads, so the
	// query must fail validation.
	target := s.BucketID
	if s.Bucket != "" {
		target = s.Bucket
	}

	msg := fmt.Sprintf("cannot submit unbounded read to %q; try bounding 'from' with a call to 'range'", target)
	return &flux.Error{
		Code: codes.Invalid,
		Msg:  msg,
	}
}