Skip to content

Commit

Permalink
fix(query): change logging bridge to be a service instead
Browse files Browse the repository at this point in the history
It is no longer necessary for the query logging to be a bridge as the
stats are available for consumption from the ProxyQueryService.
This change changes the logging bridge to directly implement the proxy
query service instead of implementing a bridge.
  • Loading branch information
nathanielc committed Apr 11, 2019
1 parent 9a5126d commit f37e65f
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 65 deletions.
4 changes: 2 additions & 2 deletions http/proxy_query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/influxdata/flux/csv"
"github.com/influxdata/flux/lang"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/query/mock"
"go.uber.org/zap"
)

Expand All @@ -34,7 +34,7 @@ func TestProxyQueryService_Query(t *testing.T) {
t.Fatalf("error adding dialect mappings: %v", err)
}
h.ProxyQueryService = &mock.ProxyQueryService{
QueryFn: func(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
QueryF: func(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
if _, err := io.WriteString(w, "boo"); err != nil {
return flux.Statistics{}, err
}
Expand Down
35 changes: 0 additions & 35 deletions mock/proxy_query_service.go

This file was deleted.

48 changes: 20 additions & 28 deletions query/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@ import (
"time"

"github.com/influxdata/flux"
"github.com/influxdata/flux/iocounter"
"github.com/influxdata/influxdb/kit/check"
"github.com/influxdata/influxdb/kit/tracing"
)

// LoggingServiceBridge implements ProxyQueryService and logs the queries while consuming a QueryService interface.
type LoggingServiceBridge struct {
QueryService QueryService
QueryLogger Logger
// LoggingProxyQueryService wraps a ProxyQueryService and logs the queries.
type LoggingProxyQueryService struct {
ProxyQueryService ProxyQueryService
QueryLogger Logger
NowFunction func() time.Time
}

// Query executes and logs the query.
func (s *LoggingServiceBridge) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (stats flux.Statistics, err error) {
func (s *LoggingProxyQueryService) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (stats flux.Statistics, err error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand All @@ -28,42 +30,32 @@ func (s *LoggingServiceBridge) Query(ctx context.Context, w io.Writer, req *Prox
if r != nil {
err = fmt.Errorf("panic: %v", r)
}
var now time.Time
if s.NowFunction != nil {
now = s.NowFunction()
} else {
now = time.Now()
}
log := Log{
OrganizationID: req.Request.OrganizationID,
ProxyRequest: req,
ResponseSize: n,
Time: time.Now(),
Time: now,
Statistics: stats,
}
if err != nil {
log.Error = err
Error: err,
}
s.QueryLogger.Log(log)
}()

results, err := s.QueryService.Query(ctx, &req.Request)
if err != nil {
return stats, tracing.LogError(span, err)
}
// Check if this result iterator reports stats. We call this defer before cancel because
// the query needs to be finished before it will have valid statistics.
defer func() {
results.Release()
stats = results.Statistics()
}()

encoder := req.Dialect.Encoder()
n, err = encoder.Encode(w, results)
wc := &iocounter.Writer{Writer: w}
stats, err = s.ProxyQueryService.Query(ctx, wc, req)
if err != nil {
return stats, tracing.LogError(span, err)
}
// The results iterator may have had an error independent of encoding errors.
if err = results.Err(); err != nil {
return stats, tracing.LogError(span, err)
}
n = wc.Count()
return stats, nil
}

func (s *LoggingServiceBridge) Check(ctx context.Context) check.Response {
return s.QueryService.Check(ctx)
func (s *LoggingProxyQueryService) Check(ctx context.Context) check.Response {
return s.ProxyQueryService.Check(ctx)
}
96 changes: 96 additions & 0 deletions query/logging_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package query_test

import (
"bytes"
"context"
"io"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/flux"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/query/mock"
)

var orgID = MustIDBase16("ba55ba55ba55ba55")

// MustIDBase16 is an helper to ensure a correct ID is built during testing.
func MustIDBase16(s string) platform.ID {
id, err := platform.IDFromString(s)
if err != nil {
panic(err)
}
return *id
}

var opts = []cmp.Option{
cmpopts.IgnoreUnexported(query.ProxyRequest{}),
cmpopts.IgnoreUnexported(query.Request{}),
}

func TestLoggingProxyQueryService(t *testing.T) {
wantStats := flux.Statistics{
TotalDuration: time.Second,
CompileDuration: time.Second,
QueueDuration: time.Second,
PlanDuration: time.Second,
RequeueDuration: time.Second,
ExecuteDuration: time.Second,
Concurrency: 2,
MaxAllocated: 2048,
}
wantBytes := 10
pqs := &mock.ProxyQueryService{
QueryF: func(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) {
w.Write(make([]byte, wantBytes))
return wantStats, nil
},
}
var logs []query.Log
logger := &mock.QueryLogger{
LogFn: func(l query.Log) error {
logs = append(logs, l)
return nil
},
}

wantTime := time.Now()
lpqs := query.LoggingProxyQueryService{
ProxyQueryService: pqs,
QueryLogger: logger,
NowFunction: func() time.Time {
return wantTime
},
}

var buf bytes.Buffer
req := &query.ProxyRequest{
Request: query.Request{
Authorization: nil,
OrganizationID: orgID,
Compiler: nil,
},
Dialect: nil,
}
stats, err := lpqs.Query(context.Background(), &buf, req)
if err != nil {
t.Fatal(err)
}
if !cmp.Equal(wantStats, stats, opts...) {
t.Errorf("unexpected query stats: -want/+got\n%s", cmp.Diff(wantStats, stats, opts...))
}
wantLogs := []query.Log{{
Time: wantTime,
OrganizationID: orgID,
Error: nil,
ProxyRequest: req,
ResponseSize: int64(wantBytes),
Statistics: wantStats,
}}
if !cmp.Equal(wantLogs, logs, opts...) {
t.Errorf("unexpected query logs: -want/+got\n%s", cmp.Diff(wantLogs, logs, opts...))
}
}
13 changes: 13 additions & 0 deletions query/mock/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package mock

import "github.com/influxdata/influxdb/query"

var _ query.Logger = (*QueryLogger)(nil)

type QueryLogger struct {
LogFn func(query.Log) error
}

func (l *QueryLogger) Log(log query.Log) error {
return l.LogFn(log)
}

0 comments on commit f37e65f

Please sign in to comment.