
Commit

Merge 69ab1ca into 2afe0c5
cl1337 committed Jul 19, 2018
2 parents 2afe0c5 + 69ab1ca commit aa924ee
Showing 11 changed files with 82 additions and 30 deletions.
78 changes: 60 additions & 18 deletions runtime/call_metrics.go
@@ -24,7 +24,9 @@ import (
"fmt"
"net/http"

"github.com/pkg/errors"
"github.com/uber-go/tally"
"github.com/uber/tchannel-go"
)

const (
@@ -70,6 +72,13 @@ var knownMetrics = []string{
outboundCallsStatus,
}

var knownTchannelErrors = map[tchannel.SystemErrCode]bool{
tchannel.ErrCodeTimeout: true,
tchannel.ErrCodeBadRequest: true,
tchannel.ErrCodeProtocol: true,
tchannel.ErrCodeCancelled: true,
}

var knownStatusCodes = map[int]bool{
http.StatusContinue: true, // 100
http.StatusSwitchingProtocols: true, // 101
@@ -132,6 +141,49 @@ var knownStatusCodes = map[int]bool{
http.StatusNetworkAuthenticationRequired: true, // 511
}

type systemErrorMap map[tchannel.SystemErrCode]tally.Counter

func newsystemErrorMap(scope tally.Scope, key string) systemErrorMap {
ret := make(systemErrorMap)
ret[tchannel.ErrCodeInvalid] = scope.Counter(key)
for errorCode := range knownTchannelErrors {
ret[errorCode] = scope.Tagged(map[string]string{
"error": errorCode.MetricsKey(),
}).Counter(key)
}
return ret
}

// IncrErr increments the counter matching the error's tchannel system error code,
// falling back to the untagged counter for unrecognized codes
func (sem systemErrorMap) IncrErr(err error, count int64) {
errCause := tchannel.GetSystemErrorCode(errors.Cause(err))
counter, ok := sem[errCause]
if !ok {
counter = sem[0x00]
}
counter.Inc(count)
}
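
A hypothetical package-internal sketch, not part of the commit, of how IncrErr behaves, reusing the imports already in this file: errors.Cause unwraps any github.com/pkg/errors wrapping before the code lookup, and codes outside knownTchannelErrors fall back to the untagged counter registered at tchannel.ErrCodeInvalid (0x00).

// Hypothetical usage sketch (assumes it runs inside this package).
scope := tally.NewTestScope("", nil)
sem := newsystemErrorMap(scope, "inbound.calls.system-errors")

// Known code: bumps the counter tagged with the code's MetricsKey (error=timeout).
sem.IncrErr(errors.Wrap(tchannel.NewSystemError(tchannel.ErrCodeTimeout, "too slow"), "call failed"), 1)

// Non-system error: GetSystemErrorCode yields a code outside the known set,
// so the untagged fallback counter at ErrCodeInvalid (0x00) is incremented.
sem.IncrErr(errors.New("boom"), 1)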

// HTTPStatusMap maps an HTTP status code to its tally counter
type HTTPStatusMap map[int]tally.Counter

func newHTTPStatusMap(scope tally.Scope, key string) HTTPStatusMap {
ret := make(HTTPStatusMap)
for statusCode := range knownStatusCodes {
ret[statusCode] = scope.Tagged(map[string]string{
"status": fmt.Sprintf("%d", statusCode),
}).Counter(fmt.Sprintf("%s.%d", key, statusCode))
}
return ret
}

// IncrStatus increments the counter for the given status code; unknown codes are ignored
func (hsm HTTPStatusMap) IncrStatus(statusCode int, count int64) {
if counter, ok := hsm[statusCode]; ok {
counter.Inc(count)
}
}
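
Similarly, a hypothetical sketch, not part of the commit, of the HTTPStatusMap behaviour: every known status code gets its own tagged counter up front, and IncrStatus silently drops codes that were never registered instead of dereferencing a missing map entry.

// Hypothetical usage sketch (assumes it runs inside this package).
scope := tally.NewTestScope("", nil)
hsm := newHTTPStatusMap(scope, "inbound.calls.status")

hsm.IncrStatus(http.StatusOK, 1) // bumps inbound.calls.status.200, tagged status=200
hsm.IncrStatus(299, 1)           // 299 is not in knownStatusCodes, so this is a no-op

for _, c := range scope.Snapshot().Counters() {
    fmt.Println(c.Name(), c.Tags(), c.Value())
}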

type inboundMetrics struct {
Recvd tally.Counter // inbound.calls.recvd
}
@@ -147,14 +199,14 @@ type commonMetrics struct {

type tchannelMetrics struct {
commonMetrics
AppErrors tally.Counter // [inbound|outbound].calls.app-errors
SystemErrors tally.Counter // [inbound|outbound].calls.system-errors
AppErrors tally.Counter // [inbound|outbound].calls.app-errors
SystemErrors systemErrorMap // [inbound|outbound].calls.system-errors*
}

type httpMetrics struct {
commonMetrics
Errors tally.Counter // [inbound|outbound].calls.errors
Status map[int]tally.Counter // [inbound|outbound].calls.status.XXX
Errors tally.Counter // [inbound|outbound].calls.errors
Status HTTPStatusMap // [inbound|outbound].calls.status.XXX
}

// InboundHTTPMetrics ...
@@ -188,12 +240,7 @@ func NewInboundHTTPMetrics(scope tally.Scope) *InboundHTTPMetrics {
metrics.Latency = scope.Timer(inboundCallsLatency)
metrics.Success = scope.Counter(inboundCallsSuccess)
metrics.Errors = scope.Counter(inboundCallsErrors)
metrics.Status = make(map[int]tally.Counter, len(knownStatusCodes))
for statusCode := range knownStatusCodes {
metrics.Status[statusCode] = scope.Tagged(map[string]string{
"status": fmt.Sprintf("%d", statusCode),
}).Counter(fmt.Sprintf("%s.%d", inboundCallsStatus, statusCode))
}
metrics.Status = newHTTPStatusMap(scope, inboundCallsStatus)
return &metrics
}

@@ -204,7 +251,7 @@ func NewInboundTChannelMetrics(scope tally.Scope) *InboundTChannelMetrics {
metrics.Latency = scope.Timer(inboundCallsLatency)
metrics.Success = scope.Counter(inboundCallsSuccess)
metrics.AppErrors = scope.Counter(inboundCallsAppErrors)
metrics.SystemErrors = scope.Counter(inboundCallsSystemErrors)
metrics.SystemErrors = newsystemErrorMap(scope, inboundCallsSystemErrors)
return &metrics
}

@@ -215,12 +262,7 @@ func NewOutboundHTTPMetrics(scope tally.Scope) *OutboundHTTPMetrics {
metrics.Latency = scope.Timer(outboundCallsLatency)
metrics.Success = scope.Counter(outboundCallsSuccess)
metrics.Errors = scope.Counter(outboundCallsErrors)
metrics.Status = make(map[int]tally.Counter, len(knownStatusCodes))
for statusCode := range knownStatusCodes {
metrics.Status[statusCode] = scope.Tagged(map[string]string{
"status": fmt.Sprintf("%d", statusCode),
}).Counter(fmt.Sprintf("%s.%d", outboundCallsStatus, statusCode))
}
metrics.Status = newHTTPStatusMap(scope, outboundCallsStatus)
return &metrics
}

@@ -231,6 +273,6 @@ func NewOutboundTChannelMetrics(scope tally.Scope) *OutboundTChannelMetrics {
metrics.Latency = scope.Timer(outboundCallsLatency)
metrics.Success = scope.Counter(outboundCallsSuccess)
metrics.AppErrors = scope.Counter(outboundCallsAppErrors)
metrics.SystemErrors = scope.Counter(outboundCallsSystemErrors)
metrics.SystemErrors = newsystemErrorMap(scope, outboundCallsSystemErrors)
return &metrics
}
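
The net effect on the emitted series, shown as a hypothetical snippet, not part of the commit, against the exported constructor (assuming it is in scope): system errors keep the single inbound.calls.system-errors name, but each recognized tchannel error code now reports under its own error tag, while unrecognized codes still land on the untagged counter.

// Hypothetical usage sketch (assumes a tally test scope).
scope := tally.NewTestScope("test-gateway", nil)
m := NewInboundTChannelMetrics(scope)

// Reports inbound.calls.system-errors tagged with the code's MetricsKey.
m.SystemErrors.IncrErr(tchannel.NewSystemError(tchannel.ErrCodeBadRequest, "missing arg"), 1)
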
2 changes: 1 addition & 1 deletion runtime/client_http_response.go
@@ -147,7 +147,7 @@ func (res *ClientHTTPResponse) finish() {
zap.Int("UnknownStatusCode", res.StatusCode),
)
} else {
res.req.metrics.Status[res.StatusCode].Inc(1)
res.req.metrics.Status.IncrStatus(res.StatusCode, 1)
}
if !known || res.StatusCode >= 400 && res.StatusCode < 600 {
res.req.metrics.Errors.Inc(1)
1 change: 1 addition & 0 deletions runtime/gateway.go
@@ -164,6 +164,7 @@ func CreateGateway(
// Bootstrap func
func (gateway *Gateway) Bootstrap() error {
// start HTTP server
gateway.PerHostScope.Counter("server.bootstrap").Inc(1)
_, err := gateway.localHTTPServer.JustListen()
if err != nil {
gateway.Logger.Error("Error listening on port", zap.Error(err))
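
This single extra counter, emitted once during Bootstrap, is presumably what each metrics test below accounts for when it raises its expected metric count by one (13 to 14 twice, 12 to 13, 9 to 10, and 10 to 11).
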
2 changes: 1 addition & 1 deletion runtime/server_http_response.go
@@ -91,7 +91,7 @@ func (res *ServerHTTPResponse) finish() {
zap.Int("UnknownStatusCode", res.StatusCode),
)
} else {
res.Request.metrics.Status[res.StatusCode].Inc(1)
res.Request.metrics.Status.IncrStatus(res.StatusCode, 1)
}
if !known || res.StatusCode >= 400 && res.StatusCode < 600 {
res.Request.metrics.Errors.Inc(1)
2 changes: 1 addition & 1 deletion runtime/tchannel_client.go
@@ -332,7 +332,7 @@ func (c *tchannelOutboundCall) finish(err error) {

// emit metrics
if err != nil {
c.metrics.SystemErrors.Inc(1)
c.metrics.SystemErrors.IncrErr(err, 1)
} else if !c.success {
c.metrics.AppErrors.Inc(1)
} else {
13 changes: 11 additions & 2 deletions runtime/tchannel_logger.go
@@ -127,11 +127,20 @@ func (l TChannelLogger) WithFields(fields ...tchannel.LogField) tchannel.Logger
// https://github.com/uber/tchannel-node/blob/master/errors.js#L907-L930
func LogErrorWarnTimeout(logger *zap.Logger, err error, msg string) {
if err != nil {
if errors.Cause(err) == context.Canceled || errors.Cause(err) == context.DeadlineExceeded ||
tchannel.GetSystemErrorCode(errors.Cause(err)) == tchannel.ErrCodeTimeout {
if isTimeout(err) {
logger.Warn(msg, zap.Error(err))
} else {
logger.Error(msg, zap.Error(err))
}
}
}

// isTimeout returns true if the error is caused by a timeout or a context cancellation.
// Used by LogErrorWarnTimeout.
func isTimeout(err error) bool {
cause := errors.Cause(err)
return cause == context.Canceled ||
cause == context.DeadlineExceeded ||
tchannel.GetSystemErrorCode(cause) == tchannel.ErrCodeTimeout
}
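
A hypothetical snippet, not part of the commit, showing what the new helper classifies as a timeout; because errors.Cause unwraps github.com/pkg/errors wrapping first, wrapped context errors are still downgraded to warnings by LogErrorWarnTimeout.

// Hypothetical illustration of isTimeout (assumes this package's imports).
isTimeout(context.DeadlineExceeded)                                  // true
isTimeout(errors.Wrap(context.Canceled, "call aborted"))            // true
isTimeout(tchannel.NewSystemError(tchannel.ErrCodeTimeout, "slow")) // true
isTimeout(errors.New("connection refused"))                         // false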
2 changes: 1 addition & 1 deletion runtime/tchannel_server.go
@@ -234,7 +234,7 @@ func (c *tchannelInboundCall) finish(err error) {

// emit metrics
if err != nil {
c.endpoint.metrics.SystemErrors.Inc(1)
c.endpoint.metrics.SystemErrors.IncrErr(err, 1)
} else if !c.success {
c.endpoint.metrics.AppErrors.Inc(1)
} else {
2 changes: 1 addition & 1 deletion test/endpoints/bar/bar_metrics_test.go
@@ -60,7 +60,7 @@ func TestCallMetrics(t *testing.T) {
},
)

numMetrics := 13
numMetrics := 14
cg := gateway.(*testGateway.ChildProcessGateway)
cg.MetricsWaitGroup.Add(numMetrics)

2 changes: 1 addition & 1 deletion test/endpoints/baz/baz_metrics_test.go
@@ -73,7 +73,7 @@ func TestCallMetrics(t *testing.T) {
headers["x-token"] = "token"
headers["x-uuid"] = "uuid"

numMetrics := 13
numMetrics := 14
cg.MetricsWaitGroup.Add(numMetrics)

_, err = gateway.MakeRequest(
2 changes: 1 addition & 1 deletion test/endpoints/tchannel/baz/baz_metrics_test.go
@@ -72,7 +72,7 @@ func TestCallMetrics(t *testing.T) {
bazClient.NewSimpleServiceCallHandler(fakeCall),
)

numMetrics := 12
numMetrics := 13
cg.MetricsWaitGroup.Add(numMetrics)

ctx := context.Background()
6 changes: 3 additions & 3 deletions test/health_test.go
@@ -112,7 +112,7 @@ func TestHealthMetrics(t *testing.T) {
defer gateway.Close()

cgateway := gateway.(*testGateway.ChildProcessGateway)
numMetrics := 9
numMetrics := 10
cgateway.MetricsWaitGroup.Add(numMetrics)

res, err := gateway.MakeRequest("GET", "/health", nil, nil)
@@ -211,12 +211,12 @@ func TestRuntimeMetrics(t *testing.T) {
cgateway := gateway.(*testGateway.ChildProcessGateway)

// Expect 9 runtime metrics + 1 logged metric
numMetrics := 10
numMetrics := 11
cgateway.MetricsWaitGroup.Add(numMetrics)
cgateway.MetricsWaitGroup.Wait()

metrics := cgateway.M3Service.GetMetrics()
assert.Equal(t, numMetrics, len(metrics), "expected 10 metrics")
assert.Equal(t, numMetrics, len(metrics), "expected 11 metrics")
names := []string{
"test-gateway.test.per-worker.runtime.num-cpu",
"test-gateway.test.per-worker.runtime.gomaxprocs",
