Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adding public HTTP service with example for custom Server API call #181

Merged
merged 1 commit into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
1. [#165](https://github.com/influxdata/influxdb-client-go/pull/165) Allow overriding the http.Client for the http service.
1. [#179](https://github.com/influxdata/influxdb-client-go/pull/179) Unifying retry strategy among InfluxDB 2 clients: added exponential backoff.
1. [#180](https://github.com/influxdata/influxdb-client-go/pull/180) Provided public logger API to enable overriding logging. It is also possible to disable logging.
1. [#181](https://github.com/influxdata/influxdb-client-go/pull/181) Exposed HTTP service to allow custom server API calls. Added example.

### Bug fixes
1. [#175](https://github.com/influxdata/influxdb-client-go/pull/175) Fixed WriteAPIs management. Keeping single instance for each org and bucket pair.
Expand Down
File renamed without changes.
1 change: 0 additions & 1 deletion api/http/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Use of this source code is governed by MIT
// license that can be found in the LICENSE file.

// Package http holds HTTP options
package http

import (
Expand Down
38 changes: 21 additions & 17 deletions internal/http/service.go → api/http/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@
// Use of this source code is governed by MIT
// license that can be found in the LICENSE file.

// Package http provides http related servicing stuff
// Package http provides HTTP servicing related code.
//
// Important type is Service which handles HTTP operations. It is internally used by library and it is not necessary to use it directly for common operations.
// It can be useful when creating custom InfluxDB2 server API calls using generated code from the domain package, that are not yet exposed by API of this library.
//
// Service can be obtained from client using HTTPService() method.
// It can be also created directly. To instantiate a Service use NewService(). Remember, the authorization param is in form "Token your-auth-token". e.g. "Token DXnd7annkGteV5Wqx9G3YjO9Ezkw87nHk8OabcyHCxF5451kdBV0Ag2cG7OmZZgCUTHroagUPdxbuoyen6TSPw==".
// srv := http.NewService("http://localhost:9999", "Token my-token", http.DefaultOptions())
package http

import (
Expand All @@ -15,7 +22,7 @@ import (
"net/url"
"strconv"

http2 "github.com/influxdata/influxdb-client-go/api/http"
http2 "github.com/influxdata/influxdb-client-go/internal/http"
"github.com/influxdata/influxdb-client-go/internal/log"
)

Expand All @@ -25,16 +32,21 @@ type RequestCallback func(req *http.Request)
// ResponseCallback defines function called after a successful response was received
type ResponseCallback func(resp *http.Response) error

// Service handles HTTP operations with taking care of mandatory request headers
// Service handles HTTP operations with taking care of mandatory request headers and known errors
type Service interface {
PostRequest(ctx context.Context, url string, body io.Reader, requestCallback RequestCallback, responseCallback ResponseCallback) *Error
GetRequest(ctx context.Context, url string, requestCallback RequestCallback, responseCallback ResponseCallback) *Error
// DoPostRequest sends HTTP POST request to the given url with body
DoPostRequest(ctx context.Context, url string, body io.Reader, requestCallback RequestCallback, responseCallback ResponseCallback) *Error
// DoHTTPRequest sends given HTTP request and handles response
DoHTTPRequest(req *http.Request, requestCallback RequestCallback, responseCallback ResponseCallback) *Error
// DoHTTPRequestWithResponse sends given HTTP request and returns response
DoHTTPRequestWithResponse(req *http.Request, requestCallback RequestCallback) (*http.Response, error)
// SetAuthorization sets the authorization header value
SetAuthorization(authorization string)
// Authorization returns current authorization header value
Authorization() string
HTTPClient() *http.Client
// ServerAPIURL returns URL to InfluxDB2 server API space
ServerAPIURL() string
// ServerURL returns URL to InfluxDB2 server
ServerURL() string
}

Expand All @@ -47,7 +59,7 @@ type service struct {
}

// NewService creates instance of http Service with given parameters
func NewService(serverURL, authorization string, httpOptions *http2.Options) Service {
func NewService(serverURL, authorization string, httpOptions *Options) Service {
apiURL, err := url.Parse(serverURL)
serverAPIURL := serverURL
if err == nil {
Expand Down Expand Up @@ -80,11 +92,7 @@ func (s *service) Authorization() string {
return s.authorization
}

func (s *service) HTTPClient() *http.Client {
return s.client
}

func (s *service) PostRequest(ctx context.Context, url string, body io.Reader, requestCallback RequestCallback, responseCallback ResponseCallback) *Error {
func (s *service) DoPostRequest(ctx context.Context, url string, body io.Reader, requestCallback RequestCallback, responseCallback ResponseCallback) *Error {
return s.doHTTPRequestWithURL(ctx, http.MethodPost, url, body, requestCallback, responseCallback)
}

Expand Down Expand Up @@ -117,17 +125,13 @@ func (s *service) DoHTTPRequest(req *http.Request, requestCallback RequestCallba
func (s *service) DoHTTPRequestWithResponse(req *http.Request, requestCallback RequestCallback) (*http.Response, error) {
log.Infof("HTTP %s req to %s", req.Method, req.URL.String())
req.Header.Set("Authorization", s.authorization)
req.Header.Set("User-Agent", UserAgent)
req.Header.Set("User-Agent", http2.UserAgent)
if requestCallback != nil {
requestCallback(req)
}
return s.client.Do(req)
}

func (s *service) GetRequest(ctx context.Context, url string, requestCallback RequestCallback, responseCallback ResponseCallback) *Error {
return s.doHTTPRequestWithURL(ctx, http.MethodGet, url, nil, requestCallback, responseCallback)
}

func (s *service) handleHTTPError(r *http.Response) *Error {
// successful status code range
if r.StatusCode >= 200 && r.StatusCode < 300 {
Expand Down
18 changes: 18 additions & 0 deletions api/http/service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2020 InfluxData, Inc. All rights reserved.
// Use of this source code is governed by MIT
// license that can be found in the LICENSE file.

package http

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestService(t *testing.T) {
srv := NewService("http://localhost:9999/aa/", "Token my-token", DefaultOptions())
assert.Equal(t, "http://localhost:9999/aa/", srv.ServerURL())
assert.Equal(t, "http://localhost:9999/aa/api/v2/", srv.ServerAPIURL())
assert.Equal(t, "Token my-token", srv.Authorization())
}
10 changes: 5 additions & 5 deletions api/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"sync"
"time"

http2 "github.com/influxdata/influxdb-client-go/api/http"
"github.com/influxdata/influxdb-client-go/api/query"
"github.com/influxdata/influxdb-client-go/domain"
ihttp "github.com/influxdata/influxdb-client-go/internal/http"
"github.com/influxdata/influxdb-client-go/internal/log"
)

Expand All @@ -49,7 +49,7 @@ type QueryAPI interface {
Query(ctx context.Context, query string) (*QueryTableResult, error)
}

func NewQueryAPI(org string, service ihttp.Service) QueryAPI {
func NewQueryAPI(org string, service http2.Service) QueryAPI {
return &queryAPI{
org: org,
httpService: service,
Expand All @@ -59,7 +59,7 @@ func NewQueryAPI(org string, service ihttp.Service) QueryAPI {
// queryAPI implements QueryAPI interface
type queryAPI struct {
org string
httpService ihttp.Service
httpService http2.Service
url string
lock sync.Mutex
}
Expand All @@ -77,7 +77,7 @@ func (q *queryAPI) QueryRaw(ctx context.Context, query string, dialect *domain.D
}
log.Debugf("Query: %s", string(qrJSON))
var body string
perror := q.httpService.PostRequest(ctx, queryURL, bytes.NewReader(qrJSON), func(req *http.Request) {
perror := q.httpService.DoPostRequest(ctx, queryURL, bytes.NewReader(qrJSON), func(req *http.Request) {
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept-Encoding", "gzip")
},
Expand Down Expand Up @@ -126,7 +126,7 @@ func (q *queryAPI) Query(ctx context.Context, query string) (*QueryTableResult,
return nil, err
}
log.Debugf("Query: %s", string(qrJSON))
perror := q.httpService.PostRequest(ctx, queryURL, bytes.NewReader(qrJSON), func(req *http.Request) {
perror := q.httpService.DoPostRequest(ctx, queryURL, bytes.NewReader(qrJSON), func(req *http.Request) {
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept-Encoding", "gzip")
},
Expand Down
13 changes: 6 additions & 7 deletions api/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,17 @@ import (
"context"
"encoding/csv"
"fmt"
http2 "github.com/influxdata/influxdb-client-go/api/http"
"github.com/influxdata/influxdb-client-go/api/query"
"github.com/influxdata/influxdb-client-go/internal/gzip"
ihttp "github.com/influxdata/influxdb-client-go/internal/http"
"github.com/stretchr/testify/assert"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

http2 "github.com/influxdata/influxdb-client-go/api/http"
"github.com/influxdata/influxdb-client-go/api/query"
"github.com/influxdata/influxdb-client-go/internal/gzip"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -508,7 +507,7 @@ func TestQueryRawResult(t *testing.T) {
}
}))
defer server.Close()
queryAPI := NewQueryAPI("org", ihttp.NewService(server.URL, "a", http2.DefaultOptions()))
queryAPI := NewQueryAPI("org", http2.NewService(server.URL, "a", http2.DefaultOptions()))

result, err := queryAPI.QueryRaw(context.Background(), "flux", nil)
require.Nil(t, err)
Expand Down Expand Up @@ -903,7 +902,7 @@ func TestFluxError(t *testing.T) {
}
}))
defer server.Close()
queryAPI := NewQueryAPI("org", ihttp.NewService(server.URL, "a", http2.DefaultOptions()))
queryAPI := NewQueryAPI("org", http2.NewService(server.URL, "a", http2.DefaultOptions()))

result, err := queryAPI.QueryRaw(context.Background(), "errored flux", nil)
assert.Equal(t, "", result)
Expand Down
4 changes: 2 additions & 2 deletions api/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"strings"
"time"

http2 "github.com/influxdata/influxdb-client-go/api/http"
"github.com/influxdata/influxdb-client-go/api/write"
"github.com/influxdata/influxdb-client-go/internal/http"
"github.com/influxdata/influxdb-client-go/internal/log"
iwrite "github.com/influxdata/influxdb-client-go/internal/write"
)
Expand Down Expand Up @@ -54,7 +54,7 @@ type writeBuffInfoReq struct {
writeBuffLen int
}

func NewWriteAPI(org string, bucket string, service http.Service, writeOptions *write.Options) *WriteAPIImpl {
func NewWriteAPI(org string, bucket string, service http2.Service, writeOptions *write.Options) *WriteAPIImpl {
w := &WriteAPIImpl{
service: iwrite.NewService(org, bucket, service, writeOptions),
writeBuffer: make([]string, 0, writeOptions.BatchSize()+1),
Expand Down
4 changes: 2 additions & 2 deletions api/writeAPIBlocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"context"
"strings"

http2 "github.com/influxdata/influxdb-client-go/api/http"
"github.com/influxdata/influxdb-client-go/api/write"
"github.com/influxdata/influxdb-client-go/internal/http"
iwrite "github.com/influxdata/influxdb-client-go/internal/write"
)

Expand All @@ -32,7 +32,7 @@ type writeAPIBlocking struct {
}

// NewWriteAPIBlocking creates new WriteAPIBlocking instance for org and bucket with underlying client
func NewWriteAPIBlocking(org string, bucket string, service http.Service, writeOptions *write.Options) *writeAPIBlocking {
func NewWriteAPIBlocking(org string, bucket string, service http2.Service, writeOptions *write.Options) *writeAPIBlocking {
return &writeAPIBlocking{service: iwrite.NewService(org, bucket, service, writeOptions), writeOptions: writeOptions}
}

Expand Down
4 changes: 2 additions & 2 deletions api/writeAPIBlocking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"testing"
"time"

http2 "github.com/influxdata/influxdb-client-go/api/http"
"github.com/influxdata/influxdb-client-go/api/write"
"github.com/influxdata/influxdb-client-go/internal/http"
"github.com/influxdata/influxdb-client-go/internal/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -48,7 +48,7 @@ func TestWriteRecord(t *testing.T) {
require.Nil(t, err)
require.Len(t, service.Lines(), 0)

service.SetReplyError(&http.Error{Code: "invalid", Message: "data"})
service.SetReplyError(&http2.Error{Code: "invalid", Message: "data"})
err = writeAPI.WriteRecord(context.Background(), lines...)
require.NotNil(t, err)
require.Equal(t, "invalid: data", err.Error())
Expand Down
6 changes: 3 additions & 3 deletions api/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"testing"
"time"

"github.com/influxdata/influxdb-client-go/api/http"
"github.com/influxdata/influxdb-client-go/api/write"
ihttp "github.com/influxdata/influxdb-client-go/internal/http"
"github.com/influxdata/influxdb-client-go/internal/test"
"github.com/influxdata/influxdb-client-go/log"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestRetry(t *testing.T) {
writeAPI.waitForFlushing()
require.Len(t, service.Lines(), 5)
service.Close()
service.SetReplyError(&ihttp.Error{
service.SetReplyError(&http.Error{
StatusCode: 429,
RetryAfter: 5,
})
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestRetry(t *testing.T) {
func TestWriteError(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
log.Log.SetLogLevel(log.DebugLevel)
service.SetReplyError(&ihttp.Error{
service.SetReplyError(&http.Error{
StatusCode: 400,
Code: "write",
Message: "error",
Expand Down
12 changes: 9 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"sync"

"github.com/influxdata/influxdb-client-go/api"
"github.com/influxdata/influxdb-client-go/api/http"
"github.com/influxdata/influxdb-client-go/domain"
ihttp "github.com/influxdata/influxdb-client-go/internal/http"
ilog "github.com/influxdata/influxdb-client-go/internal/log"
"github.com/influxdata/influxdb-client-go/log"
)
Expand All @@ -39,6 +39,8 @@ type Client interface {
Options() *Options
// ServerURL returns the url of the server url client talks to
ServerURL() string
// HTTPService returns underlying HTTP service object used by client
HTTPService() http.Service
// WriteAPI returns the asynchronous, non-blocking, Write client
WriteAPI(org, bucket string) api.WriteAPI
// WriteAPIBlocking returns the synchronous, blocking, Write client
Expand Down Expand Up @@ -66,7 +68,7 @@ type clientImpl struct {
writeAPIs map[string]api.WriteAPI
syncWriteAPIs map[string]api.WriteAPIBlocking
lock sync.Mutex
httpService ihttp.Service
httpService http.Service
apiClient *domain.ClientWithResponses
authAPI api.AuthorizationsAPI
orgAPI api.OrganizationsAPI
Expand All @@ -93,7 +95,7 @@ func NewClientWithOptions(serverURL string, authToken string, options *Options)
// For subsequent path parts concatenation, url has to end with '/'
normServerURL = serverURL + "/"
}
service := ihttp.NewService(normServerURL, "Token "+authToken, options.httpOptions)
service := http.NewService(normServerURL, "Token "+authToken, options.httpOptions)
client := &clientImpl{
serverURL: serverURL,
options: options,
Expand All @@ -116,6 +118,10 @@ func (c *clientImpl) ServerURL() string {
return c.serverURL
}

func (c *clientImpl) HTTPService() http.Service {
return c.httpService
}

func (c *clientImpl) Ready(ctx context.Context) (bool, error) {
params := &domain.GetReadyParams{}
response, err := c.apiClient.GetReadyWithResponse(ctx, params)
Expand Down
35 changes: 35 additions & 0 deletions client_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,5 +245,40 @@ func TestQueryV1Compatibility(t *testing.T) {
}
assert.Equal(t, 42, rows)
}
}

func TestHTTPService(t *testing.T) {
client := influxdb2.NewClient("http://localhost:9999", "my-token")
apiClient := domain.NewClientWithResponses(client.HTTPService())
org, err := client.OrganizationsAPI().FindOrganizationByName(context.Background(), "my-org")
if err != nil {
//return err
t.Error(err)
}
taskDescription := "Example task"
taskFlux := `option task = {
name: "My task",
every: 1h
}

from(bucket:"my-bucket") |> range(start: -1m) |> last()`
taskStatus := domain.TaskStatusTypeActive
taskRequest := domain.TaskCreateRequest{
Org: &org.Name,
OrgID: org.Id,
Description: &taskDescription,
Flux: taskFlux,
Status: &taskStatus,
}
resp, err := apiClient.PostTasksWithResponse(context.Background(), &domain.PostTasksParams{}, domain.PostTasksJSONRequestBody(taskRequest))
if err != nil {
//return err
t.Error(err)
}
if resp.JSONDefault != nil {
t.Error(resp.JSONDefault.Message)
}
if assert.NotNil(t, resp.JSON201) {
assert.Equal(t, "My task", resp.JSON201.Name)
}
}