forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
store.go
116 lines (94 loc) · 2.42 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package storage
import (
"context"
"errors"
"sort"
"strings"
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"go.uber.org/zap"
)
type Store struct {
TSDBStore *tsdb.Store
MetaClient interface {
Database(name string) *meta.DatabaseInfo
ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
}
Logger *zap.Logger
}
func NewStore() *Store {
return &Store{Logger: zap.NewNop()}
}
// WithLogger sets the logger for the service.
func (s *Store) WithLogger(log *zap.Logger) {
s.Logger = log.With(zap.String("service", "store"))
}
func (s *Store) Read(ctx context.Context, req *ReadRequest) (*ResultSet, error) {
database, rp := req.Database, ""
if p := strings.IndexByte(database, '/'); p > -1 {
database, rp = database[:p], database[p+1:]
}
di := s.MetaClient.Database(database)
if di == nil {
return nil, errors.New("no database")
}
if rp == "" {
rp = di.DefaultRetentionPolicy
}
rpi := di.RetentionPolicy(rp)
if rpi == nil {
return nil, errors.New("invalid retention policy")
}
var start, end = models.MinNanoTime, models.MaxNanoTime
if req.TimestampRange.Start > 0 {
start = req.TimestampRange.Start
}
if req.TimestampRange.End > 0 {
end = req.TimestampRange.End
}
groups, err := s.MetaClient.ShardGroupsByTimeRange(database, rp, time.Unix(0, start), time.Unix(0, end))
if err != nil {
return nil, err
}
if len(groups) == 0 {
return nil, nil
}
if req.Descending {
sort.Sort(sort.Reverse(meta.ShardGroupInfos(groups)))
} else {
sort.Sort(meta.ShardGroupInfos(groups))
}
shardIDs := make([]uint64, 0, len(groups[0].Shards)*len(groups))
for _, g := range groups {
for _, si := range g.Shards {
shardIDs = append(shardIDs, si.ID)
}
}
var cur seriesCursor
if ic, err := newIndexSeriesCursor(ctx, req, s.TSDBStore.Shards(shardIDs)); err != nil {
return nil, err
} else if ic == nil {
return nil, nil
} else {
cur = ic
}
if len(req.Grouping) > 0 {
cur = newGroupSeriesCursor(ctx, cur, req.Grouping)
}
if req.SeriesLimit > 0 || req.SeriesOffset > 0 {
cur = newLimitSeriesCursor(ctx, cur, req.SeriesLimit, req.SeriesOffset)
}
return &ResultSet{
req: readRequest{
ctx: ctx,
start: start,
end: end,
asc: !req.Descending,
limit: req.PointsLimit,
aggregate: req.Aggregate,
},
cur: cur,
}, nil
}