From b60fad0cf3cafba7e2b595ae8a43c648242bad1f Mon Sep 17 00:00:00 2001 From: Sam Arnold Date: Wed, 31 Mar 2021 15:35:02 -0400 Subject: [PATCH] refactor: move FluxLanguageService interface to fluxlang --- check.go | 5 +++-- http/api_handler.go | 3 ++- http/check_service.go | 7 ++++--- http/query.go | 5 +++-- http/query_handler.go | 5 +++-- kv/service.go | 5 +++-- notification/check/check.go | 3 ++- notification/check/custom.go | 11 ++++++----- notification/check/deadman.go | 5 +++-- notification/check/threshold.go | 7 ++++--- query.go | 35 --------------------------------- query/fluxlang/service.go | 24 ++++++++++++++++++++-- query/service.go | 6 +++--- task.go | 7 ++++--- task/backend/executor/limits.go | 3 ++- 15 files changed, 64 insertions(+), 67 deletions(-) delete mode 100644 query.go diff --git a/check.go b/check.go index 134e28e0077..e8ecaf4843b 100644 --- a/check.go +++ b/check.go @@ -3,6 +3,7 @@ package influxdb import ( "context" "encoding/json" + "github.com/influxdata/influxdb/v2/query/fluxlang" "github.com/influxdata/influxdb/v2/kit/platform" "github.com/influxdata/influxdb/v2/kit/platform/errors" @@ -16,14 +17,14 @@ const ( // Check represents the information required to generate a periodic check task. type Check interface { - Valid(lang FluxLanguageService) error + Valid(lang fluxlang.FluxLanguageService) error Type() string ClearPrivateData() SetTaskID(platform.ID) GetTaskID() platform.ID GetOwnerID() platform.ID SetOwnerID(platform.ID) - GenerateFlux(lang FluxLanguageService) (string, error) + GenerateFlux(lang fluxlang.FluxLanguageService) (string, error) json.Marshaler CRUDLogSetter diff --git a/http/api_handler.go b/http/api_handler.go index f493a173951..d3eba0291ec 100644 --- a/http/api_handler.go +++ b/http/api_handler.go @@ -2,6 +2,7 @@ package http import ( "context" + "github.com/influxdata/influxdb/v2/query/fluxlang" "net/http" "github.com/influxdata/influxdb/v2/kit/platform" @@ -85,7 +86,7 @@ type APIBackend struct { InfluxQLService query.ProxyQueryService InfluxqldService influxql.ProxyQueryService FluxService query.ProxyQueryService - FluxLanguageService influxdb.FluxLanguageService + FluxLanguageService fluxlang.FluxLanguageService TaskService influxdb.TaskService CheckService influxdb.CheckService TelegrafService influxdb.TelegrafConfigStore diff --git a/http/check_service.go b/http/check_service.go index be9ccffafd7..45553226f58 100644 --- a/http/check_service.go +++ b/http/check_service.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/influxdata/influxdb/v2/query/fluxlang" "io/ioutil" "net/http" "path" @@ -34,7 +35,7 @@ type CheckBackend struct { LabelService influxdb.LabelService UserService influxdb.UserService OrganizationService influxdb.OrganizationService - FluxLanguageService influxdb.FluxLanguageService + FluxLanguageService fluxlang.FluxLanguageService } // NewCheckBackend returns a new instance of CheckBackend. @@ -65,7 +66,7 @@ type CheckHandler struct { LabelService influxdb.LabelService UserService influxdb.UserService OrganizationService influxdb.OrganizationService - FluxLanguageService influxdb.FluxLanguageService + FluxLanguageService fluxlang.FluxLanguageService } const ( @@ -441,7 +442,7 @@ func decodePostCheckRequest(r *http.Request) (postCheckRequest, error) { }, nil } -func decodePutCheckRequest(ctx context.Context, lang influxdb.FluxLanguageService, r *http.Request) (influxdb.CheckCreate, error) { +func decodePutCheckRequest(ctx context.Context, lang fluxlang.FluxLanguageService, r *http.Request) (influxdb.CheckCreate, error) { params := httprouter.ParamsFromContext(ctx) id := params.ByName("id") if id == "" { diff --git a/http/query.go b/http/query.go index 466c6960b64..a4a7b9a798f 100644 --- a/http/query.go +++ b/http/query.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/influxdata/influxdb/v2/query/fluxlang" "io" "io/ioutil" "mime" @@ -144,7 +145,7 @@ type queryParseError struct { // Analyze attempts to parse the query request and returns any errors // encountered in a structured way. -func (r QueryRequest) Analyze(l influxdb.FluxLanguageService) (*QueryAnalysis, error) { +func (r QueryRequest) Analyze(l fluxlang.FluxLanguageService) (*QueryAnalysis, error) { switch r.Type { case "flux": return r.analyzeFluxQuery(l) @@ -155,7 +156,7 @@ func (r QueryRequest) Analyze(l influxdb.FluxLanguageService) (*QueryAnalysis, e return nil, fmt.Errorf("unknown query request type %s", r.Type) } -func (r QueryRequest) analyzeFluxQuery(l influxdb.FluxLanguageService) (*QueryAnalysis, error) { +func (r QueryRequest) analyzeFluxQuery(l fluxlang.FluxLanguageService) (*QueryAnalysis, error) { a := &QueryAnalysis{} pkg, err := query.Parse(l, r.Query) if pkg == nil { diff --git a/http/query_handler.go b/http/query_handler.go index 537c826e3ca..2fec1834f44 100644 --- a/http/query_handler.go +++ b/http/query_handler.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/influxdata/influxdb/v2/query/fluxlang" "io" "io/ioutil" "net/http" @@ -51,7 +52,7 @@ type FluxBackend struct { AlgoWProxy FeatureProxyHandler OrganizationService influxdb.OrganizationService ProxyQueryService query.ProxyQueryService - FluxLanguageService influxdb.FluxLanguageService + FluxLanguageService fluxlang.FluxLanguageService Flagger feature.Flagger } @@ -86,7 +87,7 @@ type FluxHandler struct { Now func() time.Time OrganizationService influxdb.OrganizationService ProxyQueryService query.ProxyQueryService - FluxLanguageService influxdb.FluxLanguageService + FluxLanguageService fluxlang.FluxLanguageService EventRecorder metric.EventRecorder diff --git a/kv/service.go b/kv/service.go index 7ad86dfb77f..33d2335487b 100644 --- a/kv/service.go +++ b/kv/service.go @@ -4,6 +4,7 @@ import ( "github.com/benbjohnson/clock" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/kit/platform" + "github.com/influxdata/influxdb/v2/query/fluxlang" "github.com/influxdata/influxdb/v2/rand" "github.com/influxdata/influxdb/v2/resource" "github.com/influxdata/influxdb/v2/resource/noop" @@ -26,7 +27,7 @@ type Service struct { // FluxLanguageService is used for parsing flux. // If this is unset, operations that require parsing flux // will fail. - FluxLanguageService influxdb.FluxLanguageService + FluxLanguageService fluxlang.FluxLanguageService TokenGenerator influxdb.TokenGenerator // TODO(desa:ariel): this should not be embedded @@ -67,7 +68,7 @@ func NewService(log *zap.Logger, kv Store, orgs influxdb.OrganizationService, co // ServiceConfig allows us to configure Services type ServiceConfig struct { Clock clock.Clock - FluxLanguageService influxdb.FluxLanguageService + FluxLanguageService fluxlang.FluxLanguageService } // WithResourceLogger sets the resource audit logger for the service. diff --git a/notification/check/check.go b/notification/check/check.go index 5cabe36b2cc..1e5b40f5a90 100644 --- a/notification/check/check.go +++ b/notification/check/check.go @@ -3,6 +3,7 @@ package check import ( "encoding/json" "fmt" + "github.com/influxdata/influxdb/v2/query/fluxlang" "github.com/influxdata/influxdb/v2/kit/platform" "github.com/influxdata/influxdb/v2/kit/platform/errors" @@ -39,7 +40,7 @@ type Base struct { } // Valid returns err if the check is invalid. -func (b Base) Valid(lang influxdb.FluxLanguageService) error { +func (b Base) Valid(lang fluxlang.FluxLanguageService) error { if !b.ID.Valid() { return &errors.Error{ Code: errors.EInvalid, diff --git a/notification/check/custom.go b/notification/check/custom.go index 8b929667814..27285fc0381 100644 --- a/notification/check/custom.go +++ b/notification/check/custom.go @@ -2,6 +2,7 @@ package check import ( "encoding/json" + "github.com/influxdata/influxdb/v2/query/fluxlang" "time" "github.com/influxdata/influxdb/v2/kit/platform" @@ -64,12 +65,12 @@ type Custom struct { // |> monitor.check(data: check, messageFn:messageFn, warn:warn, crit:crit, info:info) // GenerateFlux returns the check query text directly -func (c Custom) GenerateFlux(lang influxdb.FluxLanguageService) (string, error) { +func (c Custom) GenerateFlux(lang fluxlang.FluxLanguageService) (string, error) { return c.Query.Text, nil } // sanitizeFlux modifies the check query text to include correct _check_id param in check object -func (c Custom) sanitizeFlux(lang influxdb.FluxLanguageService) (string, error) { +func (c Custom) sanitizeFlux(lang fluxlang.FluxLanguageService) (string, error) { p, err := query.Parse(lang, c.Query.Text) if p == nil { return "", err @@ -106,7 +107,7 @@ func propertyHasValue(prop *ast.Property, key string, value string) bool { return ok && prop.Key.Key() == key && stringLit.Value == value } -func (c *Custom) hasRequiredTaskOptions(lang influxdb.FluxLanguageService) (err error) { +func (c *Custom) hasRequiredTaskOptions(lang fluxlang.FluxLanguageService) (err error) { p, err := query.Parse(lang, c.Query.Text) if p == nil { @@ -175,7 +176,7 @@ func (c *Custom) hasRequiredTaskOptions(lang influxdb.FluxLanguageService) (err return nil } -func (c *Custom) hasRequiredCheckParameters(lang influxdb.FluxLanguageService) (err error) { +func (c *Custom) hasRequiredCheckParameters(lang fluxlang.FluxLanguageService) (err error) { p, err := query.Parse(lang, c.Query.Text) if p == nil { return err @@ -223,7 +224,7 @@ func (c *Custom) hasRequiredCheckParameters(lang influxdb.FluxLanguageService) ( } // Valid checks whether check flux is valid, returns error if invalid -func (c *Custom) Valid(lang influxdb.FluxLanguageService) error { +func (c *Custom) Valid(lang fluxlang.FluxLanguageService) error { if err := c.hasRequiredCheckParameters(lang); err != nil { return err diff --git a/notification/check/deadman.go b/notification/check/deadman.go index 29910e37d4e..ac411c53684 100644 --- a/notification/check/deadman.go +++ b/notification/check/deadman.go @@ -3,6 +3,7 @@ package check import ( "encoding/json" "fmt" + "github.com/influxdata/influxdb/v2/query/fluxlang" "strings" "github.com/influxdata/flux/ast" @@ -31,7 +32,7 @@ func (c Deadman) Type() string { } // GenerateFlux returns a flux script for the Deadman provided. -func (c Deadman) GenerateFlux(lang influxdb.FluxLanguageService) (string, error) { +func (c Deadman) GenerateFlux(lang fluxlang.FluxLanguageService) (string, error) { p, err := c.GenerateFluxAST(lang) if err != nil { return "", err @@ -43,7 +44,7 @@ func (c Deadman) GenerateFlux(lang influxdb.FluxLanguageService) (string, error) // GenerateFluxAST returns a flux AST for the deadman provided. If there // are any errors in the flux that the user provided the function will return // an error for each error found when the script is parsed. -func (c Deadman) GenerateFluxAST(lang influxdb.FluxLanguageService) (*ast.Package, error) { +func (c Deadman) GenerateFluxAST(lang fluxlang.FluxLanguageService) (*ast.Package, error) { p, err := query.Parse(lang, c.Query.Text) if p == nil { return nil, err diff --git a/notification/check/threshold.go b/notification/check/threshold.go index 5d1f57146d1..7c545fcaec4 100644 --- a/notification/check/threshold.go +++ b/notification/check/threshold.go @@ -3,6 +3,7 @@ package check import ( "encoding/json" "fmt" + "github.com/influxdata/influxdb/v2/query/fluxlang" "strings" "github.com/influxdata/influxdb/v2/kit/platform/errors" @@ -28,7 +29,7 @@ func (t Threshold) Type() string { } // Valid returns error if something is invalid. -func (t Threshold) Valid(lang influxdb.FluxLanguageService) error { +func (t Threshold) Valid(lang fluxlang.FluxLanguageService) error { if err := t.Base.Valid(lang); err != nil { return err } @@ -106,7 +107,7 @@ func multiError(errs []error) error { // GenerateFlux returns a flux script for the threshold provided. If there // are any errors in the flux that the user provided the function will return // an error for each error found when the script is parsed. -func (t Threshold) GenerateFlux(lang influxdb.FluxLanguageService) (string, error) { +func (t Threshold) GenerateFlux(lang fluxlang.FluxLanguageService) (string, error) { p, err := t.GenerateFluxAST(lang) if err != nil { return "", err @@ -118,7 +119,7 @@ func (t Threshold) GenerateFlux(lang influxdb.FluxLanguageService) (string, erro // GenerateFluxAST returns a flux AST for the threshold provided. If there // are any errors in the flux that the user provided the function will return // an error for each error found when the script is parsed. -func (t Threshold) GenerateFluxAST(lang influxdb.FluxLanguageService) (*ast.Package, error) { +func (t Threshold) GenerateFluxAST(lang fluxlang.FluxLanguageService) (*ast.Package, error) { p, err := query.Parse(lang, t.Query.Text) if p == nil { return nil, err diff --git a/query.go b/query.go deleted file mode 100644 index 8c8ae73a431..00000000000 --- a/query.go +++ /dev/null @@ -1,35 +0,0 @@ -package influxdb - -import ( - "context" - - "github.com/influxdata/flux/ast" - "github.com/influxdata/flux/complete" - "github.com/influxdata/flux/interpreter" - "github.com/influxdata/flux/values" -) - -// TODO(desa): These files are possibly a temporary. This is needed -// as a part of the source work that is being done. -// See https://github.com/influxdata/platform/issues/594 for more info. - -// SourceQuery is a query for a source. -type SourceQuery struct { - Query string `json:"query"` - Type string `json:"type"` -} - -// FluxLanguageService is a service for interacting with flux code. -type FluxLanguageService interface { - // Parse will take flux source code and produce a package. - // If there are errors when parsing, the first error is returned. - // An ast.Package may be returned when a parsing error occurs, - // but it may be null if parsing didn't even occur. - Parse(source string) (*ast.Package, error) - - // EvalAST will evaluate and run an AST. - EvalAST(ctx context.Context, astPkg *ast.Package) ([]interpreter.SideEffect, values.Scope, error) - - // Completer will return a flux completer. - Completer() complete.Completer -} diff --git a/query/fluxlang/service.go b/query/fluxlang/service.go index 94a8a8f4c4c..44d7ca53dfa 100644 --- a/query/fluxlang/service.go +++ b/query/fluxlang/service.go @@ -10,11 +10,31 @@ import ( "github.com/influxdata/flux/parser" "github.com/influxdata/flux/runtime" "github.com/influxdata/flux/values" - "github.com/influxdata/influxdb/v2" ) +// SourceQuery is a query for a source. +type SourceQuery struct { + Query string `json:"query"` + Type string `json:"type"` +} + +// FluxLanguageService is a service for interacting with flux code. +type FluxLanguageService interface { + // Parse will take flux source code and produce a package. + // If there are errors when parsing, the first error is returned. + // An ast.Package may be returned when a parsing error occurs, + // but it may be null if parsing didn't even occur. + Parse(source string) (*ast.Package, error) + + // EvalAST will evaluate and run an AST. + EvalAST(ctx context.Context, astPkg *ast.Package) ([]interpreter.SideEffect, values.Scope, error) + + // Completer will return a flux completer. + Completer() complete.Completer +} + // DefaultService is the default language service. -var DefaultService influxdb.FluxLanguageService = defaultService{} +var DefaultService FluxLanguageService = defaultService{} type defaultService struct{} diff --git a/query/service.go b/query/service.go index 950a5fbada1..fb552f0ceff 100644 --- a/query/service.go +++ b/query/service.go @@ -2,6 +2,7 @@ package query import ( "context" + "github.com/influxdata/influxdb/v2/query/fluxlang" "io" "github.com/influxdata/influxdb/v2/kit/platform/errors" @@ -10,7 +11,6 @@ import ( "github.com/influxdata/flux/ast" "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/values" - "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/kit/check" ) @@ -46,7 +46,7 @@ type ProxyQueryService interface { // but it may be null if parsing didn't even occur. // // This will return an error if the FluxLanguageService is nil. -func Parse(lang influxdb.FluxLanguageService, source string) (*ast.Package, error) { +func Parse(lang fluxlang.FluxLanguageService, source string) (*ast.Package, error) { if lang == nil { return nil, &errors.Error{ Code: errors.EInternal, @@ -59,7 +59,7 @@ func Parse(lang influxdb.FluxLanguageService, source string) (*ast.Package, erro // EvalAST will evaluate and run an AST. // // This will return an error if the FluxLanguageService is nil. -func EvalAST(ctx context.Context, lang influxdb.FluxLanguageService, astPkg *ast.Package) ([]interpreter.SideEffect, values.Scope, error) { +func EvalAST(ctx context.Context, lang fluxlang.FluxLanguageService, astPkg *ast.Package) ([]interpreter.SideEffect, values.Scope, error) { if lang == nil { return nil, nil, &errors.Error{ Code: errors.EInternal, diff --git a/task.go b/task.go index 78c01978ca7..a7358c9e7ac 100644 --- a/task.go +++ b/task.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/influxdata/influxdb/v2/query/fluxlang" "strconv" "time" @@ -276,7 +277,7 @@ func (t *TaskUpdate) Validate() error { // safeParseSource calls the Flux parser.ParseSource function // and is guaranteed not to panic. -func safeParseSource(parser FluxLanguageService, f string) (pkg *ast.Package, err error) { +func safeParseSource(parser fluxlang.FluxLanguageService, f string) (pkg *ast.Package, err error) { if parser == nil { return nil, &errors2.Error{ Code: errors2.EInternal, @@ -298,11 +299,11 @@ func safeParseSource(parser FluxLanguageService, f string) (pkg *ast.Package, er // UpdateFlux updates the TaskUpdate to go from updating options to updating a // flux string, that now has those updated options in it. It zeros the options // in the TaskUpdate. -func (t *TaskUpdate) UpdateFlux(parser FluxLanguageService, oldFlux string) error { +func (t *TaskUpdate) UpdateFlux(parser fluxlang.FluxLanguageService, oldFlux string) error { return t.updateFlux(parser, oldFlux) } -func (t *TaskUpdate) updateFlux(parser FluxLanguageService, oldFlux string) error { +func (t *TaskUpdate) updateFlux(parser fluxlang.FluxLanguageService, oldFlux string) error { if t.Flux != nil && *t.Flux != "" { oldFlux = *t.Flux } diff --git a/task/backend/executor/limits.go b/task/backend/executor/limits.go index d564039add4..802263bc017 100644 --- a/task/backend/executor/limits.go +++ b/task/backend/executor/limits.go @@ -2,6 +2,7 @@ package executor import ( "context" + "github.com/influxdata/influxdb/v2/query/fluxlang" "sort" "github.com/influxdata/influxdb/v2" @@ -10,7 +11,7 @@ import ( // ConcurrencyLimit creates a concurrency limit func that uses the executor to determine // if the task has exceeded the concurrency limit. -func ConcurrencyLimit(exec *Executor, lang influxdb.FluxLanguageService) LimitFunc { +func ConcurrencyLimit(exec *Executor, lang fluxlang.FluxLanguageService) LimitFunc { return func(t *influxdb.Task, r *influxdb.Run) error { o, err := options.FromScriptAST(lang, t.Flux) if err != nil {