Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use storage package for Prometheus remote read #9967

Merged
merged 12 commits into from
Jun 13, 2018
8 changes: 7 additions & 1 deletion cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,12 @@ func (s *Server) appendHTTPDService(c httpd.Config) {
srv.Handler.Version = s.buildInfo.Version
srv.Handler.BuildType = "OSS"

// Wire up storage service for Prometheus endpoints.
storageStore := storage.NewStore()
storageStore.MetaClient = s.MetaClient
storageStore.TSDBStore = s.TSDBStore
srv.Handler.Store = storageStore

s.Services = append(s.Services, srv)
}

Expand Down Expand Up @@ -420,7 +426,7 @@ func (s *Server) Open() error {
return fmt.Errorf("open tsdb store: %s", err)
}

// Open the subcriber service
// Open the subscriber service
if err := s.Subscriber.Open(); err != nil {
return fmt.Errorf("open subscriber: %s", err)
}
Expand Down
12 changes: 11 additions & 1 deletion cmd/store/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,12 @@ func mapOpToComparison(op influxql.Token) storage.Node_Comparison {
switch op {
case influxql.EQ:
return storage.ComparisonEqual
case influxql.EQREGEX:
return storage.ComparisonRegex
case influxql.NEQ:
return storage.ComparisonNotEqual
case influxql.NEQREGEX:
return storage.ComparisonNotEqual
case influxql.LT:
return storage.ComparisonLess
case influxql.LTE:
Expand Down Expand Up @@ -555,8 +559,14 @@ func (v *exprToNodeVisitor) Visit(node influxql.Node) influxql.Visitor {
})
return nil

case *influxql.RegexLiteral:
v.nodes = append(v.nodes, &storage.Node{
NodeType: storage.NodeTypeLiteral,
Value: &storage.Node_RegexValue{RegexValue: n.Val.String()},
})
return nil
default:
v.err = errors.New("unsupported expression")
v.err = fmt.Errorf("unsupported expression %T", n)
return nil
}
}
132 changes: 132 additions & 0 deletions internal/cursors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package internal

import "github.com/influxdata/influxdb/tsdb"

var (
_ tsdb.IntegerBatchCursor = NewIntegerBatchCursorMock()
_ tsdb.FloatBatchCursor = NewFloatBatchCursorMock()
_ tsdb.UnsignedBatchCursor = NewUnsignedBatchCursorMock()
_ tsdb.StringBatchCursor = NewStringBatchCursorMock()
_ tsdb.BooleanBatchCursor = NewBooleanBatchCursorMock()
)

// BatchCursorMock provides a mock base implementation for batch cursors.
type BatchCursorMock struct {
CloseFn func()
ErrFn func() error
}

// NewBatchCursorMock returns an initialised BatchCursorMock, which
// returns the zero value for all methods.
func NewBatchCursorMock() *BatchCursorMock {
return &BatchCursorMock{
CloseFn: func() {},
ErrFn: func() error { return nil },
}
}

// Close closes the cursor.
func (c *BatchCursorMock) Close() { c.CloseFn() }

// Err returns the latest error, if any.
func (c *BatchCursorMock) Err() error { return c.ErrFn() }

// IntegerBatchCursorMock provides a mock implementation of an IntegerBatchCursorMock.
type IntegerBatchCursorMock struct {
*BatchCursorMock
NextFn func() (keys []int64, values []int64)
}

// NewIntegerBatchCursorMock returns an initialised IntegerBatchCursorMock, which
// returns the zero value for all methods.
func NewIntegerBatchCursorMock() *IntegerBatchCursorMock {
return &IntegerBatchCursorMock{
BatchCursorMock: NewBatchCursorMock(),
NextFn: func() ([]int64, []int64) { return nil, nil },
}
}

// Next returns the next set of keys and values.
func (c *IntegerBatchCursorMock) Next() (keys []int64, values []int64) {
return c.NextFn()
}

// FloatBatchCursorMock provides a mock implementation of a FloatBatchCursor.
type FloatBatchCursorMock struct {
*BatchCursorMock
NextFn func() (keys []int64, values []float64)
}

// NewFloatBatchCursorMock returns an initialised FloatBatchCursorMock, which
// returns the zero value for all methods.
func NewFloatBatchCursorMock() *FloatBatchCursorMock {
return &FloatBatchCursorMock{
BatchCursorMock: NewBatchCursorMock(),
NextFn: func() ([]int64, []float64) { return nil, nil },
}
}

// Next returns the next set of keys and values.
func (c *FloatBatchCursorMock) Next() (keys []int64, values []float64) {
return c.NextFn()
}

