4 changes: 3 additions & 1 deletion cmd/influx/cli/flux.go
@@ -30,10 +30,12 @@ func (q *replQuerier) Query(ctx context.Context, compiler flux.Compiler) (flux.R
return q.client.Query(ctx, req)
}

func getFluxREPL(host string, port int, ssl bool) (*repl.REPL, error) {
func getFluxREPL(host string, port int, ssl bool, username, password string) (*repl.REPL, error) {
c, err := client.NewHTTP(host, port, ssl)
if err != nil {
return nil, err
}
c.Username = username
c.Password = password
return repl.New(&replQuerier{client: c}), nil
}
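
A minimal sketch of how the new signature is meant to be called from the CLI wiring (flag handling, the surrounding command setup, and the REPL's Run loop are assumed, not shown in this diff):

    // Hypothetical caller; host, port, ssl, username and password come from CLI flags.
    r, err := getFluxREPL(host, port, ssl, username, password)
    if err != nil {
        return err
    }
    r.Run() // start the interactive read-eval-print loop
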
74 changes: 58 additions & 16 deletions cmd/influx_tools/generate/exec/command.go
@@ -200,23 +200,27 @@ func (cmd *Command) exec(storagePlan *generate.StoragePlan, spec *gen.Spec) erro
return g.Run(context.Background(), storagePlan.Database, storagePlan.ShardPath(), storagePlan.NodeShardGroups(), gens)
}

const exampleSchema = `title = "CLI schema"
const exampleSchema = `title = "Documented schema"
# limit the maximum number of series generated across all measurements
#
# series-limit: integer, optional (default: unlimited)
# multiple measurements are merged together
[[measurements]]
# name of measurement
# name of measurement
#
# NOTE:
# Multiple definitions of the same measurement name are allowed and
# will be merged together.
name = "cpu"
# sample: float; where 0 < sample ≤ 1.0 (default: 0.5)
# sample a subset of the tag set
#
# sample 25% of the tags
#
# sample = 0.25
sample = 0.25
# Keys for defining a tag
#
@@ -249,13 +253,16 @@ name = "cpu"
# path: string
# absolute path or relative path to current toml file
tags = [
# example sequence tag source. The range of values are automatically prefixed with 0s
# example sequence tag source. The range of values is automatically
# prefixed with 0s to ensure correct sort behavior.
{ name = "host", source = { type = "sequence", format = "host-%s", start = 0, count = 5 } },
{ name = "host", source = { type = "sequence", format = "host-%s", start = 0, count = 5 } },
# tags can also be sourced from a file. The path is relative to the schema.toml.
# Each value must be on a new line. The file is also sorted, validated for UTF-8 and deduplicated.
# { name = "region", source = { type = "file", path = "files/regions.txt" } },
# tags can also be sourced from a file. The path is relative to the
# schema.toml.
# Each value must be on a new line. The file is also sorted, deduplicated
# and UTF-8 validated.
{ name = "rack", source = { type = "file", path = "files/racks.txt" } },
# Example string array source, which is also deduplicated and sorted
{ name = "region", source = ["us-west-01","us-west-02","us-east"] },
@@ -267,12 +274,47 @@ tags = [
# Name of field
#
# count: int, required
# Number of values to generate. When multiple fields have the same
# count, they will share timestamps.
# The maximum number of values to generate. When multiple fields
# have the same count and time-spec, they will share timestamps.
#
# A time-spec can be either time-precision or time-interval, which
# determines how timestamps are generated and may also influence
# the time range and number of values generated.
#
# time-precision: string [ns, us, ms, s, m, h] (default: ms)
# Specifies the precision (rounding) for generated timestamps.
#
# If the precision results in fewer than "count" intervals for the
# given time range, the number of values will be reduced.
#
# Example:
# count = 1000, start = 0s, end = 100s, time-precision = s
# 100 values will be generated at [0s, 1s, 2s, ..., 99s]
#
# If the precision results in greater than "count" intervals for the
# given time range, the interval will be rounded to the nearest multiple of
# time-precision.
#
# Example:
# count = 10, start = 0s, end = 100s, time-precision = s
# 10 values will be generated at [0s, 10s, 20s, ..., 90s]
#
# time-interval: Go duration string (eg 90s, 1h30m)
# Specifies the delta between generated timestamps.
#
# If the delta results in fewer than "count" intervals for the
# given time range, the number of values will be reduced.
#
# Example:
# count = 100, start = 0s, end = 100s, time-interval = 10s
# 10 values will be generated at [0s, 10s, 20s, ..., 90s]
#
# If the delta results in greater than "count" intervals for the
# given time range, the start-time will be adjusted to ensure "count" values.
#
# time-precision: string (default: ms)
# The precision for generated timestamps.
# One of ns, us, ms, s, m, h
# Example:
# count = 20, start = 0s, end = 1000s, time-interval = 10s
# 20 values will be generated at [800s, 810s, ..., 990s]
#
# source: int, float, boolean, string, array or object
#
@@ -321,8 +363,8 @@ tags = [
]
fields = [
# An example of a sequence of integer values
{ name = "free", count = 17, source = [10,15,20,25,30,35,30], time-precision = "ms" },
{ name = "low_mem", count = 17, source = [false,true,true], time-precision = "ms" },
{ name = "free", count = 100, source = [10,15,20,25,30,35,30], time-precision = "ms" },
{ name = "low_mem", count = 100, source = [false,true,true], time-precision = "ms" },
]
`

2 changes: 1 addition & 1 deletion cmd/influxd/run/server.go
@@ -31,9 +31,9 @@ import (
"github.com/influxdata/influxdb/services/storage"
"github.com/influxdata/influxdb/services/subscriber"
"github.com/influxdata/influxdb/services/udp"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/tcp"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/platform/storage/reads"
client "github.com/influxdata/usage-client/v1"
"go.uber.org/zap"

2 changes: 1 addition & 1 deletion cmd/store/query/query.go
@@ -16,8 +16,8 @@ import (
"github.com/gogo/protobuf/types"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/storage"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/storage/reads/datatypes"
"go.uber.org/zap"
"google.golang.org/grpc"
)
8 changes: 2 additions & 6 deletions flux/builtin/builtin.go
@@ -5,12 +5,8 @@ package builtin

import (
"github.com/influxdata/flux"

_ "github.com/influxdata/flux/functions/inputs" // Import the built-in input functions
_ "github.com/influxdata/flux/functions/outputs" // Import the built-in output functions
_ "github.com/influxdata/flux/functions/transformations" // Import the built-in transformations
_ "github.com/influxdata/flux/options" // Import the built-in options
_ "github.com/influxdata/influxdb/flux/functions/inputs" // Import the built-in functions
_ "github.com/influxdata/flux/stdlib"
_ "github.com/influxdata/influxdb/flux/stdlib"
)

func init() {
5 changes: 5 additions & 0 deletions flux/client/http.go
@@ -36,6 +36,8 @@ var (
// API endpoint.
type HTTP struct {
Addr string
Username string
Password string
InsecureSkipVerify bool
url *url.URL
}
@@ -66,6 +68,9 @@ func (s *HTTP) Query(ctx context.Context, r *ProxyRequest) (flux.ResultIterator,
if err != nil {
return nil, err
}
if s.Username != "" {
hreq.SetBasicAuth(s.Username, s.Password)
}

hreq.Header.Set("Content-Type", "application/json")
hreq.Header.Set("Accept", "text/csv")
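
On the wire, the new fields turn into standard HTTP basic auth (the SetBasicAuth call above, applied only when Username is non-empty). A usage sketch, assuming a context ctx and a prepared *ProxyRequest named req:

    c, err := client.NewHTTP("localhost", 8086, false)
    if err != nil {
        return err
    }
    c.Username = "admin" // hypothetical credentials
    c.Password = "secret"
    results, err := c.Query(ctx, req)
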
34 changes: 25 additions & 9 deletions flux/control/controller.go
@@ -3,16 +3,17 @@ package control
import (
"github.com/influxdata/flux/control"
"github.com/influxdata/flux/execute"
"github.com/influxdata/influxdb/coordinator"
_ "github.com/influxdata/influxdb/flux/builtin"
"github.com/influxdata/influxdb/flux/functions/inputs"
fstorage "github.com/influxdata/platform/query/functions/inputs/storage"
"github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb"
v1 "github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb/v1"
"go.uber.org/zap"
)

type MetaClient = inputs.MetaClient
type Authorizer = inputs.Authorizer
type MetaClient = coordinator.MetaClient
type Authorizer = influxdb.Authorizer

func NewController(mc MetaClient, reader fstorage.Reader, auth Authorizer, authEnabled bool, logger *zap.Logger) *control.Controller {
func NewController(mc MetaClient, reader influxdb.Reader, auth Authorizer, authEnabled bool, logger *zap.Logger) *control.Controller {
// flux
var (
concurrencyQuota = 10
@@ -26,13 +27,28 @@ func NewController(mc MetaClient, reader fstorage.Reader, auth Authorizer, authE
Logger: logger,
}

err := inputs.InjectFromDependencies(cc.ExecutorDependencies, inputs.Dependencies{Reader: reader, MetaClient: mc, Authorizer: auth, AuthEnabled: authEnabled})
if err != nil {
if err := influxdb.InjectFromDependencies(cc.ExecutorDependencies, influxdb.Dependencies{
Reader: reader,
MetaClient: mc,
Authorizer: auth,
AuthEnabled: authEnabled,
}); err != nil {
panic(err)
}

err = inputs.InjectBucketDependencies(cc.ExecutorDependencies, inputs.BucketDependencies{MetaClient: mc, Authorizer: auth, AuthEnabled: authEnabled})
if err != nil {
if err := v1.InjectDatabaseDependencies(cc.ExecutorDependencies, v1.DatabaseDependencies{
MetaClient: mc,
Authorizer: auth,
AuthEnabled: authEnabled,
}); err != nil {
panic(err)
}

if err := influxdb.InjectBucketDependencies(cc.ExecutorDependencies, influxdb.BucketDependencies{
MetaClient: mc,
Authorizer: auth,
AuthEnabled: authEnabled,
}); err != nil {
panic(err)
}

127 changes: 0 additions & 127 deletions flux/functions/inputs/from.go

This file was deleted.

@@ -1,21 +1,21 @@
package inputs
package influxdb

import (
"errors"
"fmt"

"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/functions/inputs"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/stdlib/influxdata/influxdb"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxql"
"github.com/pkg/errors"
)

func init() {
execute.RegisterSource(inputs.BucketsKind, createBucketsSource)
execute.RegisterSource(influxdb.BucketsKind, createBucketsSource)
}

type BucketsDecoder struct {
@@ -34,35 +34,35 @@ func (bd *BucketsDecoder) Fetch() (bool, error) {

func (bd *BucketsDecoder) Decode() (flux.Table, error) {
kb := execute.NewGroupKeyBuilder(nil)
kb.AddKeyValue("organizationID", values.NewString("influxdb"))
kb.AddKeyValue("organizationID", values.NewString(""))
gk, err := kb.Build()
if err != nil {
return nil, err
}

b := execute.NewColListTableBuilder(gk, bd.alloc)

b.AddCol(flux.ColMeta{
_, _ = b.AddCol(flux.ColMeta{
Label: "name",
Type: flux.TString,
})
b.AddCol(flux.ColMeta{
_, _ = b.AddCol(flux.ColMeta{
Label: "id",
Type: flux.TString,
})
b.AddCol(flux.ColMeta{
_, _ = b.AddCol(flux.ColMeta{
Label: "organization",
Type: flux.TString,
})
b.AddCol(flux.ColMeta{
_, _ = b.AddCol(flux.ColMeta{
Label: "organizationID",
Type: flux.TString,
})
b.AddCol(flux.ColMeta{
_, _ = b.AddCol(flux.ColMeta{
Label: "retentionPolicy",
Type: flux.TString,
})
b.AddCol(flux.ColMeta{
_, _ = b.AddCol(flux.ColMeta{
Label: "retentionPeriod",
Type: flux.TInt,
})
@@ -79,30 +79,35 @@ func (bd *BucketsDecoder) Decode() (flux.Table, error) {
}
}

for _, bucket := range bd.deps.MetaClient.Databases() {
if hasAccess(bucket.Name) {
rp := bucket.RetentionPolicy(bucket.DefaultRetentionPolicy)
b.AppendString(0, bucket.Name)
b.AppendString(1, "")
b.AppendString(2, "influxdb")
b.AppendString(3, "")
b.AppendString(4, rp.Name)
b.AppendInt(5, rp.Duration.Nanoseconds())
for _, db := range bd.deps.MetaClient.Databases() {
if hasAccess(db.Name) {
for _, rp := range db.RetentionPolicies {
_ = b.AppendString(0, db.Name+"/"+rp.Name)
_ = b.AppendString(1, "")
_ = b.AppendString(2, "influxdb")
_ = b.AppendString(3, "")
_ = b.AppendString(4, rp.Name)
_ = b.AppendInt(5, rp.Duration.Nanoseconds())
}
}
}

return b.Table()
}

func (bd *BucketsDecoder) Close() error {
return nil
}

func createBucketsSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execute.Administration) (execute.Source, error) {
_, ok := prSpec.(*inputs.BucketsProcedureSpec)
_, ok := prSpec.(*influxdb.BucketsProcedureSpec)
if !ok {
return nil, fmt.Errorf("invalid spec type %T", prSpec)
}

// the dependencies used for FromKind are adequate for what we need here
// so there's no need to inject custom dependencies for buckets()
deps := a.Dependencies()[inputs.BucketsKind].(BucketDependencies)
deps := a.Dependencies()[influxdb.BucketsKind].(BucketDependencies)

var user meta.User
if deps.AuthEnabled {
@@ -114,7 +119,7 @@ func createBucketsSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a ex

bd := &BucketsDecoder{deps: deps, alloc: a.Allocator(), user: user}

return inputs.CreateSourceFromDecoder(bd, dsid, a)
return execute.CreateSourceFromDecoder(bd, dsid, a)

}

@@ -143,6 +148,6 @@ func InjectBucketDependencies(depsMap execute.Dependencies, deps BucketDependenc
if err := deps.Validate(); err != nil {
return err
}
depsMap[inputs.BucketsKind] = deps
depsMap[influxdb.BucketsKind] = deps
return nil
}
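
With this change buckets() emits one row per database/retention-policy pair, named "db/rp", instead of one row per database using only the default retention policy. Illustrative only: for a hypothetical database "telegraf" with retention policies "autogen" (168h) and "short" (24h), Decode appends rows equivalent to:

    rows := [][]interface{}{
        // name, id, organization, organizationID, retentionPolicy, retentionPeriod
        {"telegraf/autogen", "", "influxdb", "", "autogen", (168 * time.Hour).Nanoseconds()},
        {"telegraf/short", "", "influxdb", "", "short", (24 * time.Hour).Nanoseconds()},
    }
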
744 changes: 744 additions & 0 deletions flux/stdlib/influxdata/influxdb/from.go

Large diffs are not rendered by default.

176 changes: 176 additions & 0 deletions flux/stdlib/influxdata/influxdb/storage.go
@@ -0,0 +1,176 @@
package influxdb

import (
"context"
"fmt"
"log"
"math"

"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/semantic"
)

// source performs storage reads
type source struct {
id execute.DatasetID
reader Reader
readSpec ReadSpec
window execute.Window
bounds execute.Bounds
alloc *memory.Allocator

ts []execute.Transformation

currentTime execute.Time
overflow bool
}

func NewSource(id execute.DatasetID, r Reader, readSpec ReadSpec, bounds execute.Bounds, w execute.Window, currentTime execute.Time, alloc *memory.Allocator) execute.Source {
return &source{
id: id,
reader: r,
readSpec: readSpec,
bounds: bounds,
window: w,
currentTime: currentTime,
alloc: alloc,
}
}

func (s *source) AddTransformation(t execute.Transformation) {
s.ts = append(s.ts, t)
}

func (s *source) Run(ctx context.Context) {
err := s.run(ctx)
for _, t := range s.ts {
t.Finish(s.id, err)
}
}

func (s *source) run(ctx context.Context) error {
//TODO(nathanielc): Pass through context to actual network I/O.
for tables, mark, ok := s.next(ctx); ok; tables, mark, ok = s.next(ctx) {
err := tables.Do(func(tbl flux.Table) error {
for _, t := range s.ts {
if err := t.Process(s.id, tbl); err != nil {
return err
}
//TODO(nathanielc): Also add mechanism to send UpdateProcessingTime calls, when no data is arriving.
// This is probably not needed for this source, but other sources should do so.
if err := t.UpdateProcessingTime(s.id, execute.Now()); err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}

for _, t := range s.ts {
if err := t.UpdateWatermark(s.id, mark); err != nil {
return err
}
}
}
return nil
}

func (s *source) next(ctx context.Context) (flux.TableIterator, execute.Time, bool) {
if s.overflow {
return nil, 0, false
}

start := s.currentTime - execute.Time(s.window.Period)
stop := s.currentTime
if stop > s.bounds.Stop {
return nil, 0, false
}

// Check if we will overflow, if so we are done after this pass
every := execute.Time(s.window.Every)
if every > 0 {
s.overflow = s.currentTime > math.MaxInt64-every
} else {
s.overflow = s.currentTime < math.MinInt64-every
}
s.currentTime = s.currentTime + every

bi, err := s.reader.Read(
ctx,
s.readSpec,
start,
stop,
s.alloc,
)
if err != nil {
log.Println("E!", err)
return nil, 0, false
}
return bi, stop, true
}

type GroupMode int

const (
// GroupModeDefault specifies the default grouping mode, which is GroupModeAll.
GroupModeDefault GroupMode = 0
// GroupModeNone merges all series into a single group.
GroupModeNone GroupMode = 1 << iota
// GroupModeAll produces a separate table for each series.
GroupModeAll
// GroupModeBy produces a table for each unique value of the specified GroupKeys.
GroupModeBy
// GroupModeExcept produces a table for the unique values of all keys, except those specified by GroupKeys.
GroupModeExcept
)

// ToGroupMode accepts the group mode from Flux and produces the appropriate storage group mode.
func ToGroupMode(fluxMode flux.GroupMode) GroupMode {
switch fluxMode {
case flux.GroupModeNone:
return GroupModeDefault
case flux.GroupModeBy:
return GroupModeBy
case flux.GroupModeExcept:
return GroupModeExcept
default:
panic(fmt.Sprint("unknown group mode: ", fluxMode))
}
}

type ReadSpec struct {
Database string
RetentionPolicy string

RAMLimit uint64
Hosts []string
Predicate *semantic.FunctionExpression
PointsLimit int64
SeriesLimit int64
SeriesOffset int64
Descending bool

AggregateMethod string

// OrderByTime indicates that series reads should produce all
// series for a time before producing any series for a larger time.
// By default this is false meaning all values of time are produced for a given series,
// before any values are produced from the next series.
OrderByTime bool
// GroupMode instructs how the results should be grouped.
GroupMode GroupMode
// GroupKeys is the list of dimensions along which to group.
//
// When GroupMode is GroupModeBy, the results will be grouped by the specified keys.
// When GroupMode is GroupModeExcept, the results will be grouped by all keys, except those specified.
GroupKeys []string
}

type Reader interface {
Read(ctx context.Context, rs ReadSpec, start, stop execute.Time, alloc *memory.Allocator) (flux.TableIterator, error)
Close()
}
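
The Reader interface at the end of this file is the seam between Flux and the storage engine. A no-op sketch of an implementer (illustrative only, not part of this diff):

    type nopReader struct{}

    func (nopReader) Read(ctx context.Context, rs influxdb.ReadSpec, start, stop execute.Time, alloc *memory.Allocator) (flux.TableIterator, error) {
        // A real reader maps rs.Database/rs.RetentionPolicy and the
        // [start, stop) window onto storage engine reads.
        return nil, errors.New("not implemented")
    }

    func (nopReader) Close() {}
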
199 changes: 199 additions & 0 deletions flux/stdlib/influxdata/influxdb/v1/databases.go
@@ -0,0 +1,199 @@
package v1

import (
"context"
"errors"
"fmt"

"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/plan"
v1 "github.com/influxdata/flux/stdlib/influxdata/influxdb/v1"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/coordinator"
"github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxql"
)

const DatabasesKind = v1.DatabasesKind

type DatabasesOpSpec struct {
}

func init() {
flux.ReplacePackageValue("influxdata/influxdb/v1", DatabasesKind, flux.FunctionValue(DatabasesKind, createDatabasesOpSpec, v1.DatabasesSignature))
flux.RegisterOpSpec(DatabasesKind, newDatabasesOp)
plan.RegisterProcedureSpec(DatabasesKind, newDatabasesProcedure, DatabasesKind)
}

func createDatabasesOpSpec(args flux.Arguments, a *flux.Administration) (flux.OperationSpec, error) {
spec := new(DatabasesOpSpec)
return spec, nil
}

func newDatabasesOp() flux.OperationSpec {
return new(DatabasesOpSpec)
}

func (s *DatabasesOpSpec) Kind() flux.OperationKind {
return DatabasesKind
}

type DatabasesProcedureSpec struct {
plan.DefaultCost
}

func newDatabasesProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
_, ok := qs.(*DatabasesOpSpec)
if !ok {
return nil, fmt.Errorf("invalid spec type %T", qs)
}

return &DatabasesProcedureSpec{}, nil
}

func (s *DatabasesProcedureSpec) Kind() plan.ProcedureKind {
return DatabasesKind
}

func (s *DatabasesProcedureSpec) Copy() plan.ProcedureSpec {
ns := new(DatabasesProcedureSpec)
return ns
}

func init() {
execute.RegisterSource(DatabasesKind, createDatabasesSource)
}

type DatabasesDecoder struct {
deps *DatabaseDependencies
databases []meta.DatabaseInfo
user meta.User
alloc *memory.Allocator
ctx context.Context
}

func (bd *DatabasesDecoder) Connect() error {
return nil
}

func (bd *DatabasesDecoder) Fetch() (bool, error) {
bd.databases = bd.deps.MetaClient.Databases()
return false, nil
}

func (bd *DatabasesDecoder) Decode() (flux.Table, error) {
kb := execute.NewGroupKeyBuilder(nil)
kb.AddKeyValue("organizationID", values.NewString(""))
gk, err := kb.Build()
if err != nil {
return nil, err
}

b := execute.NewColListTableBuilder(gk, bd.alloc)

if _, err := b.AddCol(flux.ColMeta{
Label: "organizationID",
Type: flux.TString,
}); err != nil {
return nil, err
}
if _, err := b.AddCol(flux.ColMeta{
Label: "databaseName",
Type: flux.TString,
}); err != nil {
return nil, err
}
if _, err := b.AddCol(flux.ColMeta{
Label: "retentionPolicy",
Type: flux.TString,
}); err != nil {
return nil, err
}
if _, err := b.AddCol(flux.ColMeta{
Label: "retentionPeriod",
Type: flux.TInt,
}); err != nil {
return nil, err
}
if _, err := b.AddCol(flux.ColMeta{
Label: "default",
Type: flux.TBool,
}); err != nil {
return nil, err
}
if _, err := b.AddCol(flux.ColMeta{
Label: "bucketId",
Type: flux.TString,
}); err != nil {
return nil, err
}

var hasAccess func(db string) bool
if bd.user == nil {
hasAccess = func(db string) bool {
return true
}
} else {
hasAccess = func(db string) bool {
return bd.deps.Authorizer.AuthorizeDatabase(bd.user, influxql.ReadPrivilege, db) == nil ||
bd.deps.Authorizer.AuthorizeDatabase(bd.user, influxql.WritePrivilege, db) == nil
}
}

for _, db := range bd.databases {
if hasAccess(db.Name) {
for _, rp := range db.RetentionPolicies {
_ = b.AppendString(0, "")
_ = b.AppendString(1, db.Name)
_ = b.AppendString(2, rp.Name)
_ = b.AppendInt(3, rp.Duration.Nanoseconds())
_ = b.AppendBool(4, db.DefaultRetentionPolicy == rp.Name)
_ = b.AppendString(5, "")
}
}
}

return b.Table()
}

func (bd *DatabasesDecoder) Close() error {
return nil
}

func createDatabasesSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execute.Administration) (execute.Source, error) {
_, ok := prSpec.(*DatabasesProcedureSpec)
if !ok {
return nil, fmt.Errorf("invalid spec type %T", prSpec)
}

deps := a.Dependencies()[DatabasesKind].(DatabaseDependencies)
var user meta.User
if deps.AuthEnabled {
user = meta.UserFromContext(a.Context())
if user == nil {
return nil, errors.New("createDatabasesSource: no user")
}
}
bd := &DatabasesDecoder{deps: &deps, alloc: a.Allocator(), ctx: a.Context(), user: user}
return execute.CreateSourceFromDecoder(bd, dsid, a)
}

type DatabaseDependencies struct {
MetaClient coordinator.MetaClient
Authorizer influxdb.Authorizer
AuthEnabled bool
}

func InjectDatabaseDependencies(depsMap execute.Dependencies, deps DatabaseDependencies) error {
if deps.MetaClient == nil {
return errors.New("missing meta client dependency")
}
if deps.AuthEnabled && deps.Authorizer == nil {
return errors.New("missing authorizer with auth enabled")
}
depsMap[DatabasesKind] = deps
return nil
}
7 changes: 7 additions & 0 deletions flux/stdlib/packages.go
@@ -0,0 +1,7 @@
package stdlib

// Import all stdlib packages
import (
_ "github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb"
_ "github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb/v1"
)
4 changes: 2 additions & 2 deletions internal/storage_store.go
@@ -4,9 +4,9 @@ import (
"context"

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/platform/storage/reads"
"github.com/influxdata/platform/storage/reads/datatypes"
"go.uber.org/zap"
)

2 changes: 1 addition & 1 deletion logger/logger.go
@@ -5,7 +5,7 @@ import (
"io"
"time"

"github.com/jsternberg/zap-logfmt"
zaplogfmt "github.com/jsternberg/zap-logfmt"
isatty "github.com/mattn/go-isatty"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
187 changes: 187 additions & 0 deletions mock/storage_reads.go
@@ -0,0 +1,187 @@
package mock

import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
"google.golang.org/grpc/metadata"
)

type ResponseStream struct {
SendFunc func(*datatypes.ReadResponse) error
SetTrailerFunc func(metadata.MD)
}

func NewResponseStream() *ResponseStream {
return &ResponseStream{
SendFunc: func(*datatypes.ReadResponse) error { return nil },
SetTrailerFunc: func(mds metadata.MD) {},
}
}

func (s *ResponseStream) Send(r *datatypes.ReadResponse) error {
return s.SendFunc(r)
}

func (s *ResponseStream) SetTrailer(m metadata.MD) {
s.SetTrailerFunc(m)
}

type ResultSet struct {
NextFunc func() bool
CursorFunc func() cursors.Cursor
TagsFunc func() models.Tags
CloseFunc func()
ErrFunc func() error
StatsFunc func() cursors.CursorStats
}

func NewResultSet() *ResultSet {
return &ResultSet{
NextFunc: func() bool { return false },
CursorFunc: func() cursors.Cursor { return nil },
TagsFunc: func() models.Tags { return nil },
CloseFunc: func() {},
ErrFunc: func() error { return nil },
StatsFunc: func() cursors.CursorStats { return cursors.CursorStats{} },
}
}

func (rs *ResultSet) Next() bool {
return rs.NextFunc()
}

func (rs *ResultSet) Cursor() cursors.Cursor {
return rs.CursorFunc()
}

func (rs *ResultSet) Tags() models.Tags {
return rs.TagsFunc()
}

func (rs *ResultSet) Close() {
rs.CloseFunc()
}

func (rs *ResultSet) Err() error {
return rs.ErrFunc()
}

func (rs *ResultSet) Stats() cursors.CursorStats {
return rs.StatsFunc()
}

type GroupResultSet struct {
NextFunc func() reads.GroupCursor
CloseFunc func()
ErrFunc func() error
}

func NewGroupResultSet() *GroupResultSet {
return &GroupResultSet{
NextFunc: func() reads.GroupCursor { return nil },
CloseFunc: func() {},
ErrFunc: func() error { return nil },
}
}

func (rs *GroupResultSet) Next() reads.GroupCursor {
return rs.NextFunc()
}

func (rs *GroupResultSet) Close() {
rs.CloseFunc()
}

func (rs *GroupResultSet) Err() error {
return rs.ErrFunc()
}

type IntegerArrayCursor struct {
CloseFunc func()
Errfunc func() error
StatsFunc func() cursors.CursorStats
NextFunc func() *cursors.IntegerArray
}

func NewIntegerArrayCursor() *IntegerArrayCursor {
return &IntegerArrayCursor{
CloseFunc: func() {},
Errfunc: func() error { return nil },
StatsFunc: func() cursors.CursorStats { return cursors.CursorStats{} },
NextFunc: func() *cursors.IntegerArray { return &cursors.IntegerArray{} },
}
}

func (c *IntegerArrayCursor) Close() {
c.CloseFunc()
}

func (c *IntegerArrayCursor) Err() error {
return c.Errfunc()
}

func (c *IntegerArrayCursor) Stats() cursors.CursorStats {
return c.StatsFunc()
}

func (c *IntegerArrayCursor) Next() *cursors.IntegerArray {
return c.NextFunc()
}

type GroupCursor struct {
NextFunc func() bool
CursorFunc func() cursors.Cursor
TagsFunc func() models.Tags
KeysFunc func() [][]byte
PartitionKeyValsFunc func() [][]byte
CloseFunc func()
ErrFunc func() error
StatsFunc func() cursors.CursorStats
}

func NewGroupCursor() *GroupCursor {
return &GroupCursor{
NextFunc: func() bool { return false },
CursorFunc: func() cursors.Cursor { return nil },
TagsFunc: func() models.Tags { return nil },
KeysFunc: func() [][]byte { return nil },
PartitionKeyValsFunc: func() [][]byte { return nil },
CloseFunc: func() {},
ErrFunc: func() error { return nil },
StatsFunc: func() cursors.CursorStats { return cursors.CursorStats{} },
}
}

func (c *GroupCursor) Next() bool {
return c.NextFunc()
}

func (c *GroupCursor) Cursor() cursors.Cursor {
return c.CursorFunc()
}

func (c *GroupCursor) Tags() models.Tags {
return c.TagsFunc()
}

func (c *GroupCursor) Keys() [][]byte {
return c.KeysFunc()
}

func (c *GroupCursor) PartitionKeyVals() [][]byte {
return c.PartitionKeyValsFunc()
}

func (c *GroupCursor) Close() {
c.CloseFunc()
}

func (c *GroupCursor) Err() error {
return c.ErrFunc()
}

func (c *GroupCursor) Stats() cursors.CursorStats {
return c.StatsFunc()
}
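
A sketch of how these mocks can be used in a test; only the funcs a test overrides change behavior, everything else keeps the harmless defaults from the constructor:

    rs := mock.NewResultSet()
    n := 0
    rs.NextFunc = func() bool { n++; return n == 1 } // yield exactly one series
    rs.TagsFunc = func() models.Tags {
        return models.NewTags(map[string]string{"host": "server01"})
    }
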
279 changes: 276 additions & 3 deletions models/points.go
@@ -16,7 +16,19 @@ import (
"unicode/utf8"

"github.com/influxdata/influxdb/pkg/escape"
"github.com/influxdata/platform/models"
)

// Values used to store the field key and measurement name as special internal
// tags.
const (
FieldKeyTagKey = "\xff"
MeasurementTagKey = "\x00"
)

// Predefined byte representations of special tag keys.
var (
FieldKeyTagKeyBytes = []byte(FieldKeyTagKey)
MeasurementTagKeyBytes = []byte(MeasurementTagKey)
)

type escapeSet struct {
@@ -1882,7 +1894,10 @@ func (p *point) Split(size int) []Point {
}

// Tag represents a single key/value tag pair.
type Tag = models.Tag
type Tag struct {
Key []byte
Value []byte
}

// NewTag returns a new Tag.
func NewTag(key, value []byte) Tag {
@@ -1892,8 +1907,38 @@ func NewTag(key, value []byte) Tag {
}
}

// Size returns the size of the key and value.
func (t Tag) Size() int { return len(t.Key) + len(t.Value) }

// Clone returns a shallow copy of Tag.
//
// Tags associated with a Point created by ParsePointsWithPrecision will hold references to the byte slice that was parsed.
// Use Clone to create a Tag with new byte slices that do not refer to the argument to ParsePointsWithPrecision.
func (t Tag) Clone() Tag {
other := Tag{
Key: make([]byte, len(t.Key)),
Value: make([]byte, len(t.Value)),
}

copy(other.Key, t.Key)
copy(other.Value, t.Value)

return other
}

// String returns the string representation of the tag.
func (t *Tag) String() string {
var buf bytes.Buffer
buf.WriteByte('{')
buf.WriteString(string(t.Key))
buf.WriteByte(' ')
buf.WriteString(string(t.Value))
buf.WriteByte('}')
return buf.String()
}

// Tags represents a sorted list of tags.
type Tags = models.Tags
type Tags []Tag

// NewTags returns a new Tags from a map.
func NewTags(m map[string]string) Tags {
@@ -1908,6 +1953,89 @@ func NewTags(m map[string]string) Tags {
return a
}

// Keys returns the list of keys for a tag set.
func (a Tags) Keys() []string {
if len(a) == 0 {
return nil
}
keys := make([]string, len(a))
for i, tag := range a {
keys[i] = string(tag.Key)
}
return keys
}

// Values returns the list of values for a tag set.
func (a Tags) Values() []string {
if len(a) == 0 {
return nil
}
values := make([]string, len(a))
for i, tag := range a {
values[i] = string(tag.Value)
}
return values
}

// String returns the string representation of the tags.
func (a Tags) String() string {
var buf bytes.Buffer
buf.WriteByte('[')
for i := range a {
buf.WriteString(a[i].String())
if i < len(a)-1 {
buf.WriteByte(' ')
}
}
buf.WriteByte(']')
return buf.String()
}

// Size returns the number of bytes needed to store all tags. Note, this is
// the number of bytes needed to store all keys and values and does not account
// for data structures or delimiters for example.
func (a Tags) Size() int {
var total int
for i := range a {
total += a[i].Size()
}
return total
}

// Clone returns a copy of the slice where the elements are a result of calling `Clone` on the original elements
//
// Tags associated with a Point created by ParsePointsWithPrecision will hold references to the byte slice that was parsed.
// Use Clone to create Tags with new byte slices that do not refer to the argument to ParsePointsWithPrecision.
func (a Tags) Clone() Tags {
if len(a) == 0 {
return nil
}

others := make(Tags, len(a))
for i := range a {
others[i] = a[i].Clone()
}

return others
}

func (a Tags) Len() int { return len(a) }
func (a Tags) Less(i, j int) bool { return bytes.Compare(a[i].Key, a[j].Key) == -1 }
func (a Tags) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Equal returns true if a equals other.
func (a Tags) Equal(other Tags) bool {
if len(a) != len(other) {
return false
}
for i := range a {
if !bytes.Equal(a[i].Key, other[i].Key) || !bytes.Equal(a[i].Value, other[i].Value) {
return false
}
}
return true
}

// CompareTags returns -1 if a < b, 1 if a > b, and 0 if a == b.
func CompareTags(a, b Tags) int {
// Compare each key & value until a mismatch.
@@ -1931,6 +2059,151 @@ func CompareTags(a, b Tags) int {
return 0
}

// Get returns the value for a key.
func (a Tags) Get(key []byte) []byte {
// OPTIMIZE: Use sort.Search if tagset is large.

for _, t := range a {
if bytes.Equal(t.Key, key) {
return t.Value
}
}
return nil
}

// GetString returns the string value for a string key.
func (a Tags) GetString(key string) string {
return string(a.Get([]byte(key)))
}

// Set sets the value for a key.
func (a *Tags) Set(key, value []byte) {
for i, t := range *a {
if bytes.Equal(t.Key, key) {
(*a)[i].Value = value
return
}
}
*a = append(*a, Tag{Key: key, Value: value})
sort.Sort(*a)
}

// SetString sets the string value for a string key.
func (a *Tags) SetString(key, value string) {
a.Set([]byte(key), []byte(value))
}

// Delete removes a tag by key.
func (a *Tags) Delete(key []byte) {
for i, t := range *a {
if bytes.Equal(t.Key, key) {
copy((*a)[i:], (*a)[i+1:])
(*a)[len(*a)-1] = Tag{}
*a = (*a)[:len(*a)-1]
return
}
}
}

// Map returns a map representation of the tags.
func (a Tags) Map() map[string]string {
m := make(map[string]string, len(a))
for _, t := range a {
m[string(t.Key)] = string(t.Value)
}
return m
}

// Merge merges the tags combining the two. If both define a tag with the
// same key, the merged value overwrites the old value.
// A new map is returned.
func (a Tags) Merge(other map[string]string) Tags {
merged := make(map[string]string, len(a)+len(other))
for _, t := range a {
merged[string(t.Key)] = string(t.Value)
}
for k, v := range other {
merged[k] = v
}
return NewTags(merged)
}

// HashKey hashes all of a tag's keys.
func (a Tags) HashKey() []byte {
return a.AppendHashKey(nil)
}

func (a Tags) needsEscape() bool {
for i := range a {
t := &a[i]
for j := range tagEscapeCodes {
c := &tagEscapeCodes[j]
if bytes.IndexByte(t.Key, c.k[0]) != -1 || bytes.IndexByte(t.Value, c.k[0]) != -1 {
return true
}
}
}
return false
}

// AppendHashKey appends the result of hashing all of a tag's keys and values to dst and returns the extended buffer.
func (a Tags) AppendHashKey(dst []byte) []byte {
// Empty maps marshal to empty bytes.
if len(a) == 0 {
return dst
}

// Type invariant: Tags are sorted

sz := 0
var escaped Tags
if a.needsEscape() {
var tmp [20]Tag
if len(a) < len(tmp) {
escaped = tmp[:len(a)]
} else {
escaped = make(Tags, len(a))
}

for i := range a {
t := &a[i]
nt := &escaped[i]
nt.Key = escapeTag(t.Key)
nt.Value = escapeTag(t.Value)
sz += len(nt.Key) + len(nt.Value)
}
} else {
sz = a.Size()
escaped = a
}

sz += len(escaped) + (len(escaped) * 2) // separators

// Generate marshaled bytes.
if cap(dst)-len(dst) < sz {
nd := make([]byte, len(dst), len(dst)+sz)
copy(nd, dst)
dst = nd
}
buf := dst[len(dst) : len(dst)+sz]
idx := 0
for i := range escaped {
k := &escaped[i]
if len(k.Value) == 0 {
continue
}
buf[idx] = ','
idx++
copy(buf[idx:], k.Key)
idx += len(k.Key)
buf[idx] = '='
idx++
copy(buf[idx:], k.Value)
idx += len(k.Value)
}
return dst[:len(dst)+idx]
}

// CopyTags returns a shallow copy of tags.
func CopyTags(a Tags) Tags {
other := make(Tags, len(a))
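
With Tags a concrete type again (previously an alias for the platform module's type), the helpers above behave as before; a quick usage sketch:

    var tags models.Tags
    tags.SetString("host", "server01")
    tags.SetString("region", "uswest")
    fmt.Println(tags.GetString("host")) // server01
    // AppendHashKey emits ",key=value" pairs after whatever is already in dst.
    fmt.Println(string(tags.AppendHashKey([]byte("cpu")))) // cpu,host=server01,region=uswest
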
@@ -10,7 +10,6 @@ import (
type mergedSeriesGenerator struct {
heap seriesGeneratorHeap
last constSeries
err error
n int64
first bool
}
File renamed without changes.
23 changes: 21 additions & 2 deletions pkg/data/gen/schema.go
@@ -100,10 +100,29 @@ type FieldSource interface {
type Field struct {
Name string
Count int64
TimePrecision precision `toml:"time-precision"` // TimePrecision determines the precision for generated timestamp values
TimePrecision *precision `toml:"time-precision"` // TimePrecision determines the precision for generated timestamp values
TimeInterval *duration `toml:"time-interval"` // TimeInterval determines the duration between timestamp values
Source FieldSource
}

func (t *Field) TimeSequenceSpec() TimeSequenceSpec {
if t.TimeInterval != nil {
return TimeSequenceSpec{
Count: int(t.Count),
Delta: t.TimeInterval.Duration,
}
}

if t.TimePrecision != nil {
return TimeSequenceSpec{
Count: int(t.Count),
Precision: t.TimePrecision.ToDuration(),
}
}

panic("TimeInterval and TimePrecision are nil")
}

func (*Field) node() {}

type FieldConstantValue struct {
@@ -200,7 +219,7 @@ func walk(v Visitor, node SchemaNode, up bool) Visitor {
case *Measurement:
v := v
v = walk(v, n.Tags, up)
v = walk(v, n.Fields, up)
walk(v, n.Fields, up)

case Fields:
v := v
12 changes: 12 additions & 0 deletions pkg/data/gen/sequence.go
@@ -82,3 +82,15 @@ func (s *StringArraySequence) Value() string {
func (s *StringArraySequence) Count() int {
return len(s.vals)
}

type StringConstantSequence struct {
val string
}

func NewStringConstantSequence(val string) *StringConstantSequence {
return &StringConstantSequence{val: val}
}

func (s *StringConstantSequence) Next() bool { return true }
func (s *StringConstantSequence) Value() string { return s.val }
func (s *StringConstantSequence) Count() int { return 1 }
8 changes: 4 additions & 4 deletions pkg/data/gen/series.go
@@ -4,7 +4,7 @@ import (
"bytes"
)

type Series interface {
type seriesKeyField interface {
// Key returns the series key.
// The returned value may be cached.
Key() []byte
@@ -22,13 +22,13 @@ type constSeries struct {
func (s *constSeries) Key() []byte { return s.key }
func (s *constSeries) Field() []byte { return s.field }

var nilSeries Series = &constSeries{}
var nilSeries seriesKeyField = &constSeries{}

// CompareSeries returns an integer comparing two seriesKeyField instances
// lexicographically.
// The result will be 0 if a==b, -1 if a < b, and +1 if a > b.
// A nil argument is equivalent to an empty seriesKeyField.
func CompareSeries(a, b Series) int {
func CompareSeries(a, b seriesKeyField) int {
if a == nil {
a = nilSeries
}
@@ -44,7 +44,7 @@ }
}
}

func (s *constSeries) CopyFrom(a Series) {
func (s *constSeries) CopyFrom(a seriesKeyField) {
key := a.Key()
if cap(s.key) < len(key) {
s.key = make([]byte, len(key))
@@ -5,7 +5,6 @@ import (
"time"

"github.com/influxdata/influxdb/models"
"github.com/influxdata/platform/pkg/data/gen"
)

type SeriesGenerator interface {
@@ -33,7 +32,7 @@ type SeriesGenerator interface {
}

type TimeSequenceSpec struct {
// Count specifies the number of values to generate.
// Count specifies the maximum number of values to generate.
Count int

// Start specifies the starting time for the values.
@@ -46,11 +45,52 @@ type TimeSequenceSpec struct {
Precision time.Duration
}

func (ts TimeSequenceSpec) ForTimeRange(tr TimeRange) TimeSequenceSpec {
// Truncate time range
if ts.Delta > 0 {
tr = tr.Truncate(ts.Delta)
} else {
tr = tr.Truncate(ts.Precision)
}

ts.Start = tr.Start

if ts.Delta > 0 {
intervals := int(tr.End.Sub(tr.Start) / ts.Delta)
if intervals > ts.Count {
// if the number of intervals in the specified time range exceeds
// the maximum count, move the start forward to limit the number of values
ts.Start = tr.End.Add(-time.Duration(ts.Count) * ts.Delta)
} else {
ts.Count = intervals
}
} else {
ts.Delta = tr.End.Sub(tr.Start) / time.Duration(ts.Count)
if ts.Delta < ts.Precision {
// count is too high for the range of time and precision
ts.Count = int(tr.End.Sub(tr.Start) / ts.Precision)
ts.Delta = ts.Precision
} else {
ts.Delta = ts.Delta.Round(ts.Precision)
}
ts.Precision = 0
}

return ts
}

type TimeRange struct {
Start time.Time
End time.Time
}

func (t TimeRange) Truncate(d time.Duration) TimeRange {
return TimeRange{
Start: t.Start.Truncate(d),
End: t.End.Truncate(d),
}
}

type TimeValuesSequence interface {
Reset()
Next() bool
@@ -70,7 +110,7 @@ type cache struct {

type seriesGenerator struct {
name []byte
tags gen.TagsSequence
tags TagsSequence
field []byte
vg TimeValuesSequence
n int64
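
ForTimeRange implements the count/time-interval/time-precision rules documented in the generator's example schema earlier in this diff. A worked sketch of the interval case:

    t0 := time.Unix(0, 0)
    tr := gen.TimeRange{Start: t0, End: t0.Add(1000 * time.Second)}
    spec := gen.TimeSequenceSpec{Count: 20, Delta: 10 * time.Second}
    out := spec.ForTimeRange(tr)
    // The range holds 100 ten-second intervals, more than Count, so Start is
    // moved forward: out.Start == t0.Add(800*time.Second), out.Count == 20.
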
6 changes: 3 additions & 3 deletions pkg/data/gen/series_test.go
@@ -7,14 +7,14 @@ import (
)

func TestCompareSeries(t *testing.T) {
mk := func(k, f string) Series {
mk := func(k, f string) seriesKeyField {
return &constSeries{key: []byte(k), field: []byte(f)}
}

tests := []struct {
name string
a Series
b Series
a seriesKeyField
b seriesKeyField
exp int
}{
{
13 changes: 2 additions & 11 deletions pkg/data/gen/specs.go
@@ -8,7 +8,6 @@ import (
"path"
"path/filepath"
"sort"
"time"
"unicode/utf8"

"github.com/BurntSushi/toml"
@@ -96,12 +95,7 @@ type FieldValuesSpec struct {
}

func newTimeValuesSequenceFromFieldValuesSpec(fs *FieldValuesSpec, tr TimeRange) TimeValuesSequence {
ts := fs.TimeSequenceSpec
ts.Start = tr.Start
ts.Delta = tr.End.Sub(tr.Start) / time.Duration(ts.Count)
ts.Delta = ts.Delta.Round(ts.Precision)

return fs.Values(ts)
return fs.Values(fs.TimeSequenceSpec.ForTimeRange(tr))
}

func NewSpecFromToml(s string) (*Spec, error) {
@@ -311,10 +305,7 @@ func (s *schemaToSpec) visit(node SchemaNode) bool {
panic(fmt.Sprintf("unexpected type %T", fs))
}

fs.TimeSequenceSpec = TimeSequenceSpec{
Count: int(n.Count),
Precision: n.TimePrecision.ToDuration(),
}
fs.TimeSequenceSpec = n.TimeSequenceSpec()
fs.Name = n.Name

case *FieldConstantValue:
53 changes: 52 additions & 1 deletion pkg/data/gen/toml.go
@@ -42,6 +42,41 @@ func (s *sample) UnmarshalTOML(data interface{}) error {
return nil
}

type duration struct {
time.Duration
}

func (d *duration) UnmarshalTOML(data interface{}) error {
text, ok := data.(string)
if !ok {
return fmt.Errorf("invalid duration, expect a Go duration as a string: %T", data)
}

return d.UnmarshalText([]byte(text))
}

func (d *duration) UnmarshalText(text []byte) error {
s := string(text)

var err error
d.Duration, err = time.ParseDuration(s)
if err != nil {
return err
}

if d.Duration == 0 {
d.Duration, err = time.ParseDuration("1" + s)
if err != nil {
return err
}
}

if d.Duration <= 0 {
return fmt.Errorf("invalid duration, must be > 0: %s", d.Duration)
}
return nil
}

type precision byte

const (
@@ -249,9 +284,25 @@ func (t *Field) UnmarshalTOML(data interface{}) error {
}

if n, ok := d["time-precision"]; ok {
if err := t.TimePrecision.UnmarshalTOML(n); err != nil {
var tp precision
if err := tp.UnmarshalTOML(n); err != nil {
return err
}
t.TimePrecision = &tp
}

if n, ok := d["time-interval"]; ok {
var ti duration
if err := ti.UnmarshalTOML(n); err != nil {
return err
}
t.TimeInterval = &ti
t.TimePrecision = nil
}

if t.TimePrecision == nil && t.TimeInterval == nil {
var tp precision
t.TimePrecision = &tp
}

// infer source
13 changes: 9 additions & 4 deletions pkg/data/gen/toml_test.go
@@ -30,7 +30,11 @@ func visit(root *Schema) string {
fmt.Fprintln(w, " Fields:")

case *Field:
fmt.Fprintf(w, " %s: %s, count=%d, time-precision=%s\n", n.Name, n.Source, n.Count, n.TimePrecision)
if n.TimePrecision != nil {
fmt.Fprintf(w, " %s: %s, count=%d, time-precision=%s\n", n.Name, n.Source, n.Count, *n.TimePrecision)
} else {
fmt.Fprintf(w, " %s: %s, count=%d, time-interval=%s\n", n.Name, n.Source, n.Count, n.TimeInterval)
}

case *Tag:
fmt.Fprintf(w, " %s: %s\n", n.Name, n.Source)
@@ -78,6 +82,7 @@ series-limit = 10
name = "stringC"
count = 5000
source = "hello"
time-interval = "60s"
[[measurements.fields]]
name = "stringA"
@@ -123,7 +128,7 @@ name = "array"
name = "integerA"
count = 1000
source = [5, 6, 7]
time-precision = "us"
time-interval = "90s"
`
var out Schema
_, err := toml.Decode(in, &out)
@@ -140,7 +145,7 @@ name = "array"
Fields:
floatC: constant, source=0.5, count=5000, time-precision=Microsecond
integerC: constant, source=3, count=5000, time-precision=Hour
stringC: constant, source="hello", count=5000, time-precision=Millisecond
stringC: constant, source="hello", count=5000, time-interval=1m0s
stringA: array, source=[]string{"hello", "world"}, count=5000, time-precision=Millisecond
boolf: constant, source=false, count=5000, time-precision=Millisecond
@@ -156,7 +161,7 @@ name = "array"
tagFile: file, path=foo.txt
Fields:
stringA: array, source=[]string{"this", "that"}, count=1000, time-precision=Microsecond
integerA: array, source=[]int64{5, 6, 7}, count=1000, time-precision=Microsecond
integerA: array, source=[]int64{5, 6, 7}, count=1000, time-interval=1m30s
`
if got := visit(&out); !cmp.Equal(got, exp) {
t.Errorf("unexpected value, -got/+exp\n%s", cmp.Diff(got, exp))
26 changes: 0 additions & 26 deletions pkg/data/gen/util.go
@@ -28,32 +28,6 @@ func sortDedupStrings(in []string) []string {
return in[:j+1]
}

func sortDedupInts(in []int) []int {
sort.Ints(in)
j := 0
for i := 1; i < len(in); i++ {
if in[j] == in[i] {
continue
}
j++
in[j] = in[i]
}
return in[:j+1]
}

func sortDedupFloats(in []float64) []float64 {
sort.Float64s(in)
j := 0
for i := 1; i < len(in); i++ {
if in[j] == in[i] {
continue
}
j++
in[j] = in[i]
}
return in[:j+1]
}

// ToInt64SliceE casts an interface to a []int64 type.
func toInt64SliceE(i interface{}) ([]int64, error) {
if i == nil {
27 changes: 27 additions & 0 deletions pkg/slices/bytes.go
@@ -1,5 +1,7 @@
package slices

import "bytes"

// BytesToStrings converts a slice of []byte into a slice of strings.
func BytesToStrings(a [][]byte) []string {
s := make([]string, 0, len(a))
@@ -35,3 +37,28 @@ func CopyChunkedByteSlices(src [][]byte, chunkSize int) [][]byte {

return dst
}

// CompareSlice returns an integer comparing two slices of byte slices
// lexicographically.
// The result will be 0 if a==b, -1 if a < b, and +1 if a > b.
func CompareSlice(a, b [][]byte) int {
i := 0
for i < len(a) && i < len(b) {
if v := bytes.Compare(a[i], b[i]); v == 0 {
i++
continue
} else {
return v
}
}

if i < len(b) {
// b is longer, so assume a is less
return -1
} else if i < len(a) {
// a is longer, so assume b is less
return 1
} else {
return 0
}
}
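
A usage sketch of the new comparator (semantics follow bytes.Compare):

    a := [][]byte{[]byte("aaa"), []byte("bbb")}
    b := [][]byte{[]byte("aaa"), []byte("ccc")}
    fmt.Println(slices.CompareSlice(a, b)) // -1, "bbb" sorts before "ccc"
    fmt.Println(slices.CompareSlice(a, a)) // 0
    fmt.Println(slices.CompareSlice(b, a)) // 1
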
126 changes: 126 additions & 0 deletions pkg/slices/bytes_test.go
@@ -1,10 +1,15 @@
package slices

import (
"bytes"
"fmt"
"math"
"reflect"
"testing"
"unsafe"

"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/pkg/bytesutil"
)

func TestCopyChunkedByteSlices_oneChunk(t *testing.T) {
@@ -76,3 +81,124 @@ func TestCopyChunkedByteSlices_multipleChunks(t *testing.T) {
t.Error("destination should not match source")
}
}

const NIL = "<nil>"

// ss returns a sorted slice of byte slices.
func ss(s ...string) [][]byte {
r := make([][]byte, len(s))
for i := range s {
if s[i] != NIL {
r[i] = []byte(s[i])
}
}
bytesutil.Sort(r)
return r
}

func TestCompareSlice(t *testing.T) {
name := func(a, b [][]byte, exp int) string {
var as string
if a != nil {
as = string(bytes.Join(a, nil))
} else {
as = NIL
}
var bs string
if b != nil {
bs = string(bytes.Join(b, nil))
} else {
bs = NIL
}
return fmt.Sprintf("%s <=> %s is %d", as, bs, exp)
}
tests := []struct {
a, b [][]byte
exp int
}{
{
a: ss("aaa", "bbb", "ccc"),
b: ss("aaa", "bbb", "ccc"),
exp: 0,
},

{
a: ss("aaa", "bbb", "ccc", "ddd"),
b: ss("aaa", "bbb", "ccc"),
exp: 1,
},

{
a: ss("aaa", "bbb"),
b: ss("aaa", "bbb", "ccc"),
exp: -1,
},

{
a: ss("aaa", "bbbb"),
b: ss("aaa", "bbb", "ccc"),
exp: 1,
},

{
a: ss("aaa", "ccc"),
b: ss("aaa", "bbb", "ccc"),
exp: 1,
},

{
a: ss("aaa", "bbb", NIL),
b: ss("aaa", "bbb", "ccc"),
exp: -1,
},

{
a: ss("aaa", NIL, "ccc"),
b: ss("aaa", NIL, "ccc"),
exp: 0,
},

{
a: ss(NIL, "bbb", "ccc"),
b: ss("aaa", "bbb", "ccc"),
exp: -1,
},

{
a: ss("aaa", "aaa"),
b: ss("aaa", "bbb", "ccc"),
exp: -1,
},

{
a: nil,
b: ss("aaa", "bbb", "ccc"),
exp: -1,
},

{
a: ss("aaa", "bbb"),
b: nil,
exp: 1,
},

{
a: nil,
b: nil,
exp: 0,
},

{
a: [][]byte{},
b: nil,
exp: 0,
},
}
for _, test := range tests {
t.Run(name(test.a, test.b, test.exp), func(t *testing.T) {
if got := CompareSlice(test.a, test.b); got != test.exp {
t.Errorf("unexpected result, -got/+exp\n%s", cmp.Diff(got, test.exp))
}
})
}
}
36 changes: 28 additions & 8 deletions prometheus/converters.go
@@ -10,7 +10,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/prometheus/remote"
"github.com/influxdata/influxdb/services/storage"
"github.com/influxdata/platform/storage/reads/datatypes"
"github.com/influxdata/influxdb/storage/reads/datatypes"
)

const (
@@ -30,7 +30,18 @@ const (
measurementTagKey = "_measurement"
)

var ErrNaNDropped = errors.New("dropped NaN from Prometheus since they are not supported")
// A DroppedValuesError is returned when the prometheus write request contains
// unsupported float64 values.
type DroppedValuesError struct {
nan uint64
ninf uint64
inf uint64
}

// Error returns a descriptive error of the values dropped.
func (e DroppedValuesError) Error() string {
return fmt.Sprintf("dropped unsupported Prometheus values: [NaN = %d, +Inf = %d, -Inf = %d]", e.nan, e.inf, e.ninf)
}

// WriteRequestToPoints converts a Prometheus remote write request of time series and their
// samples into Points that can be written into Influx
@@ -41,7 +52,8 @@ func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {
}
points := make([]models.Point, 0, maxPoints)

var droppedNaN error
// Track any dropped values.
var nan, inf, ninf uint64

for _, ts := range req.Timeseries {
measurement := measurementName
@@ -55,9 +67,14 @@ }
}

for _, s := range ts.Samples {
// skip NaN values, which are valid in Prometheus
if math.IsNaN(s.Value) {
droppedNaN = ErrNaNDropped
if v := s.Value; math.IsNaN(v) {
nan++
continue
} else if math.IsInf(v, -1) {
ninf++
continue
} else if math.IsInf(v, 1) {
inf++
continue
}

@@ -68,11 +85,14 @@ if err != nil {
if err != nil {
return nil, err
}

points = append(points, p)
}
}
return points, droppedNaN

if nan+inf+ninf > 0 {
return points, DroppedValuesError{nan: nan, inf: inf, ninf: ninf}
}
return points, nil
}

// ReadRequestToInfluxStorageRequest converts a Prometheus remote read request into one using the
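
Callers distinguish partial success from hard failure by type-asserting the returned error, mirroring the HTTP handler change below (req is an assumed *remote.WriteRequest):

    points, err := prometheus.WriteRequestToPoints(req)
    if err != nil {
        if _, ok := err.(prometheus.DroppedValuesError); !ok {
            return err // a real conversion failure
        }
        // Only NaN/±Inf samples were dropped; points holds the valid samples.
    }
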
72 changes: 49 additions & 23 deletions services/httpd/handler.go
@@ -36,11 +36,11 @@ import (
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/services/storage"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/uuid"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/storage/reads"
"github.com/influxdata/platform/storage/reads/datatypes"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
@@ -58,6 +58,12 @@ const (
MaxDebugRequestsInterval = 6 * time.Hour
)

var (
// ErrBearerAuthDisabled is returned when client specifies bearer auth in
// a request but bearer auth is disabled.
ErrBearerAuthDisabled = errors.New("bearer auth disabled")
)

// AuthenticationMethod defines the type of authentication used.
type AuthenticationMethod int

@@ -240,6 +246,10 @@ func (h *Handler) Open() {
}
h.accessLogFilters = StatusFilters(h.Config.AccessLogStatusFilters)

if h.Config.AuthEnabled && h.Config.SharedSecret == "" {
h.Logger.Info("Auth is enabled but shared-secret is blank. BearerAuthentication is disabled.")
}

if h.Config.FluxEnabled {
h.registered = true
prom.MustRegister(h.Controller.PrometheusCollectors()...)
@@ -985,7 +995,8 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me
h.Logger.Info("Prom write handler", zap.Error(err))
}

if err != prometheus.ErrNaNDropped {
// Check if the error was from something other than dropping invalid values.
if _, ok := err.(prometheus.DroppedValuesError); !ok {
h.httpError(w, err.Error(), http.StatusBadRequest)
return
}
@@ -1029,6 +1040,7 @@
// servePromRead will convert a Prometheus remote read request into a storage
// query and returns data in Prometheus remote read protobuf format.
func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user meta.User) {
atomic.AddInt64(&h.stats.PromReadRequests, 1)
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
h.httpError(w, err.Error(), http.StatusInternalServerError)
Expand Down Expand Up @@ -1057,6 +1069,25 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
return
}

respond := func(resp *remote.ReadResponse) {
data, err := proto.Marshal(resp)
if err != nil {
h.httpError(w, err.Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/x-protobuf")
w.Header().Set("Content-Encoding", "snappy")

compressed = snappy.Encode(nil, data)
if _, err := w.Write(compressed); err != nil {
h.httpError(w, err.Error(), http.StatusInternalServerError)
return
}

atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(len(compressed)))
}

ctx := context.Background()
rs, err := h.Store.Read(ctx, readRequest)
if err != nil {
@@ -1068,6 +1099,12 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
resp := &remote.ReadResponse{
Results: []*remote.QueryResult{{}},
}

if rs == nil {
respond(resp)
return
}

for rs.Next() {
cur := rs.Cursor()
if cur == nil {
@@ -1125,22 +1162,8 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
)
}
}
data, err := proto.Marshal(resp)
if err != nil {
h.httpError(w, err.Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/x-protobuf")
w.Header().Set("Content-Encoding", "snappy")

compressed = snappy.Encode(nil, data)
if _, err := w.Write(compressed); err != nil {
h.httpError(w, err.Error(), http.StatusInternalServerError)
return
}

atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(len(compressed)))
respond(resp)
}

func (h *Handler) serveFluxQuery(w http.ResponseWriter, r *http.Request, user meta.User) {
@@ -1209,11 +1232,9 @@ func (h *Handler) serveFluxQuery(w http.ResponseWriter, r *http.Request, user me
encoder := pr.Dialect.Encoder()
results := flux.NewResultIteratorFromQuery(q)
if h.Config.FluxLogEnabled {
if s, ok := results.(flux.Statisticser); ok {
defer func() {
stats = s.Statistics()
}()
}
defer func() {
stats = results.Statistics()
}()
}
defer results.Release()

@@ -1583,6 +1604,11 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, meta.User), h *
return
}
case BearerAuthentication:
if h.Config.SharedSecret == "" {
atomic.AddInt64(&h.stats.AuthenticationFailures, 1)
h.httpError(w, ErrBearerAuthDisabled.Error(), http.StatusUnauthorized)
return
}
keyLookupFn := func(token *jwt.Token) (interface{}, error) {
// Check for expected signing method.
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
211 changes: 196 additions & 15 deletions services/httpd/handler_test.go
@@ -232,6 +232,25 @@ func TestHandler_Query_Auth(t *testing.T) {
t.Fatalf("unexpected body: %s", body)
}

// Test that auth fails if shared secret is blank.
origSecret := h.Config.SharedSecret
h.Config.SharedSecret = ""
token, _ = MustJWTToken("user1", h.Config.SharedSecret, false)
signedToken, err = token.SignedString([]byte(h.Config.SharedSecret))
if err != nil {
t.Fatal(err)
}
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken))
w = httptest.NewRecorder()
h.ServeHTTP(w, req)
if w.Code != http.StatusUnauthorized {
t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
	} else if !strings.Contains(w.Body.String(), httpd.ErrBearerAuthDisabled.Error()) {
t.Fatalf("unexpected body: %s", w.Body.String())
}
h.Config.SharedSecret = origSecret

// Test the handler with valid user and password in the url and invalid in
// basic auth (prioritize url).
w = httptest.NewRecorder()
@@ -539,8 +558,91 @@ func TestHandler_Query_CloseNotify(t *testing.T) {
}
}

// Ensure the prometheus remote write works
// Ensure the prometheus remote write works with valid values.
func TestHandler_PromWrite(t *testing.T) {
req := &remote.WriteRequest{
Timeseries: []*remote.TimeSeries{
{
Labels: []*remote.LabelPair{
{Name: "host", Value: "a"},
{Name: "region", Value: "west"},
},
Samples: []*remote.Sample{
{TimestampMs: 1, Value: 1.2},
{TimestampMs: 3, Value: 14.5},
{TimestampMs: 6, Value: 222.99},
},
},
},
}

data, err := proto.Marshal(req)
if err != nil {
t.Fatal("couldn't marshal prometheus request")
}
compressed := snappy.Encode(nil, data)

b := bytes.NewReader(compressed)
h := NewHandler(false)
h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
return &meta.DatabaseInfo{}
}

var called bool
h.PointsWriter.WritePointsFn = func(db, rp string, _ models.ConsistencyLevel, _ meta.User, points []models.Point) error {
called = true

if got, exp := len(points), 3; got != exp {
t.Fatalf("got %d points, expected %d\n\npoints:\n%v", got, exp, points)
}

expFields := []models.Fields{
models.Fields{"value": req.Timeseries[0].Samples[0].Value},
models.Fields{"value": req.Timeseries[0].Samples[1].Value},
models.Fields{"value": req.Timeseries[0].Samples[2].Value},
}

expTS := []int64{
req.Timeseries[0].Samples[0].TimestampMs * int64(time.Millisecond),
req.Timeseries[0].Samples[1].TimestampMs * int64(time.Millisecond),
req.Timeseries[0].Samples[2].TimestampMs * int64(time.Millisecond),
}

for i, point := range points {
if got, exp := point.UnixNano(), expTS[i]; got != exp {
t.Fatalf("got time %d, expected %d\npoint:\n%v", got, exp, point)
}

exp := models.Tags{models.Tag{Key: []byte("host"), Value: []byte("a")}, models.Tag{Key: []byte("region"), Value: []byte("west")}}
if got := point.Tags(); !reflect.DeepEqual(got, exp) {
t.Fatalf("got tags: %v, expected: %v\npoint:\n%v", got, exp, point)
}

gotFields, err := point.Fields()
if err != nil {
t.Fatal(err.Error())
}

if got, exp := gotFields, expFields[i]; !reflect.DeepEqual(got, exp) {
t.Fatalf("got fields %v, expected %v\npoint:\n%v", got, exp, point)
}
}
return nil
}

w := httptest.NewRecorder()
h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/write?db=foo", b))
if !called {
t.Fatal("WritePoints: expected call")
}

if w.Code != http.StatusNoContent {
t.Fatalf("unexpected status: %d", w.Code)
}
}

// Ensure the prometheus remote write endpoint drops unsupported sample values (NaN and ±Inf).
func TestHandler_PromWrite_Dropped(t *testing.T) {
req := &remote.WriteRequest{
Timeseries: []*remote.TimeSeries{
{
@@ -551,6 +653,13 @@ func TestHandler_PromWrite(t *testing.T) {
Samples: []*remote.Sample{
{TimestampMs: 1, Value: 1.2},
{TimestampMs: 2, Value: math.NaN()},
{TimestampMs: 3, Value: 14.5},
{TimestampMs: 4, Value: math.Inf(-1)},
{TimestampMs: 5, Value: math.Inf(1)},
{TimestampMs: 6, Value: 222.99},
{TimestampMs: 7, Value: math.Inf(-1)},
{TimestampMs: 8, Value: math.Inf(1)},
{TimestampMs: 9, Value: math.Inf(1)},
},
},
},
@@ -567,26 +676,45 @@ func TestHandler_PromWrite(t *testing.T) {
h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
return &meta.DatabaseInfo{}
}
called := false

var called bool
h.PointsWriter.WritePointsFn = func(db, rp string, _ models.ConsistencyLevel, _ meta.User, points []models.Point) error {
called = true
point := points[0]
if point.UnixNano() != int64(time.Millisecond) {
t.Fatalf("Exp point time %d but got %d", int64(time.Millisecond), point.UnixNano())

if got, exp := len(points), 3; got != exp {
t.Fatalf("got %d points, expected %d\n\npoints:\n%v", got, exp, points)
}
tags := point.Tags()
expectedTags := models.Tags{models.Tag{Key: []byte("host"), Value: []byte("a")}, models.Tag{Key: []byte("region"), Value: []byte("west")}}
if !reflect.DeepEqual(tags, expectedTags) {
t.Fatalf("tags don't match\n\texp: %v\n\tgot: %v", expectedTags, tags)

expFields := []models.Fields{
models.Fields{"value": req.Timeseries[0].Samples[0].Value},
models.Fields{"value": req.Timeseries[0].Samples[2].Value},
models.Fields{"value": req.Timeseries[0].Samples[5].Value},
}

fields, err := point.Fields()
if err != nil {
t.Fatal(err.Error())
expTS := []int64{
req.Timeseries[0].Samples[0].TimestampMs * int64(time.Millisecond),
req.Timeseries[0].Samples[2].TimestampMs * int64(time.Millisecond),
req.Timeseries[0].Samples[5].TimestampMs * int64(time.Millisecond),
}
expFields := models.Fields{"value": 1.2}
if !reflect.DeepEqual(fields, expFields) {
t.Fatalf("fields don't match\n\texp: %v\n\tgot: %v", expFields, fields)

for i, point := range points {
if got, exp := point.UnixNano(), expTS[i]; got != exp {
t.Fatalf("got time %d, expected %d\npoint:\n%v", got, exp, point)
}

exp := models.Tags{models.Tag{Key: []byte("host"), Value: []byte("a")}, models.Tag{Key: []byte("region"), Value: []byte("west")}}
if got := point.Tags(); !reflect.DeepEqual(got, exp) {
t.Fatalf("got tags: %v, expected: %v\npoint:\n%v", got, exp, point)
}

gotFields, err := point.Fields()
if err != nil {
t.Fatal(err.Error())
}

if got, exp := gotFields, expFields[i]; !reflect.DeepEqual(got, exp) {
t.Fatalf("got fields %v, expected %v\npoint:\n%v", got, exp, point)
}
}
return nil
}
@@ -596,11 +724,64 @@ func TestHandler_PromWrite(t *testing.T) {
if !called {
t.Fatal("WritePoints: expected call")
}

if w.Code != http.StatusNoContent {
t.Fatalf("unexpected status: %d", w.Code)
}
}
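The drop behaviour asserted above happens in the write-path conversion, which is not part of this diff; a minimal sketch of such a filter, with hypothetical names (ts, Samples), given that line protocol cannot represent NaN or infinite floats:

	// Hypothetical sketch of the sample filter exercised by this test.
	for _, s := range ts.Samples {
		if math.IsNaN(s.Value) || math.IsInf(s.Value, 0) {
			continue // unsupported by line protocol; the sample is dropped
		}
		// ... convert s into a models.Point and keep it ...
	}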

// mustMakeBigString returns a string of sz repeated bytes, used to build
// keys that exceed models.MaxKeyLength.
func mustMakeBigString(sz int) string {
	return strings.Repeat("a", sz)
}

func TestHandler_PromWrite_Error(t *testing.T) {
req := &remote.WriteRequest{
Timeseries: []*remote.TimeSeries{
{
// Invalid tag key
Labels: []*remote.LabelPair{{Name: mustMakeBigString(models.MaxKeyLength), Value: "a"}},
Samples: []*remote.Sample{{TimestampMs: 1, Value: 1.2}},
},
},
}

data, err := proto.Marshal(req)
if err != nil {
t.Fatal("couldn't marshal prometheus request")
}
compressed := snappy.Encode(nil, data)

b := bytes.NewReader(compressed)
h := NewHandler(false)
h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
return &meta.DatabaseInfo{}
}

var called bool
h.PointsWriter.WritePointsFn = func(db, rp string, _ models.ConsistencyLevel, _ meta.User, points []models.Point) error {
called = true
return nil
}

w := httptest.NewRecorder()
h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/write?db=foo", b))
if w.Code != http.StatusBadRequest {
t.Fatalf("unexpected status: %d", w.Code)
}

if got, exp := strings.TrimSpace(w.Body.String()), `{"error":"max key length exceeded: 65572 \u003e 65535"}`; got != exp {
t.Fatalf("got error %q, expected %q", got, exp)
}

if called {
t.Fatal("WritePoints called but should not be")
}
}

// Ensure Prometheus remote read requests are converted to the correct InfluxQL query and
// data is returned
func TestHandler_PromRead(t *testing.T) {
11 changes: 7 additions & 4 deletions services/storage/predicate_influxql.go
@@ -1,11 +1,14 @@
package storage

import "github.com/influxdata/influxql"
import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxql"
)

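// measurementRemap translates the measurement and field-key tag names
// exposed to clients onto the internal keys used by the storage engine.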
var measurementRemap = map[string]string{
"_measurement": "_name",
"_m": "_name",
"_f": "_field",
"_measurement": "_name",
models.MeasurementTagKey: "_name",
models.FieldKeyTagKey: "_field",
}

func RewriteExprRemoveFieldKeyAndValue(expr influxql.Expr) influxql.Expr {
4 changes: 2 additions & 2 deletions services/storage/predicate_test.go
@@ -5,9 +5,9 @@ import (

"github.com/influxdata/influxdb/pkg/testing/assert"
"github.com/influxdata/influxdb/services/storage"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/storage/reads"
"github.com/influxdata/platform/storage/reads/datatypes"
)

func TestHasSingleMeasurementNoOR(t *testing.T) {
4 changes: 2 additions & 2 deletions services/storage/series_cursor.go
@@ -7,10 +7,10 @@ import (

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/storage/reads"
"github.com/influxdata/platform/storage/reads/datatypes"
opentracing "github.com/opentracing/opentracing-go"
)

2 changes: 1 addition & 1 deletion services/storage/series_cursor_test.go
@@ -4,8 +4,8 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/models"
)

func exprEqual(x, y influxql.Expr) bool {
8 changes: 4 additions & 4 deletions services/storage/store.go
@@ -9,12 +9,12 @@ import (

"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/platform/query/functions/inputs/storage"
"github.com/influxdata/platform/storage/reads"
"github.com/influxdata/platform/storage/reads/datatypes"
"go.uber.org/zap"
)

@@ -211,6 +211,6 @@ func (s *Store) GroupRead(ctx context.Context, req *datatypes.ReadRequest) (read
return rs, nil
}

func (s *Store) GetSource(rs storage.ReadSpec) (proto.Message, error) {
func (s *Store) GetSource(rs influxdb.ReadSpec) (proto.Message, error) {
return &ReadSource{Database: rs.Database, RetentionPolicy: rs.RetentionPolicy}, nil
}
1,085 changes: 1,085 additions & 0 deletions storage/reads/array_cursor.gen.go

Large diffs are not rendered by default.

163 changes: 163 additions & 0 deletions storage/reads/array_cursor.go
@@ -0,0 +1,163 @@
package reads

import (
"context"
"fmt"

"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
)

type singleValue struct {
v interface{}
}

func (v *singleValue) Value(key string) (interface{}, bool) {
return v.v, true
}

func newAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) cursors.Cursor {
if cursor == nil {
return nil
}

switch agg.Type {
case datatypes.AggregateTypeSum:
return newSumArrayCursor(cursor)
case datatypes.AggregateTypeCount:
return newCountArrayCursor(cursor)
default:
// TODO(sgc): should be validated higher up
panic("invalid aggregate")
}
}

func newSumArrayCursor(cur cursors.Cursor) cursors.Cursor {
switch cur := cur.(type) {
case cursors.FloatArrayCursor:
return newFloatArraySumCursor(cur)
case cursors.IntegerArrayCursor:
return newIntegerArraySumCursor(cur)
case cursors.UnsignedArrayCursor:
return newUnsignedArraySumCursor(cur)
default:
// TODO(sgc): propagate an error instead?
return nil
}
}

func newCountArrayCursor(cur cursors.Cursor) cursors.Cursor {
switch cur := cur.(type) {
case cursors.FloatArrayCursor:
return &integerFloatCountArrayCursor{FloatArrayCursor: cur}
case cursors.IntegerArrayCursor:
return &integerIntegerCountArrayCursor{IntegerArrayCursor: cur}
case cursors.UnsignedArrayCursor:
return &integerUnsignedCountArrayCursor{UnsignedArrayCursor: cur}
case cursors.StringArrayCursor:
return &integerStringCountArrayCursor{StringArrayCursor: cur}
case cursors.BooleanArrayCursor:
return &integerBooleanCountArrayCursor{BooleanArrayCursor: cur}
default:
panic(fmt.Sprintf("unreachable: %T", cur))
}
}

type cursorContext struct {
ctx context.Context
req *cursors.CursorRequest
itrs cursors.CursorIterators
limit int64
count int64
err error
}

type multiShardArrayCursors struct {
ctx context.Context
limit int64
req cursors.CursorRequest

cursors struct {
i integerMultiShardArrayCursor
f floatMultiShardArrayCursor
u unsignedMultiShardArrayCursor
b booleanMultiShardArrayCursor
s stringMultiShardArrayCursor
}
}

func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool, limit int64) *multiShardArrayCursors {
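	// A negative limit is clamped to 1.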
if limit < 0 {
limit = 1
}

m := &multiShardArrayCursors{
ctx: ctx,
limit: limit,
req: cursors.CursorRequest{
Ascending: asc,
StartTime: start,
EndTime: end,
},
}

cc := cursorContext{
ctx: ctx,
limit: limit,
req: &m.req,
}

m.cursors.i.cursorContext = cc
m.cursors.f.cursorContext = cc
m.cursors.u.cursorContext = cc
m.cursors.b.cursorContext = cc
m.cursors.s.cursorContext = cc

return m
}

func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor {
m.req.Name = row.Name
m.req.Tags = row.SeriesTags
m.req.Field = row.Field

var cond expression
if row.ValueCond != nil {
cond = &astExpr{row.ValueCond}
}

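	// Walk the shards in order until one yields a cursor for this series;
	// shards with no matching data are skipped (errors are ignored here).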
var shard cursors.CursorIterator
var cur cursors.Cursor
for cur == nil && len(row.Query) > 0 {
shard, row.Query = row.Query[0], row.Query[1:]
cur, _ = shard.Next(m.ctx, &m.req)
}

if cur == nil {
return nil
}

switch c := cur.(type) {
case cursors.IntegerArrayCursor:
m.cursors.i.reset(c, row.Query, cond)
return &m.cursors.i
case cursors.FloatArrayCursor:
m.cursors.f.reset(c, row.Query, cond)
return &m.cursors.f
case cursors.UnsignedArrayCursor:
m.cursors.u.reset(c, row.Query, cond)
return &m.cursors.u
case cursors.StringArrayCursor:
m.cursors.s.reset(c, row.Query, cond)
return &m.cursors.s
case cursors.BooleanArrayCursor:
m.cursors.b.reset(c, row.Query, cond)
return &m.cursors.b
default:
panic(fmt.Sprintf("unreachable: %T", cur))
}
}

func (m *multiShardArrayCursors) newAggregateCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) cursors.Cursor {
return newAggregateArrayCursor(ctx, agg, cursor)
}
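For orientation, a consumer drives these cursors roughly as follows. This is a hypothetical sketch: SeriesRow construction is omitted, and the FloatArray accessors are assumptions based on the tsdb/cursors package:

	// Hypothetical usage of the multi-shard cursors above.
	cs := newMultiShardArrayCursors(ctx, start, end, true, 1000)
	cur := cs.createCursor(row) // row is a SeriesRow spanning one or more shards
	cur = cs.newAggregateCursor(ctx, &datatypes.Aggregate{Type: datatypes.AggregateTypeSum}, cur)
	if fc, ok := cur.(cursors.FloatArrayCursor); ok {
		for {
			a := fc.Next()
			if a.Len() == 0 {
				break
			}
			// a.Timestamps and a.Values hold the next batch of summed points.
		}
		fc.Close()
	}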
30 changes: 30 additions & 0 deletions storage/reads/datatypes/Makefile
@@ -0,0 +1,30 @@
# List any generated files here
TARGETS = predicate.pb.go \
storage_common.pb.go

# List any source files used to generate the targets here
SOURCES = gen.go \
predicate.proto \
storage_common.proto

# List any directories that have their own Makefile here
SUBDIRS =

# Default target
all: $(SUBDIRS) $(TARGETS)

# Recurse into subdirs for same make goal
$(SUBDIRS):
$(MAKE) -C $@ $(MAKECMDGOALS)

# Clean all targets recursively
clean: $(SUBDIRS)
rm -f $(TARGETS)

# Command used to regenerate the protobuf targets
GO_GENERATE := go generate

$(TARGETS): $(SOURCES)
$(GO_GENERATE) -x

.PHONY: all clean $(SUBDIRS)
55 changes: 55 additions & 0 deletions storage/reads/datatypes/hintflags.go
@@ -0,0 +1,55 @@
package datatypes

import (
"strings"

"github.com/gogo/protobuf/proto"
)

type HintFlags uint32

func (h HintFlags) NoPoints() bool {
return uint32(h)&uint32(HintNoPoints) != 0
}

func (h *HintFlags) SetNoPoints() {
*h |= HintFlags(HintNoPoints)
}

func (h HintFlags) NoSeries() bool {
return uint32(h)&uint32(HintNoSeries) != 0
}

func (h *HintFlags) SetNoSeries() {
*h |= HintFlags(HintNoSeries)
}

func (h HintFlags) HintSchemaAllTime() bool {
return uint32(h)&uint32(HintSchemaAllTime) != 0
}

func (h *HintFlags) SetHintSchemaAllTime() {
*h |= HintFlags(HintSchemaAllTime)
}

func (h HintFlags) String() string {
f := uint32(h)

var s []string
enums := proto.EnumValueMap("influxdata.platform.storage.ReadRequest_HintFlags")
if h == 0 {
return "HINT_NONE"
}

for k, v := range enums {
if v == 0 {
continue
}
v := uint32(v)
if f&v == v {
s = append(s, k)
}
}

return strings.Join(s, ",")
}
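A minimal usage sketch; the printed names come from the protobuf enum value map, so the exact strings below are assumed, and because String ranges over a map the join order is not guaranteed:

	// Hypothetical usage of HintFlags.
	var hf datatypes.HintFlags
	hf.SetNoPoints()
	hf.SetNoSeries()
	_ = hf.NoPoints() // true
	s := hf.String()  // e.g. "HINT_NO_POINTS,HINT_NO_SERIES" (order may vary)
	_ = s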