Skip to content

Commit

Permalink
report code on metric if error
Browse files Browse the repository at this point in the history
  • Loading branch information
henrod committed Feb 28, 2019
1 parent 16e0aee commit efde726
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 23 deletions.
27 changes: 18 additions & 9 deletions agent/agent.go
Expand Up @@ -23,7 +23,7 @@ package agent
import (
"context"
gojson "encoding/json"
"errors"
e "errors"
"fmt"
"net"
"strings"
Expand All @@ -36,6 +36,7 @@ import (
"github.com/topfreegames/pitaya/conn/message"
"github.com/topfreegames/pitaya/conn/packet"
"github.com/topfreegames/pitaya/constants"
"github.com/topfreegames/pitaya/errors"
"github.com/topfreegames/pitaya/logger"
"github.com/topfreegames/pitaya/metrics"
"github.com/topfreegames/pitaya/protos"
Expand Down Expand Up @@ -168,14 +169,14 @@ func (a *Agent) ResponseMID(ctx context.Context, mid uint, v interface{}, isErro
if a.GetStatus() == constants.StatusClosed {
err := constants.ErrBrokenPipe
tracing.FinishSpan(ctx, err)
metrics.ReportTimingFromCtx(ctx, a.metricsReporters, handlerType, true)
metrics.ReportTimingFromCtx(ctx, a.metricsReporters, handlerType, err)
return err
}

if mid <= 0 {
err := constants.ErrSessionOnNotify
tracing.FinishSpan(ctx, err)
metrics.ReportTimingFromCtx(ctx, a.metricsReporters, handlerType, true)
metrics.ReportTimingFromCtx(ctx, a.metricsReporters, handlerType, err)
return err
}

Expand Down Expand Up @@ -355,7 +356,7 @@ func (a *Agent) write() {
if err != nil {
tracing.FinishSpan(data.ctx, err)
if data.typ == message.Response {
metrics.ReportTimingFromCtx(data.ctx, a.metricsReporters, handlerType, true)
metrics.ReportTimingFromCtx(data.ctx, a.metricsReporters, handlerType, err)
}
logger.Log.Error("cannot serialize message and respond to the client ", err.Error())
break
Expand All @@ -374,7 +375,7 @@ func (a *Agent) write() {
if err != nil {
tracing.FinishSpan(data.ctx, err)
if data.typ == message.Response {
metrics.ReportTimingFromCtx(data.ctx, a.metricsReporters, handlerType, true)
metrics.ReportTimingFromCtx(data.ctx, a.metricsReporters, handlerType, err)
}
logger.Log.Error(err.Error())
break
Expand All @@ -385,7 +386,7 @@ func (a *Agent) write() {
if err != nil {
tracing.FinishSpan(data.ctx, err)
if data.typ == message.Response {
metrics.ReportTimingFromCtx(data.ctx, a.metricsReporters, handlerType, true)
metrics.ReportTimingFromCtx(data.ctx, a.metricsReporters, handlerType, err)
}
logger.Log.Error(err)
break
Expand All @@ -394,15 +395,23 @@ func (a *Agent) write() {
if _, err := a.conn.Write(p); err != nil {
tracing.FinishSpan(data.ctx, err)
if data.typ == message.Response {
metrics.ReportTimingFromCtx(data.ctx, a.metricsReporters, handlerType, true)
metrics.ReportTimingFromCtx(data.ctx, a.metricsReporters, handlerType, err)
}
logger.Log.Error(err.Error())
return
}
var e error
tracing.FinishSpan(data.ctx, e)
if data.typ == message.Response {
metrics.ReportTimingFromCtx(data.ctx, a.metricsReporters, handlerType, m.Err)
var rErr error

if m.Err {
// default code is overwritten, if any
rErr = &errors.Error{Code: errors.ErrUnknownCode}
_ = a.serializer.Unmarshal(m.Data, rErr)
}

metrics.ReportTimingFromCtx(data.ctx, a.metricsReporters, handlerType, rErr)
}
case <-a.chStopWrite:
return
Expand All @@ -412,7 +421,7 @@ func (a *Agent) write() {

// SendRequest sends a request to a server
func (a *Agent) SendRequest(ctx context.Context, serverID, route string, v interface{}) (*protos.Response, error) {
return nil, errors.New("not implemented")
return nil, e.New("not implemented")
}

// AnswerWithError answers with an error
Expand Down
3 changes: 1 addition & 2 deletions cluster/grpc_rpc_client.go
Expand Up @@ -112,8 +112,7 @@ func (gs *GRPCClient) Call(ctx context.Context, rpcType protos.RPCType, route *r
startTime := time.Now()
ctxT = pcontext.AddToPropagateCtx(ctxT, constants.StartTimeKey, startTime.UnixNano())
ctxT = pcontext.AddToPropagateCtx(ctxT, constants.RouteKey, route.String())
errored := err != nil
metrics.ReportTimingFromCtx(ctx, gs.metricsReporters, "rpc", errored)
metrics.ReportTimingFromCtx(ctx, gs.metricsReporters, "rpc", err)
}
done()
}()
Expand Down
3 changes: 1 addition & 2 deletions cluster/nats_rpc_client.go
Expand Up @@ -171,8 +171,7 @@ func (ns *NatsRPCClient) Call(
ctx = pcontext.AddToPropagateCtx(ctx, constants.RouteKey, route.String())
defer func() {
typ := "rpc"
errored := err != nil
metrics.ReportTimingFromCtx(ctx, ns.metricsReporters, typ, errored)
metrics.ReportTimingFromCtx(ctx, ns.metricsReporters, typ, err)
}()
}
m, err = ns.conn.Request(getChannel(server.Type, server.ID), marshalledData, ns.reqTimeout)
Expand Down
20 changes: 20 additions & 0 deletions errors/errors.go
Expand Up @@ -74,3 +74,23 @@ func mergeMetadatas(pitayaErr *Error, metadata map[string]string) {
pitayaErr.Metadata[key] = value
}
}

// CodeFromError extracts the error code carried by err.
//
// It returns:
//   - the empty string when err is nil, or when err holds a nil *Error;
//   - ErrUnknownCode when err is not a *Error;
//   - the Code field of the *Error otherwise.
func CodeFromError(err error) string {
	switch typed := err.(type) {
	case nil:
		// No error at all: there is no code to report.
		return ""
	case *Error:
		// An interface wrapping a nil *Error is non-nil, so guard the
		// pointer before reading its Code field.
		if typed == nil {
			return ""
		}
		return typed.Code
	default:
		return ErrUnknownCode
	}
}
39 changes: 39 additions & 0 deletions errors/errors_test.go
Expand Up @@ -106,3 +106,42 @@ func TestErrorError(t *testing.T) {
errStr := err.Error()
assert.Equal(t, sourceErr.Error(), errStr)
}

// TestCodeFromError checks the code returned by CodeFromError for a nil
// error, a non-pitaya error, a nil *Error wrapped in the error interface,
// and a regular pitaya error carrying an explicit code.
func TestCodeFromError(t *testing.T) {
	t.Parallel()

	baseErr := errors.New("error")
	notFoundCode := "GAME-404"

	// A typed nil pointer: once stored in an error interface it is non-nil.
	var nilPitayaErr *Error

	type testCase struct {
		err  error
		code string
	}

	cases := map[string]testCase{
		"test_not_error":        {err: nil, code: ""},
		"test_not_pitaya_error": {err: baseErr, code: ErrUnknownCode},
		"test_nil_pitaya_error": {err: nilPitayaErr, code: ""},
		"test_pitaya_error":     {err: NewError(baseErr, notFoundCode), code: notFoundCode},
	}

	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			got := CodeFromError(tc.err)
			assert.Equal(t, tc.code, got)
		})
	}
}
2 changes: 1 addition & 1 deletion metrics/prometheus_reporter.go
Expand Up @@ -118,7 +118,7 @@ func (p *PrometheusReporter) registerMetrics(
Objectives: map[float64]float64{0.7: 0.02, 0.95: 0.005, 0.99: 0.001},
ConstLabels: constLabels,
},
append([]string{"route", "status", "type"}, additionalLabelsKeys...),
append([]string{"route", "status", "type", "code"}, additionalLabelsKeys...),
)

// ProcessDelay summary
Expand Down
8 changes: 6 additions & 2 deletions metrics/report.go
Expand Up @@ -26,13 +26,16 @@ import (
"time"

"github.com/topfreegames/pitaya/constants"
"github.com/topfreegames/pitaya/errors"

pcontext "github.com/topfreegames/pitaya/context"
)

// ReportTimingFromCtx reports the latency from the context
func ReportTimingFromCtx(ctx context.Context, reporters []Reporter, typ string, errored bool) {
func ReportTimingFromCtx(ctx context.Context, reporters []Reporter, typ string, err error) {
code := errors.CodeFromError(err)
status := "ok"
if errored {
if err != nil {
status = "failed"
}
if len(reporters) > 0 {
Expand All @@ -43,6 +46,7 @@ func ReportTimingFromCtx(ctx context.Context, reporters []Reporter, typ string,
"route": route.(string),
"status": status,
"type": typ,
"code": code,
})
for _, r := range reporters {
r.ReportSummary(ResponseTime, tags, float64(elapsed.Nanoseconds()))
Expand Down
61 changes: 55 additions & 6 deletions metrics/report_test.go
Expand Up @@ -22,6 +22,7 @@ package metrics

import (
"context"
"errors"
"testing"
"time"

Expand All @@ -30,6 +31,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/topfreegames/pitaya/constants"
pcontext "github.com/topfreegames/pitaya/context"
e "github.com/topfreegames/pitaya/errors"
"github.com/topfreegames/pitaya/metrics/mocks"
)

Expand All @@ -42,7 +44,7 @@ func TestReportTimingFromCtx(t *testing.T) {
originalTs := time.Now().UnixNano()
expectedRoute := uuid.New().String()
expectedType := uuid.New().String()
expectedErrored := true
expectedErr := errors.New(uuid.New().String())
ctx := pcontext.AddToPropagateCtx(context.Background(), constants.StartTimeKey, originalTs)
ctx = pcontext.AddToPropagateCtx(ctx, constants.RouteKey, expectedRoute)

Expand All @@ -53,7 +55,7 @@ func TestReportTimingFromCtx(t *testing.T) {
},
)

ReportTimingFromCtx(ctx, []Reporter{mockMetricsReporter}, expectedType, expectedErrored)
ReportTimingFromCtx(ctx, []Reporter{mockMetricsReporter}, expectedType, expectedErr)
})

t.Run("test-tags", func(t *testing.T) {
Expand All @@ -64,7 +66,7 @@ func TestReportTimingFromCtx(t *testing.T) {
originalTs := time.Now().UnixNano()
expectedRoute := uuid.New().String()
expectedType := uuid.New().String()
expectedErrored := false
var expectedErr error
ctx := pcontext.AddToPropagateCtx(context.Background(), constants.StartTimeKey, originalTs)
ctx = pcontext.AddToPropagateCtx(ctx, constants.RouteKey, expectedRoute)
ctx = pcontext.AddToPropagateCtx(ctx, constants.MetricTagsKey, map[string]string{
Expand All @@ -76,11 +78,12 @@ func TestReportTimingFromCtx(t *testing.T) {
"status": "ok",
"type": expectedType,
"key": "value",
"code": "",
}

mockMetricsReporter.EXPECT().ReportSummary(ResponseTime, expectedTags, gomock.Any())

ReportTimingFromCtx(ctx, []Reporter{mockMetricsReporter}, expectedType, expectedErrored)
ReportTimingFromCtx(ctx, []Reporter{mockMetricsReporter}, expectedType, expectedErr)
})

t.Run("test-tags-not-correct-type", func(t *testing.T) {
Expand All @@ -91,7 +94,7 @@ func TestReportTimingFromCtx(t *testing.T) {
originalTs := time.Now().UnixNano()
expectedRoute := uuid.New().String()
expectedType := uuid.New().String()
expectedErrored := false
var expectedErr error
ctx := pcontext.AddToPropagateCtx(context.Background(), constants.StartTimeKey, originalTs)
ctx = pcontext.AddToPropagateCtx(ctx, constants.RouteKey, expectedRoute)
ctx = pcontext.AddToPropagateCtx(ctx, constants.MetricTagsKey, "not-map")
Expand All @@ -100,11 +103,57 @@ func TestReportTimingFromCtx(t *testing.T) {
"route": expectedRoute,
"status": "ok",
"type": expectedType,
"code": "",
}

mockMetricsReporter.EXPECT().ReportSummary(ResponseTime, expectedTags, gomock.Any())

ReportTimingFromCtx(ctx, []Reporter{mockMetricsReporter}, expectedType, expectedErrored)
ReportTimingFromCtx(ctx, []Reporter{mockMetricsReporter}, expectedType, expectedErr)
})

t.Run("test-failed-route-with-pitaya-error", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockMetricsReporter := mocks.NewMockReporter(ctrl)

originalTs := time.Now().UnixNano()
expectedRoute := uuid.New().String()
expectedType := uuid.New().String()
code := "GAME-404"
expectedErr := e.NewError(errors.New("error"), code)
ctx := pcontext.AddToPropagateCtx(context.Background(), constants.StartTimeKey, originalTs)
ctx = pcontext.AddToPropagateCtx(ctx, constants.RouteKey, expectedRoute)

mockMetricsReporter.EXPECT().ReportSummary(ResponseTime, map[string]string{
"route": expectedRoute,
"status": "failed",
"type": expectedType,
"code": code,
}, gomock.Any())

ReportTimingFromCtx(ctx, []Reporter{mockMetricsReporter}, expectedType, expectedErr)
})

t.Run("test-failed-route", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockMetricsReporter := mocks.NewMockReporter(ctrl)

originalTs := time.Now().UnixNano()
expectedRoute := uuid.New().String()
expectedType := uuid.New().String()
expectedErr := errors.New("error")
ctx := pcontext.AddToPropagateCtx(context.Background(), constants.StartTimeKey, originalTs)
ctx = pcontext.AddToPropagateCtx(ctx, constants.RouteKey, expectedRoute)

mockMetricsReporter.EXPECT().ReportSummary(ResponseTime, map[string]string{
"route": expectedRoute,
"status": "failed",
"type": expectedType,
"code": e.ErrUnknownCode,
}, gomock.Any())

ReportTimingFromCtx(ctx, []Reporter{mockMetricsReporter}, expectedType, expectedErr)
})
}

Expand Down
2 changes: 1 addition & 1 deletion service/handler.go
Expand Up @@ -318,7 +318,7 @@ func (h *HandlerService) localProcess(ctx context.Context, a *agent.Agent, route
a.Session.ResponseMID(ctx, mid, ret)
}
} else {
metrics.ReportTimingFromCtx(ctx, h.metricsReporters, handlerType, false)
metrics.ReportTimingFromCtx(ctx, h.metricsReporters, handlerType, nil)
tracing.FinishSpan(ctx, err)
}
}
Expand Down

0 comments on commit efde726

Please sign in to comment.