// UnsignedBatchCursorMock provides a mock implementation of an UnsignedBatchCursorMock.
type UnsignedBatchCursorMock struct {
*BatchCursorMock
NextFn func() (keys []int64, values []uint64)
}

// NewUnsignedBatchCursorMock returns an initialised UnsignedBatchCursorMock, which
// returns the zero value for all methods.
func NewUnsignedBatchCursorMock() *UnsignedBatchCursorMock {
return &UnsignedBatchCursorMock{
BatchCursorMock: NewBatchCursorMock(),
NextFn: func() ([]int64, []uint64) { return nil, nil },
}
}

// Next returns the next set of keys and values.
func (c *UnsignedBatchCursorMock) Next() (keys []int64, values []uint64) {
return c.NextFn()
}

// StringBatchCursorMock provides a mock implementation of a StringBatchCursor.
type StringBatchCursorMock struct {
*BatchCursorMock
NextFn func() (keys []int64, values []string)
}

// NewStringBatchCursorMock returns an initialised StringBatchCursorMock, which
// returns the zero value for all methods.
func NewStringBatchCursorMock() *StringBatchCursorMock {
return &StringBatchCursorMock{
BatchCursorMock: NewBatchCursorMock(),
NextFn: func() ([]int64, []string) { return nil, nil },
}
}

// Next returns the next set of keys and values.
func (c *StringBatchCursorMock) Next() (keys []int64, values []string) {
return c.NextFn()
}

// BooleanBatchCursorMock provides a mock implementation of a BooleanBatchCursor.
type BooleanBatchCursorMock struct {
*BatchCursorMock
NextFn func() (keys []int64, values []bool)
}

// NewBooleanBatchCursorMock returns an initialised BooleanBatchCursorMock, which
// returns the zero value for all methods.
func NewBooleanBatchCursorMock() *BooleanBatchCursorMock {
return &BooleanBatchCursorMock{
BatchCursorMock: NewBatchCursorMock(),
NextFn: func() ([]int64, []bool) { return nil, nil },
}
}

// Next returns the next set of keys and values.
func (c *BooleanBatchCursorMock) Next() (keys []int64, values []bool) {
return c.NextFn()
}
78 changes: 78 additions & 0 deletions internal/storage_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package internal

import (
"context"

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/storage"
"github.com/influxdata/influxdb/tsdb"
"go.uber.org/zap"
)

// TSDBStoreMock is a mockable implementation of storage.Store.
//
// It's currently a partial implementation as one of a store's exported methods
// returns an unexported type.
type StorageStoreMock struct {
ReadFn func(ctx context.Context, req *storage.ReadRequest) (storage.Results, error)
WithLoggerFn func(log *zap.Logger)

ResultSet *StorageResultsMock
// TODO(edd): can't mock GroupRead as it returns an unexported type.
}

// NewStorageStoreMock initialises a StorageStoreMock with methods that return
// their zero values. It also initialises a StorageResultsMock, which can be
// configured via the ResultSet field.
func NewStorageStoreMock() *StorageStoreMock {
store := &StorageStoreMock{
WithLoggerFn: func(*zap.Logger) {},
ResultSet: NewStorageResultsMock(),
}
store.ReadFn = func(context.Context, *storage.ReadRequest) (storage.Results, error) {
return store.ResultSet, nil
}
return store
}

// WithLogger sets the logger.
func (s *StorageStoreMock) WithLogger(log *zap.Logger) {
s.WithLoggerFn(log)
}

// Read reads the storage request and returns a cursor to access results.
func (s *StorageStoreMock) Read(ctx context.Context, req *storage.ReadRequest) (storage.Results, error) {
return s.ReadFn(ctx, req)
}

// StorageResultsMock implements the storage.Results interface providing the
// ability to emit mock results from calls to the StorageStoreMock.Read method.
type StorageResultsMock struct {
CloseFn func()
NextFn func() bool
CursorFn func() tsdb.Cursor
TagsFn func() models.Tags
}

// NewStorageResultsMock initialises a StorageResultsMock whose methods all return
// their zero value.
func NewStorageResultsMock() *StorageResultsMock {
return &StorageResultsMock{
CloseFn: func() {},
NextFn: func() bool { return false },
CursorFn: func() tsdb.Cursor { return nil },
TagsFn: func() models.Tags { return nil },
}
}

// Close closes the result set.
func (r *StorageResultsMock) Close() { r.CloseFn() }

// Next returns true if there are more results available.
func (r *StorageResultsMock) Next() bool { return r.NextFn() }

// Cursor returns the cursor for the result set.
func (r *StorageResultsMock) Cursor() tsdb.Cursor { return r.CursorFn() }

// Tags returns the series' tag set.
func (r *StorageResultsMock) Tags() models.Tags { return r.TagsFn() }
Loading