Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding flux query parameters #308

Merged
merged 2 commits into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
mkdir -p /tmp/artifacts
mkdir -p /tmp/test-results
- run: sudo rm -rf /usr/local/go
- run: wget https://golang.org/dl/go1.13.14.linux-amd64.tar.gz -O /tmp/go.tgz
- run: wget https://golang.org/dl/go1.17.7.linux-amd64.tar.gz -O /tmp/go.tgz
- run: sudo tar -C /usr/local -xzf /tmp/go.tgz
- run: go version
- run: go get -v -t -d ./...
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
### Features
- [#304](https://github.com/influxdata/influxdb-client-go/pull/304) Added public constructor for `QueryTableResult`
- [#307](https://github.com/influxdata/influxdb-client-go/pull/307) Synced generated server API with the latest [oss.yml](https://github.com/influxdata/openapi/blob/master/contracts/oss.yml).
- [#308](https://github.com/influxdata/influxdb-client-go/pull/308) Added Flux query parameters. Supported by InfluxDB Cloud only now.
- [#308](https://github.com/influxdata/influxdb-client-go/pull/308) Go 1.17 is required
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it depend on 1.17 features? Isn't it the case that 1.17 is now used by tests, so it is not a feature.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should IMHO at least follow Go Security Policy so that each major Go release is supported until there are two newer major releases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Go 1.17 is required because the new feature uses https://pkg.go.dev/reflect#VisibleFields, which is the new addon from Roger Peppe released in 1.17. It is out almost 6 months.
Go V3, where this feature was added originally, already uses 1.17 (although it is still in progress, it was supposed to be ready soon). Other products in InfluxData also require Go 1.17 (although not libraries).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the explanation. 1.18 is likely to come this month and 1.16 will thus go out of support. So it makes sense to require 1.17 herein.


## 2.7.0[2022-01-20]
### Features
Expand Down
74 changes: 73 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ This repository contains the reference Go client for InfluxDB 2.
- [Basic Example](#basic-example)
- [Writes in Detail](#writes)
- [Queries in Detail](#queries)
- [Parametrized Queries](#parametrized-queries)
- [Concurrency](#concurrency)
- [Proxy and redirects](#proxy-and-redirects)
- [Checking Server State](#checking-server-state)
Expand Down Expand Up @@ -61,7 +62,7 @@ There are also other examples in the API docs:
## How To Use

### Installation
**Go 1.13** or later is required.
**Go 1.17** or later is required.

1. Add the client package your to your project dependencies (go.mod).
```sh
Expand Down Expand Up @@ -412,6 +413,77 @@ func main() {
client.Close()
}
```
### Parametrized Queries
InfluxDB Cloud supports [Parameterized Queries](https://docs.influxdata.com/influxdb/cloud/query-data/parameterized-queries/)
that let you dynamically change values in a query using the InfluxDB API. Parameterized queries make Flux queries more
reusable and can also be used to help prevent injection attacks.

InfluxDB Cloud inserts the params object into the Flux query as a Flux record named `params`. Use dot or bracket
notation to access parameters in the `params` record in your Flux query. Parameterized Flux queries support only `int`
, `float`, and `string` data types. To convert the supported data types into
other [Flux basic data types, use Flux type conversion functions](https://docs.influxdata.com/influxdb/cloud/query-data/parameterized-queries/#supported-parameter-data-types).

Query parameters can be passed as a struct or map. Param values can be only simple types or `time.Time`.
The name of the parameter represented by a struct field can be specified by JSON annotation.

Parameterized query example:
> :warning: Parameterized Queries are supported only in InfluxDB Cloud. There is no support in InfluxDB OSS currently.
```go
package main

import (
"context"
"fmt"

"github.com/influxdata/influxdb-client-go/v2"
)

func main() {
// Create a new client using an InfluxDB server base URL and an authentication token
client := influxdb2.NewClient("http://localhost:8086", "my-token")
// Get query client
queryAPI := client.QueryAPI("my-org")
// Define parameters
parameters := struct {
Start string `json:"start"`
Field string `json:"field"`
Value float64 `json:"value"`
}{
"-1h",
"temperature",
25,
}
// Query with parameters
query := `from(bucket:"my-bucket")
|> range(start: duration(params.start))
|> filter(fn: (r) => r._measurement == "stat")
|> filter(fn: (r) => r._field == params.field)
|> filter(fn: (r) => r._value > params.value)`

// Get result
result, err := queryAPI.QueryWithParams(context.Background(), query, parameters)
if err == nil {
// Iterate over query response
for result.Next() {
// Notice when group key has changed
if result.TableChanged() {
fmt.Printf("table: %s\n", result.TableMetadata().String())
}
// Access data
fmt.Printf("value: %v\n", result.Record().Value())
}
// check for an error
if result.Err() != nil {
fmt.Printf("query parsing error: %s\n", result.Err().Error())
}
} else {
panic(err)
}
// Ensures background processes finishes
client.Close()
}
```

### Concurrency
InfluxDB Go Client can be used in a concurrent environment. All its functions are thread-safe.

Expand Down
45 changes: 45 additions & 0 deletions api/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,51 @@ func ExampleQueryAPI_query() {
client.Close()
}

func ExampleQueryAPI_queryWithParams() {
// Create a new client using an InfluxDB server base URL and an authentication token
client := influxdb2.NewClient("http://localhost:8086", "my-token")
// Get query client
queryAPI := client.QueryAPI("my-org")
// Define parameters
parameters := struct {
Start string `json:"start"`
Field string `json:"field"`
Value float64 `json:"value"`
}{
"-1h",
"temperature",
25,
}
// Query with parameters
query := `from(bucket:"my-bucket")
|> range(start: duration(params.start))
|> filter(fn: (r) => r._measurement == "stat")
|> filter(fn: (r) => r._field == params.field)
|> filter(fn: (r) => r._value > params.value)`

// Get result
result, err := queryAPI.QueryWithParams(context.Background(), query, parameters)
if err == nil {
// Iterate over query response
for result.Next() {
// Notice when group key has changed
if result.TableChanged() {
fmt.Printf("table: %s\n", result.TableMetadata().String())
}
// Access data
fmt.Printf("value: %v\n", result.Record().Value())
}
// check for an error
if result.Err() != nil {
fmt.Printf("query parsing error: %s\n", result.Err().Error())
}
} else {
panic(err)
}
// Ensures background processes finishes
client.Close()
}

func ExampleQueryAPI_queryRaw() {
// Create a new client using an InfluxDB server base URL and an authentication token
client := influxdb2.NewClient("http://localhost:8086", "my-token")
Expand Down
163 changes: 142 additions & 21 deletions api/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"net/http"
"net/url"
"path"
"reflect"
"strconv"
"strings"
"sync"
Expand All @@ -43,11 +44,35 @@ const (
)

// QueryAPI provides methods for performing synchronously flux query against InfluxDB server.
//
// Flux query can contain reference to parameters, which must be passed via queryParams.
// it can be a struct or map. Param values can be only simple types or time.Time.
// The name of a struct field or a map key (must be a string) will be a param name.
// The name of the parameter represented by a struct field can be specified by JSON annotation:
//
// type Condition struct {
// Start time.Time `json:"start"`
// Field string `json:"field"`
// Value float64 `json:"value"`
// }
//
// Parameters are then accessed via the Flux params object:
//
// query:= `from(bucket: "environment")
// |> range(start: time(v: params.start))
// |> filter(fn: (r) => r._measurement == "air")
// |> filter(fn: (r) => r._field == params.field)
// |> filter(fn: (r) => r._value > params.value)`
//
type QueryAPI interface {
// QueryRaw executes flux query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect
QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error)
// QueryRawWithParams executes flux parametrized query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect
QueryRawWithParams(ctx context.Context, query string, dialect *domain.Dialect, params interface{}) (string, error)
// Query executes flux query on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts
Query(ctx context.Context, query string) (*QueryTableResult, error)
// QueryWithParams executes flux parametrized query on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts
QueryWithParams(ctx context.Context, query string, params interface{}) (*QueryTableResult, error)
}

// NewQueryAPI returns new query client for querying buckets belonging to org
Expand All @@ -58,6 +83,28 @@ func NewQueryAPI(org string, service http2.Service) QueryAPI {
}
}

// QueryTableResult parses streamed flux query response into structures representing flux table parts
// Walking though the result is done by repeatedly calling Next() until returns false.
// Actual flux table info (columns with names, data types, etc) is returned by TableMetadata() method.
// Data are acquired by Record() method.
// Preliminary end can be caused by an error, so when Next() return false, check Err() for an error
type QueryTableResult struct {
io.Closer
csvReader *csv.Reader
tablePosition int
tableChanged bool
table *query.FluxTableMetadata
record *query.FluxRecord
err error
}

// NewQueryTableResult returns new QueryTableResult
func NewQueryTableResult(rawResponse io.ReadCloser) *QueryTableResult {
csvReader := csv.NewReader(rawResponse)
csvReader.FieldsPerRecord = -1
return &QueryTableResult{Closer: rawResponse, csvReader: csvReader}
}

// queryAPI implements QueryAPI interface
type queryAPI struct {
org string
Expand All @@ -66,13 +113,32 @@ type queryAPI struct {
lock sync.Mutex
}

// queryBody holds the body for an HTTP query request.
type queryBody struct {
Dialect *domain.Dialect `json:"dialect,omitempty"`
Query string `json:"query"`
Type domain.QueryType `json:"type"`
Params interface{} `json:"params,omitempty"`
}

func (q *queryAPI) QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error) {
return q.QueryRawWithParams(ctx, query, dialect, nil)
}

func (q *queryAPI) QueryRawWithParams(ctx context.Context, query string, dialect *domain.Dialect, params interface{}) (string, error) {
if err := checkParamsType(params); err != nil {
return "", err
}
queryURL, err := q.queryURL()
if err != nil {
return "", err
}
queryType := domain.QueryTypeFlux
qr := domain.Query{Query: query, Type: &queryType, Dialect: dialect}
qr := queryBody{
Query: query,
Type: domain.QueryTypeFlux,
Dialect: dialect,
Params: params,
}
qrJSON, err := json.Marshal(qr)
if err != nil {
return "", err
Expand Down Expand Up @@ -118,13 +184,24 @@ func DefaultDialect() *domain.Dialect {
}

func (q *queryAPI) Query(ctx context.Context, query string) (*QueryTableResult, error) {
return q.QueryWithParams(ctx, query, nil)
}

func (q *queryAPI) QueryWithParams(ctx context.Context, query string, params interface{}) (*QueryTableResult, error) {
var queryResult *QueryTableResult
if err := checkParamsType(params); err != nil {
return nil, err
}
queryURL, err := q.queryURL()
if err != nil {
return nil, err
}
queryType := domain.QueryTypeFlux
qr := domain.Query{Query: query, Type: &queryType, Dialect: DefaultDialect()}
qr := queryBody{
Query: query,
Type: domain.QueryTypeFlux,
Dialect: DefaultDialect(),
Params: params,
}
qrJSON, err := json.Marshal(qr)
if err != nil {
return nil, err
Expand Down Expand Up @@ -172,25 +249,69 @@ func (q *queryAPI) queryURL() (string, error) {
return q.url, nil
}

// QueryTableResult parses streamed flux query response into structures representing flux table parts
// Walking though the result is done by repeatedly calling Next() until returns false.
// Actual flux table info (columns with names, data types, etc) is returned by TableMetadata() method.
// Data are acquired by Record() method.
// Preliminary end can be caused by an error, so when Next() return false, check Err() for an error
type QueryTableResult struct {
io.Closer
csvReader *csv.Reader
tablePosition int
tableChanged bool
table *query.FluxTableMetadata
record *query.FluxRecord
err error
// checkParamsType validates the value is struct with simple type fields
// or a map with key as string and value as a simple type
func checkParamsType(p interface{}) error {
if p == nil {
return nil
}
t := reflect.TypeOf(p)
v := reflect.ValueOf(p)
if t.Kind() == reflect.Ptr {
t = t.Elem()
v = v.Elem()
}
if t.Kind() != reflect.Struct && t.Kind() != reflect.Map {
return fmt.Errorf("cannot use %v as query params", t)
}
switch t.Kind() {
case reflect.Struct:
fields := reflect.VisibleFields(t)
for _, f := range fields {
fv := v.FieldByIndex(f.Index)
t := getFieldType(fv)
if !validParamType(t) {
return fmt.Errorf("cannot use field '%s' of type '%v' as a query param", f.Name, t)
}

}
case reflect.Map:
key := t.Key()
if key.Kind() != reflect.String {
return fmt.Errorf("cannot use map key of type '%v' for query param name", key)
}
for _, k := range v.MapKeys() {
f := v.MapIndex(k)
t := getFieldType(f)
if !validParamType(t) {
return fmt.Errorf("cannot use map value type '%v' as a query param", t)
}
}
}
return nil
}

func NewQueryTableResult(rawResponse io.ReadCloser) *QueryTableResult {
csvReader := csv.NewReader(rawResponse)
csvReader.FieldsPerRecord = -1
return &QueryTableResult{Closer: rawResponse, csvReader: csvReader}
// getFieldType extracts type of value
func getFieldType(v reflect.Value) reflect.Type {
t := v.Type()
if t.Kind() == reflect.Ptr {
t = t.Elem()
v = v.Elem()
}
if t.Kind() == reflect.Interface && !v.IsNil() {
t = reflect.ValueOf(v.Interface()).Type()
}
return t
}

// timeType is the exact type for the Time
var timeType = reflect.TypeOf(time.Time{})

// validParamType validates that t is primitive type or string or interface
func validParamType(t reflect.Type) bool {
return (t.Kind() > reflect.Invalid && t.Kind() < reflect.Complex64) ||
t.Kind() == reflect.String ||
t == timeType
}

// TablePosition returns actual flux table position in the result, or -1 if no table was found yet
Expand Down
Loading