Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Rework jaegerquery wiring to pass down config
Browse files Browse the repository at this point in the history
The config object will be needed to specify the max
trace duration the query can expect.
  • Loading branch information
cevian committed May 9, 2022
1 parent e58c9f5 commit 5c4a2d1
Show file tree
Hide file tree
Showing 13 changed files with 83 additions and 34 deletions.
5 changes: 3 additions & 2 deletions pkg/api/router.go
Expand Up @@ -21,14 +21,15 @@ import (
"github.com/timescale/promscale/pkg/ha"
haClient "github.com/timescale/promscale/pkg/ha/client"
"github.com/timescale/promscale/pkg/jaeger"
jaegerQuery "github.com/timescale/promscale/pkg/jaeger/query"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgclient"
pgMetrics "github.com/timescale/promscale/pkg/pgmodel/metrics"
"github.com/timescale/promscale/pkg/query"
"github.com/timescale/promscale/pkg/telemetry"
)

func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.Client) (*mux.Router, error) {
func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.Client, query *jaegerQuery.Query) (*mux.Router, error) {
var writePreprocessors []parser.Preprocessor
if apiConf.HighAvailability {
service := ha.NewService(haClient.NewLeaseClient(client.Connection))
Expand Down Expand Up @@ -100,7 +101,7 @@ func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.
router.Path("/healthz").Methods(http.MethodGet).HandlerFunc(Health(healthChecker))
router.Path(apiConf.TelemetryPath).Methods(http.MethodGet).HandlerFunc(promhttp.Handler().ServeHTTP)

jaeger.ExtendQueryAPIs(router, client.Connection)
jaeger.ExtendQueryAPIs(router, client.Connection, query)

debugProf := router.PathPrefix("/debug/pprof").Subrouter()
debugProf.Path("").Methods(http.MethodGet).HandlerFunc(pprof.Index)
Expand Down
3 changes: 1 addition & 2 deletions pkg/jaeger/api.go
Expand Up @@ -9,8 +9,7 @@ import (
"github.com/timescale/promscale/pkg/pgxconn"
)

func ExtendQueryAPIs(r *mux.Router, conn pgxconn.PgxConn) {
reader := query.New(conn)
func ExtendQueryAPIs(r *mux.Router, conn pgxconn.PgxConn, reader *query.Query) {
handler := jaegerQueryApp.NewAPIHandler(
jaegerQueryService.NewQueryService(reader, reader, jaegerQueryService.QueryServiceOptions{}),
)
Expand Down
27 changes: 27 additions & 0 deletions pkg/jaeger/query/config.go
@@ -0,0 +1,27 @@
package query

import (
"flag"
"time"
)

const (
DefaultMaxTraceDuration = time.Hour
)

type Config struct {
MaxTraceDuration time.Duration
}

var DefaultConfig = Config{
MaxTraceDuration: DefaultMaxTraceDuration,
}

func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
fs.DurationVar(&cfg.MaxTraceDuration, "tracing.max-trace-duration", DefaultMaxTraceDuration, "Maximum duration of any trace in the system. This parameter is used to optimize queries.")
return cfg
}

func Validate(cfg *Config) error {
return nil
}
4 changes: 2 additions & 2 deletions pkg/jaeger/query/find_trace_ids.go
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/timescale/promscale/pkg/pgxconn"
)

func findTraceIDs(ctx context.Context, conn pgxconn.PgxConn, q *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
query, params := findTraceIDsQuery(q)
func findTraceIDs(ctx context.Context, builder *Builder, conn pgxconn.PgxConn, q *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
query, params := builder.findTraceIDsQuery(q)
rows, err := conn.Query(ctx, query, params...)
if err != nil {
return nil, fmt.Errorf("querying traces: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/jaeger/query/find_traces.go
Expand Up @@ -15,8 +15,8 @@ import (
"go.opentelemetry.io/collector/model/pdata"
)

func findTraces(ctx context.Context, conn pgxconn.PgxConn, q *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
query, params := findTracesQuery(q)
func findTraces(ctx context.Context, builder *Builder, conn pgxconn.PgxConn, q *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
query, params := builder.findTracesQuery(q)
rows, err := conn.Query(ctx, query, params...)
if err != nil {
return nil, fmt.Errorf("querying traces error: %w query:\n%s", err, query)
Expand Down
4 changes: 2 additions & 2 deletions pkg/jaeger/query/get_trace.go
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/timescale/promscale/pkg/pgxconn"
)

func getTrace(ctx context.Context, conn pgxconn.PgxConn, traceID model.TraceID) (*model.Trace, error) {
query, params, err := getTraceQuery(traceID)
func getTrace(ctx context.Context, builder *Builder, conn pgxconn.PgxConn, traceID model.TraceID) (*model.Trace, error) {
query, params, err := builder.getTraceQuery(traceID)
if err != nil {
return nil, fmt.Errorf("get trace query: %w", err)
}
Expand Down
14 changes: 8 additions & 6 deletions pkg/jaeger/query/query.go
Expand Up @@ -20,11 +20,13 @@ import (
)

type Query struct {
conn pgxconn.PgxConn
conn pgxconn.PgxConn
cfg *Config
builder *Builder
}

func New(conn pgxconn.PgxConn) *Query {
return &Query{conn}
func New(conn pgxconn.PgxConn, cfg *Config) *Query {
return &Query{conn, cfg, NewBuilder(cfg)}
}

func (p *Query) SpanReader() spanstore.Reader {
Expand All @@ -46,7 +48,7 @@ func (p *Query) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Tra
metrics.Query.With(prometheus.Labels{"type": "trace", "handler": "Get_Trace", "code": code}).Inc()
metrics.QueryDuration.With(prometheus.Labels{"type": "trace", "handler": "Get_Trace", "code": code}).Observe(time.Since(start).Seconds())
}()
res, err := getTrace(ctx, p.conn, traceID)
res, err := getTrace(ctx, p.builder, p.conn, traceID)
if err != nil {
return nil, logError(err)
}
Expand Down Expand Up @@ -92,7 +94,7 @@ func (p *Query) FindTraces(ctx context.Context, query *spanstore.TraceQueryParam
metrics.Query.With(prometheus.Labels{"type": "trace", "handler": "Find_Traces", "code": code}).Inc()
metrics.QueryDuration.With(prometheus.Labels{"type": "trace", "handler": "Find_Traces", "code": code}).Observe(time.Since(start).Seconds())
}()
res, err := findTraces(ctx, p.conn, query)
res, err := findTraces(ctx, p.builder, p.conn, query)
if err != nil {
return nil, logError(err)
}
Expand All @@ -108,7 +110,7 @@ func (p *Query) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryPar
metrics.Query.With(prometheus.Labels{"type": "trace", "handler": "Find_Trace_IDs", "code": code}).Inc()
metrics.QueryDuration.With(prometheus.Labels{"type": "trace", "handler": "Find_Trace_IDs", "code": code}).Observe(time.Since(start).Seconds())
}()
res, err := findTraceIDs(ctx, p.conn, query)
res, err := findTraceIDs(ctx, p.builder, p.conn, query)
if err != nil {
return nil, logError(err)
}
Expand Down
32 changes: 20 additions & 12 deletions pkg/jaeger/query/trace_query.go
Expand Up @@ -108,42 +108,50 @@ const (
`
)

func buildCompleteTraceQuery(traceIDClause string) string {
type Builder struct {
cfg *Config
}

func NewBuilder(cfg *Config) *Builder {
return &Builder{cfg}
}

func (b *Builder) buildCompleteTraceQuery(traceIDClause string) string {
return fmt.Sprintf(
completeTraceSQLFormat,
traceIDClause)
}

func findTracesQuery(q *spanstore.TraceQueryParameters) (string, []interface{}) {
subquery, params := buildTraceIDSubquery(q)
func (b *Builder) findTracesQuery(q *spanstore.TraceQueryParameters) (string, []interface{}) {
subquery, params := b.buildTraceIDSubquery(q)
traceIDClause := "s.trace_id = trace_ids.trace_id"
completeTraceSQL := buildCompleteTraceQuery(traceIDClause)
completeTraceSQL := b.buildCompleteTraceQuery(traceIDClause)
return fmt.Sprintf(findTraceSQLFormat, subquery, completeTraceSQL), params
}

func findTraceIDsQuery(q *spanstore.TraceQueryParameters) (string, []interface{}) {
subquery, params := buildTraceIDSubquery(q)
func (b *Builder) findTraceIDsQuery(q *spanstore.TraceQueryParameters) (string, []interface{}) {
subquery, params := b.buildTraceIDSubquery(q)
return subquery, params
}

func getTraceQuery(traceID model.TraceID) (string, []interface{}, error) {
var b [16]byte
n, err := traceID.MarshalTo(b[:])
func (b *Builder) getTraceQuery(traceID model.TraceID) (string, []interface{}, error) {
var buf [16]byte
n, err := traceID.MarshalTo(buf[:])
if n != 16 || err != nil {
return "", nil, fmt.Errorf("marshaling TraceID: %w", err)
}

var uuid pgtype.UUID
if err := uuid.Set(b); err != nil {
if err := uuid.Set(buf); err != nil {
return "", nil, fmt.Errorf("setting TraceID: %w", err)
}
params := []interface{}{uuid}

traceIDClause := "s.trace_id = $1"
return buildCompleteTraceQuery(traceIDClause), params, nil
return b.buildCompleteTraceQuery(traceIDClause), params, nil
}

func buildTraceIDSubquery(q *spanstore.TraceQueryParameters) (string, []interface{}) {
func (b *Builder) buildTraceIDSubquery(q *spanstore.TraceQueryParameters) (string, []interface{}) {
clauses := make([]string, 0, 15)
params := make([]interface{}, 0, 15)

Expand Down
6 changes: 6 additions & 0 deletions pkg/runner/flags.go
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/peterbourgon/ff/v3"
"github.com/peterbourgon/ff/v3/ffyaml"
"github.com/timescale/promscale/pkg/api"
tracingquery "github.com/timescale/promscale/pkg/jaeger/query"
"github.com/timescale/promscale/pkg/limits"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgclient"
Expand All @@ -36,6 +37,7 @@ type Config struct {
TenancyCfg tenancy.Config
PromQLCfg query.Config
RulesCfg rules.Config
TracingCfg tracingquery.Config
ConfigFile string
DatasetConfig string
TLSCertFile string
Expand Down Expand Up @@ -128,6 +130,7 @@ func ParseFlags(cfg *Config, args []string) (*Config, error) {
limits.ParseFlags(fs, &cfg.LimitsCfg)
tenancy.ParseFlags(fs, &cfg.TenancyCfg)
query.ParseFlags(fs, &cfg.PromQLCfg)
tracingquery.ParseFlags(fs, &cfg.TracingCfg)
rules.ParseFlags(fs, &cfg.RulesCfg)

fs.StringVar(&cfg.ConfigFile, "config", "config.yml", "YAML configuration file path for Promscale.")
Expand Down Expand Up @@ -233,6 +236,9 @@ func validate(cfg *Config) error {
if err := query.Validate(&cfg.PromQLCfg); err != nil {
return fmt.Errorf("error validating PromQL configuration: %w", err)
}
if err := tracingquery.Validate(&cfg.TracingCfg); err != nil {
return fmt.Errorf("error validating Tracing query configuration: %w", err)
}
if err := tenancy.Validate(&cfg.TenancyCfg); err != nil {
return fmt.Errorf("error validating multi-tenancy configuration: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/runner/runner.go
Expand Up @@ -137,7 +137,9 @@ func Run(cfg *Config) error {
}
}

router, err := api.GenerateRouter(&cfg.APICfg, &cfg.PromQLCfg, client)
jaegerQuery := query.New(client.QuerierConnection, &cfg.TracingCfg)

router, err := api.GenerateRouter(&cfg.APICfg, &cfg.PromQLCfg, client, jaegerQuery)
if err != nil {
log.Error("msg", "aborting startup due to error", "err", fmt.Sprintf("generate router: %s", err.Error()))
return fmt.Errorf("generate router: %w", err)
Expand Down Expand Up @@ -198,7 +200,7 @@ func Run(cfg *Config) error {
)

queryPlugin := shared.StorageGRPCPlugin{
Impl: query.New(client.QuerierConnection),
Impl: jaegerQuery,
}
if err = queryPlugin.GRPCServer(nil, grpcServer); err != nil {
log.Error("msg", "Creating jaeger query GRPC server failed", "err", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/tests/end_to_end_tests/ingest_trace_test.go
Expand Up @@ -63,7 +63,7 @@ func TestQueryTraces(t *testing.T) {
err = ingestor.IngestTraces(context.Background(), traces)
require.NoError(t, err)

q := query.New(pgxconn.NewQueryLoggingPgxConn(db))
q := query.New(pgxconn.NewQueryLoggingPgxConn(db), &query.DefaultConfig)

getOperationsTest(t, q)
findTraceTest(t, q)
Expand Down
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/prometheus/common/model"

"github.com/timescale/promscale/pkg/api"
jaegerquery "github.com/timescale/promscale/pkg/jaeger/query"
"github.com/timescale/promscale/pkg/pgclient"
"github.com/timescale/promscale/pkg/pgmodel/cache"
"github.com/timescale/promscale/pkg/query"
Expand Down Expand Up @@ -321,7 +322,9 @@ func buildRouterWithAPIConfig(pool *pgxpool.Pool, cfg *api.Config) (*mux.Router,
return nil, nil, fmt.Errorf("init promql engine: %w", err)
}

router, err := api.GenerateRouter(cfg, qryCfg, pgClient)
jaegerQuery := jaegerquery.New(pgClient.QuerierConnection, &jaegerquery.DefaultConfig)

router, err := api.GenerateRouter(cfg, qryCfg, pgClient, jaegerQuery)
if err != nil {
return nil, nil, fmt.Errorf("generate router: %w", err)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/tests/end_to_end_tests/trace_query_integration_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/require"

promscaleJaeger "github.com/timescale/promscale/pkg/jaeger"
jaegerquery "github.com/timescale/promscale/pkg/jaeger/query"
ingstr "github.com/timescale/promscale/pkg/pgmodel/ingestor"
"github.com/timescale/promscale/pkg/pgxconn"
)
Expand Down Expand Up @@ -65,8 +66,8 @@ func TestCompareTraceQueryResponse(t *testing.T) {
// Start Promscale's HTTP endpoint for Jaeger query.
router, _, err := buildRouter(db)
require.NoError(t, err)

promscaleJaeger.ExtendQueryAPIs(router, pgxconn.NewPgxConn(db))
jaegerQuery := jaegerquery.New(pgxconn.NewPgxConn(db), &jaegerquery.DefaultConfig)
promscaleJaeger.ExtendQueryAPIs(router, pgxconn.NewPgxConn(db), jaegerQuery)

go func() {
listener, err := net.Listen("tcp", ":9201")
Expand Down

0 comments on commit 5c4a2d1

Please sign in to comment.