-
Notifications
You must be signed in to change notification settings - Fork 3.5k
/
storage.go
125 lines (103 loc) · 3.1 KB
/
storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package influxdb
import (
"context"
"fmt"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/plan"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
"github.com/influxdata/influxql"
"github.com/pkg/errors"
)
type Authorizer interface {
AuthorizeDatabase(u meta.User, priv influxql.Privilege, database string) error
}
type FromDependencies struct {
Reader Reader
MetaClient MetaClient
Authorizer Authorizer
AuthEnabled bool
}
func (d FromDependencies) Validate() error {
if d.Reader == nil {
return errors.New("missing reader dependency")
}
if d.MetaClient == nil {
return errors.New("missing meta client dependency")
}
if d.AuthEnabled && d.Authorizer == nil {
return errors.New("missing authorizer dependency")
}
return nil
}
type GroupMode int
const (
// GroupModeNone merges all series into a single group.
GroupModeNone GroupMode = iota
// GroupModeBy produces a table for each unique value of the specified GroupKeys.
GroupModeBy
)
// ToGroupMode accepts the group mode from Flux and produces the appropriate storage group mode.
func ToGroupMode(fluxMode flux.GroupMode) GroupMode {
switch fluxMode {
case flux.GroupModeNone:
return GroupModeNone
case flux.GroupModeBy:
return GroupModeBy
default:
panic(fmt.Sprint("unknown group mode: ", fluxMode))
}
}
type ReadFilterSpec struct {
Database string
RetentionPolicy string
Bounds execute.Bounds
Predicate *datatypes.Predicate
}
type ReadGroupSpec struct {
ReadFilterSpec
GroupMode GroupMode
GroupKeys []string
AggregateMethod string
}
type ReadTagKeysSpec struct {
ReadFilterSpec
}
type ReadTagValuesSpec struct {
ReadFilterSpec
TagKey string
}
// Window and the WindowEvery/Offset should be mutually exclusive. If you set either the WindowEvery or Offset with
// nanosecond values, then the Window will be ignored
type ReadWindowAggregateSpec struct {
ReadFilterSpec
WindowEvery int64
Offset int64
Aggregates []plan.ProcedureKind
CreateEmpty bool
TimeColumn string
Window execute.Window
}
func (spec *ReadWindowAggregateSpec) Name() string {
var agg string
if len(spec.Aggregates) > 0 {
agg = string(spec.Aggregates[0])
}
return fmt.Sprintf("readWindow(%s)", agg)
}
type Reader interface {
ReadFilter(ctx context.Context, spec ReadFilterSpec, alloc *memory.Allocator) (TableIterator, error)
ReadGroup(ctx context.Context, spec ReadGroupSpec, alloc *memory.Allocator) (TableIterator, error)
ReadWindowAggregate(ctx context.Context, spec ReadWindowAggregateSpec, alloc *memory.Allocator) (TableIterator, error)
ReadTagKeys(ctx context.Context, spec ReadTagKeysSpec, alloc *memory.Allocator) (TableIterator, error)
ReadTagValues(ctx context.Context, spec ReadTagValuesSpec, alloc *memory.Allocator) (TableIterator, error)
Close()
}
// TableIterator is a table iterator that also keeps track of cursor statistics from the storage engine.
type TableIterator interface {
flux.TableIterator
Statistics() cursors.CursorStats
}