Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HTTP handler for metrics querying #3095

Merged
merged 14 commits into from
Jun 21, 2021
14 changes: 7 additions & 7 deletions cmd/query/app/handler_archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestGetArchivedTrace_NotFound(t *testing.T) {
for _, tc := range []spanstore.Reader{nil, mockReader} {
archiveReader := tc // capture loop var
t.Run(fmt.Sprint(archiveReader), func(t *testing.T) {
withTestServer(t, func(ts *testServer) {
albertteoh marked this conversation as resolved.
Show resolved Hide resolved
withTestServer(func(ts *testServer) {
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Once()
var response structuredResponse
Expand All @@ -54,7 +54,7 @@ func TestGetArchivedTraceSuccess(t *testing.T) {
mockReader := &spanstoremocks.Reader{}
mockReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
withTestServer(t, func(ts *testServer) {
withTestServer(func(ts *testServer) {
// make main reader return NotFound
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Once()
Expand All @@ -69,7 +69,7 @@ func TestGetArchivedTraceSuccess(t *testing.T) {

// Test failure in parsing trace ID.
func TestArchiveTrace_BadTraceID(t *testing.T) {
withTestServer(t, func(ts *testServer) {
withTestServer(func(ts *testServer) {
var response structuredResponse
err := postJSON(ts.server.URL+"/api/archive/badtraceid", []string{}, &response)
assert.Error(t, err)
Expand All @@ -83,7 +83,7 @@ func TestArchiveTrace_TraceNotFound(t *testing.T) {
Return(nil, spanstore.ErrTraceNotFound).Once()
mockWriter := &spanstoremocks.Writer{}
// Not actually going to write the trace, so no need to define mockWriter action
withTestServer(t, func(ts *testServer) {
withTestServer(func(ts *testServer) {
// make main reader return NotFound
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Once()
Expand All @@ -94,7 +94,7 @@ func TestArchiveTrace_TraceNotFound(t *testing.T) {
}

func TestArchiveTrace_NoStorage(t *testing.T) {
withTestServer(t, func(ts *testServer) {
withTestServer(func(ts *testServer) {
var response structuredResponse
err := postJSON(ts.server.URL+"/api/archive/"+mockTraceID.String(), []string{}, &response)
assert.EqualError(t, err, `500 error from server: {"data":null,"total":0,"limit":0,"offset":0,"errors":[{"code":500,"msg":"archive span storage was not configured"}]}`+"\n")
Expand All @@ -105,7 +105,7 @@ func TestArchiveTrace_Success(t *testing.T) {
mockWriter := &spanstoremocks.Writer{}
mockWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")).
Return(nil).Times(2)
withTestServer(t, func(ts *testServer) {
withTestServer(func(ts *testServer) {
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
var response structuredResponse
Expand All @@ -118,7 +118,7 @@ func TestArchiveTrace_WriteErrors(t *testing.T) {
mockWriter := &spanstoremocks.Writer{}
mockWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")).
Return(errors.New("cannot save")).Times(2)
withTestServer(t, func(ts *testServer) {
withTestServer(func(ts *testServer) {
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
var response structuredResponse
Expand Down
28 changes: 14 additions & 14 deletions cmd/query/app/handler_deps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,14 @@ func TestFilterDependencies(t *testing.T) {
}

func TestGetDependenciesSuccess(t *testing.T) {
server, _, mock := initializeTestServer()
defer server.Close()
ts := initializeTestServer()
defer ts.server.Close()
expectedDependencies := []model.DependencyLink{{Parent: "killer", Child: "queen", CallCount: 12}}
endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier)
mock.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(expectedDependencies, nil).Times(1)
ts.dependencyReader.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(expectedDependencies, nil).Times(1)

var response structuredResponse
err := getJSON(server.URL+"/api/dependencies?endTs=1476374248550&service=queen", &response)
err := getJSON(ts.server.URL+"/api/dependencies?endTs=1476374248550&service=queen", &response)
assert.NotEmpty(t, response.Data)
data := response.Data.([]interface{})[0]
actual := data.(map[string]interface{})
Expand All @@ -326,30 +326,30 @@ func TestGetDependenciesSuccess(t *testing.T) {
}

func TestGetDependenciesCassandraFailure(t *testing.T) {
server, _, mock := initializeTestServer()
defer server.Close()
ts := initializeTestServer()
defer ts.server.Close()
endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier)
mock.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(nil, errStorage).Times(1)
ts.dependencyReader.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(nil, errStorage).Times(1)

var response structuredResponse
err := getJSON(server.URL+"/api/dependencies?endTs=1476374248550&service=testing", &response)
err := getJSON(ts.server.URL+"/api/dependencies?endTs=1476374248550&service=testing", &response)
assert.Error(t, err)
}

func TestGetDependenciesEndTimeParsingFailure(t *testing.T) {
server, _, _ := initializeTestServer()
defer server.Close()
ts := initializeTestServer()
defer ts.server.Close()

var response structuredResponse
err := getJSON(server.URL+"/api/dependencies?endTs=shazbot&service=testing", &response)
err := getJSON(ts.server.URL+"/api/dependencies?endTs=shazbot&service=testing", &response)
assert.Error(t, err)
}

func TestGetDependenciesLookbackParsingFailure(t *testing.T) {
server, _, _ := initializeTestServer()
defer server.Close()
ts := initializeTestServer()
defer ts.server.Close()

var response structuredResponse
err := getJSON(server.URL+"/api/dependencies?endTs=1476374248550&service=testing&lookback=shazbot", &response)
err := getJSON(ts.server.URL+"/api/dependencies?endTs=1476374248550&service=testing&lookback=shazbot", &response)
assert.Error(t, err)
}
9 changes: 9 additions & 0 deletions cmd/query/app/handler_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (

"github.com/opentracing/opentracing-go"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
)

// HandlerOption is a function that sets some option on the APIHandler
Expand Down Expand Up @@ -65,3 +67,10 @@ func (handlerOptions) Tracer(tracer opentracing.Tracer) HandlerOption {
apiHandler.tracer = tracer
}
}

// MetricsQueryService creates a HandlerOption that initializes MetricsQueryService.
func (handlerOptions) MetricsQueryService(mqs querysvc.MetricsQueryService) HandlerOption {
return func(apiHandler *APIHandler) {
apiHandler.metricsQueryService = mqs
}
}
146 changes: 100 additions & 46 deletions cmd/query/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ package app
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"strconv"
"time"

"github.com/gogo/protobuf/proto"
"github.com/gorilla/mux"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
Expand All @@ -34,15 +36,23 @@ import (
uiconv "github.com/jaegertracing/jaeger/model/converter/json"
ui "github.com/jaegertracing/jaeger/model/json"
"github.com/jaegertracing/jaeger/pkg/multierror"
"github.com/jaegertracing/jaeger/plugin/metrics/disabled"
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
traceIDParam = "traceID"
endTsParam = "endTs"
lookbackParam = "lookback"

defaultAPIPrefix = "api"
traceIDParam = "traceID"
endTsParam = "endTs"
lookbackParam = "lookback"
stepParam = "step"
rateParam = "ratePer"
quantileParam = "quantile"
groupByOperationParam = "groupByOperation"

defaultAPIPrefix = "api"
prettyPrintIndent = " "
)

// HTTPHandler handles http requests
Expand Down Expand Up @@ -71,12 +81,13 @@ func NewRouter() *mux.Router {

// APIHandler implements the query service public API by registering routes at httpPrefix
type APIHandler struct {
queryService *querysvc.QueryService
queryParser queryParser
basePath string
apiPrefix string
logger *zap.Logger
tracer opentracing.Tracer
queryService *querysvc.QueryService
metricsQueryService querysvc.MetricsQueryService
queryParser queryParser
basePath string
apiPrefix string
logger *zap.Logger
tracer opentracing.Tracer
}

// NewAPIHandler returns an APIHandler
Expand Down Expand Up @@ -115,6 +126,10 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
// TODO - remove this when UI catches up
aH.handleFunc(router, aH.getOperationsLegacy, "/services/{%s}/operations", serviceParam).Methods(http.MethodGet)
aH.handleFunc(router, aH.dependencies, "/dependencies").Methods(http.MethodGet)
aH.handleFunc(router, aH.latencies, "/metrics/latencies").Methods(http.MethodGet)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
aH.handleFunc(router, aH.calls, "/metrics/calls").Methods(http.MethodGet)
aH.handleFunc(router, aH.errors, "/metrics/errors").Methods(http.MethodGet)
aH.handleFunc(router, aH.minStep, "/metrics/minstep").Methods(http.MethodGet)
}

func (aH *APIHandler) handleFunc(
Expand Down Expand Up @@ -176,7 +191,7 @@ func (aH *APIHandler) getOperationsLegacy(w http.ResponseWriter, r *http.Request
func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) {
service := r.FormValue(serviceParam)
if service == "" {
if aH.handleError(w, ErrServiceParameterRequired, http.StatusBadRequest) {
if aH.handleError(w, errServiceParameterRequired, http.StatusBadRequest) {
return
}
}
Expand Down Expand Up @@ -204,7 +219,7 @@ func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) {
}

func (aH *APIHandler) search(w http.ResponseWriter, r *http.Request) {
tQuery, err := aH.queryParser.parse(r)
tQuery, err := aH.queryParser.parseTraceQueryParams(r)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
Expand Down Expand Up @@ -259,31 +274,13 @@ func (aH *APIHandler) tracesByIDs(ctx context.Context, traceIDs []model.TraceID)
}

func (aH *APIHandler) dependencies(w http.ResponseWriter, r *http.Request) {
endTsMillis, err := strconv.ParseInt(r.FormValue(endTsParam), 10, 64)
if err != nil {
err = fmt.Errorf("unable to parse %s: %w", endTimeParam, err)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
}
var lookback time.Duration
if formValue := r.FormValue(lookbackParam); len(formValue) > 0 {
lookback, err = time.ParseDuration(formValue + "ms")
if err != nil {
err = fmt.Errorf("unable to parse %s: %w", lookbackParam, err)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
}
dqp, err := aH.queryParser.parseDependenciesQueryParams(r)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
service := r.FormValue(serviceParam)

if lookback == 0 {
lookback = defaultDependencyLookbackDuration
}
endTs := time.Unix(0, 0).Add(time.Duration(endTsMillis) * time.Millisecond)

dependencies, err := aH.queryService.GetDependencies(r.Context(), endTs, lookback)
dependencies, err := aH.queryService.GetDependencies(r.Context(), dqp.endTs, dqp.lookback)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
Expand All @@ -295,6 +292,60 @@ func (aH *APIHandler) dependencies(w http.ResponseWriter, r *http.Request) {
aH.writeJSON(w, r, &structuredRes)
}

func (aH *APIHandler) latencies(w http.ResponseWriter, r *http.Request) {
q, err := strconv.ParseFloat(r.FormValue(quantileParam), 64)
if err != nil {
aH.handleError(w, newParseError(err, quantileParam), http.StatusBadRequest)
return
}
aH.metrics(w, r, func(ctx context.Context, baseParams metricsstore.BaseQueryParameters) (*metrics.MetricFamily, error) {
return aH.metricsQueryService.GetLatencies(ctx, &metricsstore.LatenciesQueryParameters{
BaseQueryParameters: baseParams,
Quantile: q,
})
})
}

func (aH *APIHandler) calls(w http.ResponseWriter, r *http.Request) {
aH.metrics(w, r, func(ctx context.Context, baseParams metricsstore.BaseQueryParameters) (*metrics.MetricFamily, error) {
return aH.metricsQueryService.GetCallRates(ctx, &metricsstore.CallRateQueryParameters{
BaseQueryParameters: baseParams,
})
})
}

func (aH *APIHandler) errors(w http.ResponseWriter, r *http.Request) {
aH.metrics(w, r, func(ctx context.Context, baseParams metricsstore.BaseQueryParameters) (*metrics.MetricFamily, error) {
return aH.metricsQueryService.GetErrorRates(ctx, &metricsstore.ErrorRateQueryParameters{
BaseQueryParameters: baseParams,
})
})
}

func (aH *APIHandler) minStep(w http.ResponseWriter, r *http.Request) {
minStep, err := aH.metricsQueryService.GetMinStepDuration(r.Context(), &metricsstore.MinStepDurationQueryParameters{})
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}

structuredRes := structuredResponse{
Data: minStep.Milliseconds(),
}
aH.writeJSON(w, r, &structuredRes)
}

func (aH *APIHandler) metrics(w http.ResponseWriter, r *http.Request, getMetrics func(context.Context, metricsstore.BaseQueryParameters) (*metrics.MetricFamily, error)) {
requestParams, err := aH.queryParser.parseMetricsQueryParams(r)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
m, err := getMetrics(r.Context(), requestParams)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
aH.writeJSON(w, r, m)
}

func (aH *APIHandler) convertModelToUI(trace *model.Trace, adjust bool) (*ui.Trace, *structuredError) {
var errors []error
if adjust {
Expand Down Expand Up @@ -429,6 +480,9 @@ func (aH *APIHandler) handleError(w http.ResponseWriter, err error, statusCode i
if err == nil {
return false
}
if errors.Is(err, disabled.ErrDisabled) {
statusCode = http.StatusNotImplemented
}
if statusCode == http.StatusInternalServerError {
aH.logger.Error("HTTP handler, Internal Server Error", zap.Error(err))
}
Expand All @@ -446,19 +500,19 @@ func (aH *APIHandler) handleError(w http.ResponseWriter, err error, statusCode i
}

func (aH *APIHandler) writeJSON(w http.ResponseWriter, r *http.Request, response interface{}) {
marshall := json.Marshal
if prettyPrint := r.FormValue(prettyPrintParam); prettyPrint != "" && prettyPrint != "false" {
marshall = func(v interface{}) ([]byte, error) {
return json.MarshalIndent(v, "", " ")
}
}
resp, err := marshall(response)
if err != nil {
aH.handleError(w, fmt.Errorf("failed marshalling HTTP response to JSON: %w", err), http.StatusInternalServerError)
return
prettyPrintValue := r.FormValue(prettyPrintParam)
prettyPrint := prettyPrintValue != "" && prettyPrintValue != "false"

var marshaler jsonMarshaler
switch response.(type) {
case proto.Message:
marshaler = newProtoJSONMarshaler(prettyPrint)
default:
marshaler = newStructJSONMarshaler(prettyPrint)
albertteoh marked this conversation as resolved.
Show resolved Hide resolved
}

w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(resp); err != nil {
if err := marshaler.marshal(w, response); err != nil {
aH.handleError(w, fmt.Errorf("failed writing HTTP response: %w", err), http.StatusInternalServerError)
}
}
Loading