From 65c0993825b23c658e43d5fbe2376e3970788a81 Mon Sep 17 00:00:00 2001 From: Adrian Lanzafame Date: Wed, 25 Jul 2018 13:13:36 +1000 Subject: [PATCH 1/5] add opencensus stat collection --- metrics.go | 59 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ server.go | 42 +++++++++++++++++++++++++++++++------- 2 files changed, 94 insertions(+), 7 deletions(-) create mode 100644 metrics.go diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..854a77b --- /dev/null +++ b/metrics.go @@ -0,0 +1,59 @@ +package rpc + +import ( + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +var ( + // taken from ocgrpc (https://github.com/census-instrumentation/opencensus-go/blob/master/plugin/ocgrpc/stats_common.go) + latencyDistribution = view.Distribution(0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000) + + bytesDistribution = view.Distribution(0, 24, 32, 64, 128, 256, 512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576) +) + +// opencensus keys, equivalent to prometheus tags +var ( + // ServiceKey is a metrics tag for which RPC service was hit. + ServiceKey = makeKey("service") + // MethodKey is a metrics tag for which RPC method was hit. + MethodKey = makeKey("method") +) + +// opencensus metrics +var ( + // RequestCountMetric is the number times a RPC request has been made. + RequestCountMetric = stats.Int64("libp2p_gorpc/request_count", "Number of requests", stats.UnitDimensionless) + // RequestLatencyMetric is how long a RPC request took to complete. + RequestLatencyMetric = stats.Float64("libp2p_gorpc/request_latency", "Latency of RPC request", stats.UnitMilliseconds) +) + +// opencensus views, which is just the aggregation of the metrics +var ( + RequestCountView = &view.View{ + Measure: RequestCountMetric, + TagKeys: []tag.Key{ServiceKey, MethodKey}, + Aggregation: view.Sum(), + } + + RequestLatencyView = &view.View{ + Name: "libp2p_gorpc/request_latency", + Measure: RequestLatencyMetric, + TagKeys: []tag.Key{ServiceKey, MethodKey}, + Aggregation: latencyDistribution, + } + + DefaultViews = []*view.View{ + RequestCountView, + RequestLatencyView, + } +) + +func makeKey(name string) tag.Key { + key, err := tag.NewKey(name) + if err != nil { + logger.Fatal(err) + } + return key +} diff --git a/server.go b/server.go index cf1e2f8..28091da 100644 --- a/server.go +++ b/server.go @@ -53,6 +53,7 @@ import ( "log" "reflect" "sync" + "time" "unicode" "unicode/utf8" @@ -61,6 +62,9 @@ import ( inet "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" protocol "github.com/libp2p/go-libp2p-protocol" + + "go.opencensus.io/stats" + "go.opencensus.io/tag" ) var logger = logging.Logger("p2p-gorpc") @@ -162,6 +166,20 @@ func (server *Server) handle(s *streamWrap) error { return newServerError(err) } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctx, err = tag.New( + ctx, + tag.Upsert(ServiceKey, svcID.Name), + tag.Upsert(MethodKey, svcID.Method), + ) + if err != nil { + return err + } + + ctxv := reflect.ValueOf(ctx) + // Decode the argument value. argIsValue := false // if true, need to indirect before calling. if mtype.ArgType.Kind() == reflect.Ptr { @@ -180,11 +198,6 @@ func (server *Server) handle(s *streamWrap) error { replyv = reflect.New(mtype.ReplyType.Elem()) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ctxv := reflect.ValueOf(ctx) - // This is a connection watchdog. We do not // need to read from this stream anymore. // However we'd like to know if the other side is closed @@ -201,13 +214,16 @@ func (server *Server) handle(s *streamWrap) error { }() // Call service and respond - return service.svcCall(s, mtype, svcID, ctxv, argv, replyv) + return service.svcCall(ctx, s, mtype, svcID, ctxv, argv, replyv) } // svcCall calls the actual method associated -func (s *service) svcCall(sWrap *streamWrap, mtype *methodType, svcID ServiceID, ctxv, argv, replyv reflect.Value) error { +func (s *service) svcCall(ctx context.Context, sWrap *streamWrap, mtype *methodType, svcID ServiceID, ctxv, argv, replyv reflect.Value) error { function := mtype.method.Func + startT := time.Now() + stats.Record(ctx, RequestCountMetric.M(1)) + // Invoke the method, providing a new value for the reply. returnValues := function.Call([]reflect.Value{s.rcvr, ctxv, argv, replyv}) // The return value for the method is an error. @@ -216,6 +232,8 @@ func (s *service) svcCall(sWrap *streamWrap, mtype *methodType, svcID ServiceID, if errInter != nil { errmsg = errInter.(error).Error() } + + stats.Record(ctx, RequestLatencyMetric.M(float64(time.Since(startT))/float64(time.Millisecond))) // TODO(ajl): latency doesn't include response part atm resp := &Response{svcID, errmsg, nonRPCErr} return sendResponse(sWrap, resp, replyv.Interface()) @@ -253,6 +271,11 @@ func (server *Server) Call(call *Call) error { // Use the context value from the call directly ctxv := reflect.ValueOf(call.ctx) + ctx, err := tag.New( + call.ctx, + tag.Upsert(ServiceKey, call.SvcID.Name), + tag.Upsert(MethodKey, call.SvcID.Method), + ) // Decode the argument value. argIsValue := false // if true, need to indirect before calling. @@ -288,6 +311,10 @@ func (server *Server) Call(call *Call) error { // Call service and respond function := mtype.method.Func + + startT := time.Now() + stats.Record(ctx, RequestCountMetric.M(1)) + // Invoke the method, providing a new value for the reply. returnValues := function.Call( []reflect.Value{ @@ -306,6 +333,7 @@ func (server *Server) Call(call *Call) error { if errInter != nil { return errInter.(error) } + stats.Record(ctx, RequestLatencyMetric.M(float64(time.Since(startT))/float64(time.Millisecond))) // TODO(ajl): latency doesn't include response part atm return nil } From 40f90bd931e9fe7da8a0f1e4398d49447391b2ed Mon Sep 17 00:00:00 2001 From: Adrian Lanzafame Date: Tue, 25 Sep 2018 14:13:38 +1000 Subject: [PATCH 2/5] move to gxed oc License: MIT Signed-off-by: Adrian Lanzafame --- metrics.go | 6 +++--- package.json | 6 ++++++ server.go | 4 ++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/metrics.go b/metrics.go index 854a77b..e886ede 100644 --- a/metrics.go +++ b/metrics.go @@ -1,9 +1,9 @@ package rpc import ( - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" + "github.com/gxed/opencensus-go/stats" + "github.com/gxed/opencensus-go/stats/view" + "github.com/gxed/opencensus-go/tag" ) var ( diff --git a/package.json b/package.json index 85c724f..dd4a04a 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,12 @@ "hash": "QmewJ1Zp9Hwz5HcMd7JYjhLXwvEHTL2UBCCz3oLt1E2N5z", "name": "go-multicodec", "version": "0.1.6" + }, + { + "author": "lanzafame", + "hash": "QmTN5ZSRrAPrVNa4qWU1X7fC8xSzx49YYQGPrV9pkdNpn3", + "name": "opencensus-go", + "version": "0.0.1" } ], "gxVersion": "0.10.0", diff --git a/server.go b/server.go index 28091da..371a1d6 100644 --- a/server.go +++ b/server.go @@ -63,8 +63,8 @@ import ( peer "github.com/libp2p/go-libp2p-peer" protocol "github.com/libp2p/go-libp2p-protocol" - "go.opencensus.io/stats" - "go.opencensus.io/tag" + "github.com/gxed/opencensus-go/stats" + "github.com/gxed/opencensus-go/tag" ) var logger = logging.Logger("p2p-gorpc") From 0e599804b9c30a5df917759f9fb53d1eb84fda32 Mon Sep 17 00:00:00 2001 From: Adrian Lanzafame Date: Tue, 25 Sep 2018 14:13:51 +1000 Subject: [PATCH 3/5] gx publish 1.0.19 License: MIT Signed-off-by: Adrian Lanzafame --- .gx/lastpubver | 2 +- package.json | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.gx/lastpubver b/.gx/lastpubver index 60d2e71..8afc91b 100644 --- a/.gx/lastpubver +++ b/.gx/lastpubver @@ -1 +1 @@ -1.0.18: QmXyteEWrYHVJFEA8oX9cSfRp6PJ2kiVsmsFqPMi9ue1Ek +1.0.19: QmYxRgNxZRAHsH3qusjjcXjx7WpdCRQ9oFTKkSXbFhYKS9 diff --git a/package.json b/package.json index dd4a04a..e6a4fab 100644 --- a/package.json +++ b/package.json @@ -21,9 +21,9 @@ }, { "author": "lanzafame", - "hash": "QmTN5ZSRrAPrVNa4qWU1X7fC8xSzx49YYQGPrV9pkdNpn3", + "hash": "QmXrnFEnNJqescqF3RhsH7BaSU6jMBUZDehNYZY4bUgJCm", "name": "opencensus-go", - "version": "0.0.1" + "version": "0.0.2" } ], "gxVersion": "0.10.0", @@ -31,6 +31,6 @@ "license": "MIT/BSD", "name": "go-libp2p-gorpc", "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", - "version": "1.0.18" + "version": "1.0.19" } From bbef852163ba0cde7fb268944f51c4d5eb2c06ae Mon Sep 17 00:00:00 2001 From: Adrian Lanzafame Date: Thu, 27 Sep 2018 12:28:12 +1000 Subject: [PATCH 4/5] gx publish 1.0.20 License: MIT Signed-off-by: Adrian Lanzafame --- .gx/lastpubver | 2 +- package.json | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.gx/lastpubver b/.gx/lastpubver index 8afc91b..fee1ffa 100644 --- a/.gx/lastpubver +++ b/.gx/lastpubver @@ -1 +1 @@ -1.0.19: QmYxRgNxZRAHsH3qusjjcXjx7WpdCRQ9oFTKkSXbFhYKS9 +1.0.20: Qmeg2jG745TY3prcEmJP6EbzSeYDN2V7i9vf7K1YzjUT9u diff --git a/package.json b/package.json index e6a4fab..2203e11 100644 --- a/package.json +++ b/package.json @@ -21,9 +21,9 @@ }, { "author": "lanzafame", - "hash": "QmXrnFEnNJqescqF3RhsH7BaSU6jMBUZDehNYZY4bUgJCm", + "hash": "Qmb2UZ3Kq3AFy5gAL5vkHjYAkvzsbnXAEZomcWp49eV9hj", "name": "opencensus-go", - "version": "0.0.2" + "version": "0.0.3" } ], "gxVersion": "0.10.0", @@ -31,6 +31,6 @@ "license": "MIT/BSD", "name": "go-libp2p-gorpc", "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", - "version": "1.0.19" + "version": "1.0.20" } From 130d14da981f4992e998f269e4f60bea29283569 Mon Sep 17 00:00:00 2001 From: Adrian Lanzafame Date: Fri, 28 Sep 2018 10:26:40 +1000 Subject: [PATCH 5/5] gx publish 1.0.21 License: MIT Signed-off-by: Adrian Lanzafame --- .gx/lastpubver | 2 +- package.json | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.gx/lastpubver b/.gx/lastpubver index fee1ffa..e69bca5 100644 --- a/.gx/lastpubver +++ b/.gx/lastpubver @@ -1 +1 @@ -1.0.20: Qmeg2jG745TY3prcEmJP6EbzSeYDN2V7i9vf7K1YzjUT9u +1.0.21: QmTZDiQmRnCqHQKfr7mcHu8w7YBqmmi2dGXfexrtLiYXxh diff --git a/package.json b/package.json index 2203e11..634cc69 100644 --- a/package.json +++ b/package.json @@ -21,9 +21,9 @@ }, { "author": "lanzafame", - "hash": "Qmb2UZ3Kq3AFy5gAL5vkHjYAkvzsbnXAEZomcWp49eV9hj", + "hash": "QmXVi4YVf4Bn8Smics8FoE3BRAUJCSu7my3PvFx2iCqg6p", "name": "opencensus-go", - "version": "0.0.3" + "version": "0.0.4" } ], "gxVersion": "0.10.0", @@ -31,6 +31,6 @@ "license": "MIT/BSD", "name": "go-libp2p-gorpc", "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", - "version": "1.0.20" + "version": "1.0.21" }