diff --git a/.gx/lastpubver b/.gx/lastpubver index 60d2e71..e69bca5 100644 --- a/.gx/lastpubver +++ b/.gx/lastpubver @@ -1 +1 @@ -1.0.18: QmXyteEWrYHVJFEA8oX9cSfRp6PJ2kiVsmsFqPMi9ue1Ek +1.0.21: QmTZDiQmRnCqHQKfr7mcHu8w7YBqmmi2dGXfexrtLiYXxh diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..e886ede --- /dev/null +++ b/metrics.go @@ -0,0 +1,59 @@ +package rpc + +import ( + "github.com/gxed/opencensus-go/stats" + "github.com/gxed/opencensus-go/stats/view" + "github.com/gxed/opencensus-go/tag" +) + +var ( + // taken from ocgrpc (https://github.com/census-instrumentation/opencensus-go/blob/master/plugin/ocgrpc/stats_common.go) + latencyDistribution = view.Distribution(0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000) + + bytesDistribution = view.Distribution(0, 24, 32, 64, 128, 256, 512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576) +) + +// opencensus keys, equivalent to prometheus tags +var ( + // ServiceKey is a metrics tag for which RPC service was hit. + ServiceKey = makeKey("service") + // MethodKey is a metrics tag for which RPC method was hit. + MethodKey = makeKey("method") +) + +// opencensus metrics +var ( + // RequestCountMetric is the number times a RPC request has been made. + RequestCountMetric = stats.Int64("libp2p_gorpc/request_count", "Number of requests", stats.UnitDimensionless) + // RequestLatencyMetric is how long a RPC request took to complete. + RequestLatencyMetric = stats.Float64("libp2p_gorpc/request_latency", "Latency of RPC request", stats.UnitMilliseconds) +) + +// opencensus views, which is just the aggregation of the metrics +var ( + RequestCountView = &view.View{ + Measure: RequestCountMetric, + TagKeys: []tag.Key{ServiceKey, MethodKey}, + Aggregation: view.Sum(), + } + + RequestLatencyView = &view.View{ + Name: "libp2p_gorpc/request_latency", + Measure: RequestLatencyMetric, + TagKeys: []tag.Key{ServiceKey, MethodKey}, + Aggregation: latencyDistribution, + } + + DefaultViews = []*view.View{ + RequestCountView, + RequestLatencyView, + } +) + +func makeKey(name string) tag.Key { + key, err := tag.NewKey(name) + if err != nil { + logger.Fatal(err) + } + return key +} diff --git a/package.json b/package.json index 85c724f..634cc69 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,12 @@ "hash": "QmewJ1Zp9Hwz5HcMd7JYjhLXwvEHTL2UBCCz3oLt1E2N5z", "name": "go-multicodec", "version": "0.1.6" + }, + { + "author": "lanzafame", + "hash": "QmXVi4YVf4Bn8Smics8FoE3BRAUJCSu7my3PvFx2iCqg6p", + "name": "opencensus-go", + "version": "0.0.4" } ], "gxVersion": "0.10.0", @@ -25,6 +31,6 @@ "license": "MIT/BSD", "name": "go-libp2p-gorpc", "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", - "version": "1.0.18" + "version": "1.0.21" } diff --git a/server.go b/server.go index cf1e2f8..371a1d6 100644 --- a/server.go +++ b/server.go @@ -53,6 +53,7 @@ import ( "log" "reflect" "sync" + "time" "unicode" "unicode/utf8" @@ -61,6 +62,9 @@ import ( inet "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" protocol "github.com/libp2p/go-libp2p-protocol" + + "github.com/gxed/opencensus-go/stats" + "github.com/gxed/opencensus-go/tag" ) var logger = logging.Logger("p2p-gorpc") @@ -162,6 +166,20 @@ func (server *Server) handle(s *streamWrap) error { return newServerError(err) } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctx, err = tag.New( + ctx, + tag.Upsert(ServiceKey, svcID.Name), + tag.Upsert(MethodKey, svcID.Method), + ) + if err != nil { + return err + } + + ctxv := reflect.ValueOf(ctx) + // Decode the argument value. argIsValue := false // if true, need to indirect before calling. if mtype.ArgType.Kind() == reflect.Ptr { @@ -180,11 +198,6 @@ func (server *Server) handle(s *streamWrap) error { replyv = reflect.New(mtype.ReplyType.Elem()) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ctxv := reflect.ValueOf(ctx) - // This is a connection watchdog. We do not // need to read from this stream anymore. // However we'd like to know if the other side is closed @@ -201,13 +214,16 @@ func (server *Server) handle(s *streamWrap) error { }() // Call service and respond - return service.svcCall(s, mtype, svcID, ctxv, argv, replyv) + return service.svcCall(ctx, s, mtype, svcID, ctxv, argv, replyv) } // svcCall calls the actual method associated -func (s *service) svcCall(sWrap *streamWrap, mtype *methodType, svcID ServiceID, ctxv, argv, replyv reflect.Value) error { +func (s *service) svcCall(ctx context.Context, sWrap *streamWrap, mtype *methodType, svcID ServiceID, ctxv, argv, replyv reflect.Value) error { function := mtype.method.Func + startT := time.Now() + stats.Record(ctx, RequestCountMetric.M(1)) + // Invoke the method, providing a new value for the reply. returnValues := function.Call([]reflect.Value{s.rcvr, ctxv, argv, replyv}) // The return value for the method is an error. @@ -216,6 +232,8 @@ func (s *service) svcCall(sWrap *streamWrap, mtype *methodType, svcID ServiceID, if errInter != nil { errmsg = errInter.(error).Error() } + + stats.Record(ctx, RequestLatencyMetric.M(float64(time.Since(startT))/float64(time.Millisecond))) // TODO(ajl): latency doesn't include response part atm resp := &Response{svcID, errmsg, nonRPCErr} return sendResponse(sWrap, resp, replyv.Interface()) @@ -253,6 +271,11 @@ func (server *Server) Call(call *Call) error { // Use the context value from the call directly ctxv := reflect.ValueOf(call.ctx) + ctx, err := tag.New( + call.ctx, + tag.Upsert(ServiceKey, call.SvcID.Name), + tag.Upsert(MethodKey, call.SvcID.Method), + ) // Decode the argument value. argIsValue := false // if true, need to indirect before calling. @@ -288,6 +311,10 @@ func (server *Server) Call(call *Call) error { // Call service and respond function := mtype.method.Func + + startT := time.Now() + stats.Record(ctx, RequestCountMetric.M(1)) + // Invoke the method, providing a new value for the reply. returnValues := function.Call( []reflect.Value{ @@ -306,6 +333,7 @@ func (server *Server) Call(call *Call) error { if errInter != nil { return errInter.(error) } + stats.Record(ctx, RequestLatencyMetric.M(float64(time.Since(startT))/float64(time.Millisecond))) // TODO(ajl): latency doesn't include response part atm return nil }