Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adding public logging API #180

Merged
merged 2 commits into from
Aug 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

### Features
1. [#165](https://github.com/influxdata/influxdb-client-go/pull/165) Allow overriding the http.Client for the http service.
1. [#179](https://github.com/influxdata/influxdb-client-go/pull/179) Unifying retry strategy among InfluxDB 2 clients: added exponential backoff.
1. [#179](https://github.com/influxdata/influxdb-client-go/pull/179) Unifying retry strategy among InfluxDB 2 clients: added exponential backoff.
1. [#180](https://github.com/influxdata/influxdb-client-go/pull/180) Provided public logger API to enable overriding logging. It is also possible to disable logging.

### Bug fixes
1. [#175](https://github.com/influxdata/influxdb-client-go/pull/175) Fixed WriteAPIs management. Keeping single instance for each org and bucket pair.
Expand Down
3 changes: 3 additions & 0 deletions api/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/influxdata/influxdb-client-go/api/query"
"github.com/influxdata/influxdb-client-go/domain"
ihttp "github.com/influxdata/influxdb-client-go/internal/http"
"github.com/influxdata/influxdb-client-go/internal/log"
)

const (
Expand Down Expand Up @@ -74,6 +75,7 @@ func (q *queryAPI) QueryRaw(ctx context.Context, query string, dialect *domain.D
if err != nil {
return "", err
}
log.Debugf("Query: %s", string(qrJSON))
var body string
perror := q.httpService.PostRequest(ctx, queryURL, bytes.NewReader(qrJSON), func(req *http.Request) {
req.Header.Set("Content-Type", "application/json")
Expand Down Expand Up @@ -123,6 +125,7 @@ func (q *queryAPI) Query(ctx context.Context, query string) (*QueryTableResult,
if err != nil {
return nil, err
}
log.Debugf("Query: %s", string(qrJSON))
perror := q.httpService.PostRequest(ctx, queryURL, bytes.NewReader(qrJSON), func(req *http.Request) {
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept-Encoding", "gzip")
Expand Down
18 changes: 9 additions & 9 deletions api/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (w *WriteAPIImpl) waitForFlushing() {
if writeBuffInfo.writeBuffLen == 0 {
break
}
log.Log.Info("Waiting buffer is flushed")
log.Info("Waiting buffer is flushed")
time.Sleep(time.Millisecond)
}
for {
Expand All @@ -103,13 +103,13 @@ func (w *WriteAPIImpl) waitForFlushing() {
if writeBuffInfo.writeBuffLen == 0 {
break
}
log.Log.Info("Waiting buffer is flushed")
log.Info("Waiting buffer is flushed")
time.Sleep(time.Millisecond)
}
}

func (w *WriteAPIImpl) bufferProc() {
log.Log.Info("Buffer proc started")
log.Info("Buffer proc started")
ticker := time.NewTicker(time.Duration(w.writeOptions.FlushInterval()) * time.Millisecond)
x:
for {
Expand All @@ -132,21 +132,21 @@ x:
w.bufferInfoCh <- buffInfo
}
}
log.Log.Info("Buffer proc finished")
log.Info("Buffer proc finished")
w.doneCh <- struct{}{}
}

func (w *WriteAPIImpl) flushBuffer() {
if len(w.writeBuffer) > 0 {
log.Log.Info("sending batch")
log.Info("sending batch")
batch := iwrite.NewBatch(buffer(w.writeBuffer), w.writeOptions.RetryInterval())
w.writeCh <- batch
w.writeBuffer = w.writeBuffer[:0]
}
}

func (w *WriteAPIImpl) writeProc() {
log.Log.Info("Write proc started")
log.Info("Write proc started")
x:
for {
select {
Expand All @@ -156,14 +156,14 @@ x:
w.errCh <- err
}
case <-w.writeStop:
log.Log.Info("Write proc: received stop")
log.Info("Write proc: received stop")
break x
case buffInfo := <-w.writeInfoCh:
buffInfo.writeBuffLen = len(w.writeCh)
w.writeInfoCh <- buffInfo
}
}
log.Log.Info("Write proc finished")
log.Info("Write proc finished")
w.doneCh <- struct{}{}
}

Expand Down Expand Up @@ -205,7 +205,7 @@ func (w *WriteAPIImpl) WriteRecord(line string) {
func (w *WriteAPIImpl) WritePoint(point *write.Point) {
line, err := w.service.EncodePoints(point)
if err != nil {
log.Log.Errorf("point encoding error: %s\n", err.Error())
log.Errorf("point encoding error: %s\n", err.Error())
} else {
w.bufferCh <- line
}
Expand Down
8 changes: 4 additions & 4 deletions api/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (

"github.com/influxdata/influxdb-client-go/api/write"
ihttp "github.com/influxdata/influxdb-client-go/internal/http"
"github.com/influxdata/influxdb-client-go/internal/log"
"github.com/influxdata/influxdb-client-go/internal/test"
"github.com/influxdata/influxdb-client-go/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestWriteAPIImpl_Write(t *testing.T) {

func TestGzipWithFlushing(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
log.Log.SetDebugLevel(4)
log.Log.SetLogLevel(log.DebugLevel)
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetUseGZip(true))
points := genPoints(5)
for _, p := range points {
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestFlushInterval(t *testing.T) {

func TestRetry(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
log.Log.SetDebugLevel(5)
log.Log.SetLogLevel(log.DebugLevel)
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000))
points := genPoints(15)
for i := 0; i < 5; i++ {
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestRetry(t *testing.T) {

func TestWriteError(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
log.Log.SetDebugLevel(3)
log.Log.SetLogLevel(log.DebugLevel)
service.SetReplyError(&ihttp.Error{
StatusCode: 400,
Code: "write",
Expand Down
9 changes: 6 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (
"github.com/influxdata/influxdb-client-go/api"
"github.com/influxdata/influxdb-client-go/domain"
ihttp "github.com/influxdata/influxdb-client-go/internal/http"
"github.com/influxdata/influxdb-client-go/internal/log"
ilog "github.com/influxdata/influxdb-client-go/internal/log"
"github.com/influxdata/influxdb-client-go/log"
)

// Client provides API to communicate with InfluxDBServer.
Expand Down Expand Up @@ -101,8 +102,10 @@ func NewClientWithOptions(serverURL string, authToken string, options *Options)
httpService: service,
apiClient: domain.NewClientWithResponses(service),
}
log.Log.SetDebugLevel(client.Options().LogLevel())
log.Log.Infof("Using URL '%s', token '%s'", serverURL, authToken)
if log.Log != nil {
log.Log.SetLogLevel(options.LogLevel())
}
ilog.Infof("Using URL '%s', token '%s'", serverURL, authToken)
return client
}
func (c *clientImpl) Options() *Options {
Expand Down
4 changes: 3 additions & 1 deletion internal/http/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strconv"

http2 "github.com/influxdata/influxdb-client-go/api/http"
"github.com/influxdata/influxdb-client-go/internal/log"
)

// RequestCallback defines function called after a request is created before any call
Expand Down Expand Up @@ -59,7 +60,7 @@ func NewService(serverURL, authorization string, httpOptions *http2.Options) Ser
serverAPIURL: serverAPIURL,
serverURL: serverURL,
authorization: authorization,
client: httpOptions.HTTPClient(),
client: httpOptions.HTTPClient(),
}
}

Expand Down Expand Up @@ -114,6 +115,7 @@ func (s *service) DoHTTPRequest(req *http.Request, requestCallback RequestCallba
}

func (s *service) DoHTTPRequestWithResponse(req *http.Request, requestCallback RequestCallback) (*http.Response, error) {
log.Infof("HTTP %s req to %s", req.Method, req.URL.String())
req.Header.Set("Authorization", s.authorization)
req.Header.Set("User-Agent", UserAgent)
if requestCallback != nil {
Expand Down
65 changes: 27 additions & 38 deletions internal/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,61 +6,50 @@
package log

import (
"fmt"
"log"
ilog "github.com/influxdata/influxdb-client-go/log"
)

var Log Logger

// Logger provides filtered and categorized logging API.
// It logs to standard logger, only errors by default
type Logger struct {
debugLevel uint
}

// SetDebugLevel to filter log messages. Each level mean to log all categories bellow
// 0 errors , 1 - warning, 2 - info, 3 - debug
func (l *Logger) SetDebugLevel(debugLevel uint) {
l.debugLevel = debugLevel
}

func (l *Logger) Debugf(format string, v ...interface{}) {
if l.debugLevel > 2 {
log.Print("[D]! ", fmt.Sprintf(format, v...))
func Debugf(format string, v ...interface{}) {
if ilog.Log != nil {
ilog.Log.Debugf(format, v...)
}
}
func (l *Logger) Debug(msg string) {
if l.debugLevel > 2 {
log.Print("[D]! ", msg)
func Debug(msg string) {
if ilog.Log != nil {
ilog.Log.Debug(msg)
}
}

func (l *Logger) Infof(format string, v ...interface{}) {
if l.debugLevel > 1 {
log.Print("[I]! ", fmt.Sprintf(format, v...))
func Infof(format string, v ...interface{}) {
if ilog.Log != nil {
ilog.Log.Infof(format, v...)
}
}
func (l *Logger) Info(msg string) {
if l.debugLevel > 1 {
log.Print("[I]! ", msg)
func Info(msg string) {
if ilog.Log != nil {
ilog.Log.Info(msg)
}
}

func (l *Logger) Warnf(format string, v ...interface{}) {
if l.debugLevel > 0 {
log.Print("[W]! ", fmt.Sprintf(format, v...))
func Warnf(format string, v ...interface{}) {
if ilog.Log != nil {
ilog.Log.Warnf(format, v...)
}
}
func (l *Logger) Warn(msg string) {
if l.debugLevel > 0 {
log.Print("[W]! ", msg)
func Warn(msg string) {
if ilog.Log != nil {
ilog.Log.Warn(msg)
}
}

func (l *Logger) Errorf(format string, v ...interface{}) {
log.Print("[E]! ", fmt.Sprintf(format, v...))
func Errorf(format string, v ...interface{}) {
if ilog.Log != nil {
ilog.Log.Errorf(format, v...)
}
}

func (l *Logger) Error(msg string) {
log.Print("[E]! ", msg)
func Error(msg string) {
if ilog.Log != nil {
ilog.Log.Error(msg)
}
}
51 changes: 51 additions & 0 deletions internal/log/logger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2020 InfluxData, Inc. All rights reserved.
// Use of this source code is governed by MIT
// license that can be found in the LICENSE file.

package log_test

import (
"log"
"strings"
"testing"

ilog "github.com/influxdata/influxdb-client-go/internal/log"
dlog "github.com/influxdata/influxdb-client-go/log"
"github.com/stretchr/testify/assert"
)

func TestLogging(t *testing.T) {
var sb strings.Builder
log.SetOutput(&sb)
dlog.Log.SetLogLevel(dlog.DebugLevel)
//test default settings
ilog.Debug("Debug")
ilog.Debugf("Debugf %s %d", "message", 1)
ilog.Info("Info")
ilog.Infof("Infof %s %d", "message", 2)
ilog.Warn("Warn")
ilog.Warnf("Warnf %s %d", "message", 3)
ilog.Error("Error")
ilog.Errorf("Errorf %s %d", "message", 4)
assert.True(t, strings.Contains(sb.String(), "Debug"))
assert.True(t, strings.Contains(sb.String(), "Debugf message 1"))
assert.True(t, strings.Contains(sb.String(), "Info"))
assert.True(t, strings.Contains(sb.String(), "Infof message 2"))
assert.True(t, strings.Contains(sb.String(), "Warn"))
assert.True(t, strings.Contains(sb.String(), "Warnf message 3"))
assert.True(t, strings.Contains(sb.String(), "Error"))
assert.True(t, strings.Contains(sb.String(), "Errorf message 4"))

sb.Reset()

dlog.Log = nil
ilog.Debug("Debug")
ilog.Debugf("Debugf %s %d", "message", 1)
ilog.Info("Info")
ilog.Infof("Infof %s %d", "message", 2)
ilog.Warn("Warn")
ilog.Warnf("Warnf %s %d", "message", 3)
ilog.Error("Error")
ilog.Errorf("Errorf %s %d", "message", 4)
assert.True(t, len(sb.String()) == 0)
}