Skip to content

Commit

Permalink
private/server: collect drpc stats and export them
Browse files Browse the repository at this point in the history
the server type now implements the monkit.StatSource
interface so that it can be chained into the monkit
stat collection system.

the api command is changed to include the stats.

Change-Id: I01c5378512901e23ad6bf128c647db657a84987d
  • Loading branch information
zeebo committed Mar 14, 2024
1 parent f0f7005 commit c8d6de6
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 7 deletions.
2 changes: 2 additions & 0 deletions cmd/satellite/api.go
Expand Up @@ -141,6 +141,8 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
log.Warn("Failed to initialize telemetry batcher on satellite api", zap.Error(err))
}

monkit.Package().Chain(peer.Server)

err = metabaseDB.CheckVersion(ctx)
if err != nil {
log.Error("Failed metabase database version check.", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -60,7 +60,7 @@ require (
gopkg.in/segmentio/analytics-go.v3 v3.1.0
gopkg.in/yaml.v3 v3.0.1
storj.io/common v0.0.0-20240312163747-de28b7045716
storj.io/drpc v0.0.33
storj.io/drpc v0.0.34
storj.io/eventkit v0.0.0-20240306141230-6cb545e5f892
storj.io/monkit-jaeger v0.0.0-20240221095020-52b0792fa6cd
storj.io/uplink v1.12.3-0.20240227083244-7974a2e1a6c2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -889,8 +889,8 @@ storj.io/common v0.0.0-20220719163320-cd2ef8e1b9b0/go.mod h1:mCYV6Ud5+cdbuaxdPD5
storj.io/common v0.0.0-20240312163747-de28b7045716 h1:V3jSwIiO1O8KtihdhC4vNhtr0BYUKNkJKFI6fvdPKiA=
storj.io/common v0.0.0-20240312163747-de28b7045716/go.mod h1:MFl009RHY4tIqySVNy/6EmgRw2q60d26h9N/nb7JxGU=
storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
storj.io/drpc v0.0.33 h1:yCGZ26r66ZdMP0IcTYsj7WDAUIIjzXk6DJhbhvt9FHI=
storj.io/drpc v0.0.33/go.mod h1:vR804UNzhBa49NOJ6HeLjd2H3MakC1j5Gv8bsOQT6N4=
storj.io/drpc v0.0.34 h1:q9zlQKfJ5A7x8NQNFk8x7eKUF78FMhmAbZLnFK+og7I=
storj.io/drpc v0.0.34/go.mod h1:Y9LZaa8esL1PW2IDMqJE7CFSNq7d5bQ3RI7mGPtmKMg=
storj.io/eventkit v0.0.0-20240306141230-6cb545e5f892 h1:IVzNtTR1VT+QNALCNLX/Z+0hzGo/Y2XI7umgG5PFwOk=
storj.io/eventkit v0.0.0-20240306141230-6cb545e5f892/go.mod h1:S6p41RzIBKoeGAdrziksWkiijnZXql9YcNsc23t0u+8=
storj.io/infectious v0.0.2 h1:rGIdDC/6gNYAStsxsZU79D/MqFjNyJc1tsyyj9sTl7Q=
Expand Down
36 changes: 35 additions & 1 deletion private/server/server.go
Expand Up @@ -11,11 +11,13 @@ import (
"net/http"
"os"
"runtime"
"sort"
"sync"
"syscall"
"time"

"github.com/jtolio/noiseconn"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand All @@ -33,6 +35,7 @@ import (
"storj.io/drpc/drpcmigrate"
"storj.io/drpc/drpcmux"
"storj.io/drpc/drpcserver"
"storj.io/drpc/drpcstats"
jaeger "storj.io/monkit-jaeger"
"storj.io/storj/private/server/debounce"
)
Expand Down Expand Up @@ -189,6 +192,36 @@ func New(log *zap.Logger, tlsOptions *tlsopts.Options, config Config) (_ *Server
return server, nil
}

// Stats implements monkit.StatSource and outputs statistics about drpc bytes read
// and written per rpc.
func (p *Server) Stats(cb func(key monkit.SeriesKey, field string, val float64)) {
merge := func(out, in map[string]drpcstats.Stats) {
for k, vi := range in {
vo := out[k]
vo.Read += vi.Read
vo.Written += vi.Written
out[k] = vo
}
}

stats := make(map[string]drpcstats.Stats)
merge(stats, p.publicEndpointsReplaySafe.drpc.Stats())
merge(stats, p.publicEndpointsAll.drpc.Stats())
merge(stats, p.privateEndpoints.drpc.Stats())

rpcs := make([]string, 0, len(stats))
for k := range stats {
rpcs = append(rpcs, k)
}
sort.Strings(rpcs)

for _, rpc := range rpcs {
v := stats[rpc]
cb(monkit.NewSeriesKey("drpc_bytes_transmitted").WithTag("rpc", rpc), "read", float64(v.Read))
cb(monkit.NewSeriesKey("drpc_bytes_transmitted").WithTag("rpc", rpc), "written", float64(v.Written))
}
}

// Identity returns the server's identity.
func (p *Server) Identity() *identity.FullIdentity { return p.tlsOptions.Ident }

Expand Down Expand Up @@ -470,7 +503,8 @@ func newEndpointCollection() *endpointCollection {
jaeger.RemoteTraceHandler),
),
drpcserver.Options{
Manager: rpc.NewDefaultManagerOptions(),
Manager: rpc.NewDefaultManagerOptions(),
CollectStats: true,
},
),
}
Expand Down
21 changes: 21 additions & 0 deletions private/server/server_test.go
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"testing"

"github.com/spacemonkeygo/monkit/v3"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -211,3 +212,23 @@ func TestHybridConnector_TCPOnly(t *testing.T) {
require.NoError(t, conn.Close())
})
}

func TestServer_Stats(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 0,
UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
sat := planet.Satellites[0]

require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, sat, "test"))

count := 0
sat.API.Server.Stats(func(key monkit.SeriesKey, field string, val float64) {
t.Log(key, field, val)
count++
})

require.Equal(t, count, 2)
})
}
2 changes: 1 addition & 1 deletion testsuite/storjscan/go.mod
Expand Up @@ -161,7 +161,7 @@ require (
gopkg.in/urfave/cli.v1 v1.20.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
storj.io/drpc v0.0.33 // indirect
storj.io/drpc v0.0.34 // indirect
storj.io/eventkit v0.0.0-20240306141230-6cb545e5f892 // indirect
storj.io/infectious v0.0.2 // indirect
storj.io/monkit-jaeger v0.0.0-20240221095020-52b0792fa6cd // indirect
Expand Down
4 changes: 2 additions & 2 deletions testsuite/storjscan/go.sum
Expand Up @@ -1212,8 +1212,8 @@ storj.io/common v0.0.0-20220719163320-cd2ef8e1b9b0/go.mod h1:mCYV6Ud5+cdbuaxdPD5
storj.io/common v0.0.0-20240312163747-de28b7045716 h1:V3jSwIiO1O8KtihdhC4vNhtr0BYUKNkJKFI6fvdPKiA=
storj.io/common v0.0.0-20240312163747-de28b7045716/go.mod h1:MFl009RHY4tIqySVNy/6EmgRw2q60d26h9N/nb7JxGU=
storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
storj.io/drpc v0.0.33 h1:yCGZ26r66ZdMP0IcTYsj7WDAUIIjzXk6DJhbhvt9FHI=
storj.io/drpc v0.0.33/go.mod h1:vR804UNzhBa49NOJ6HeLjd2H3MakC1j5Gv8bsOQT6N4=
storj.io/drpc v0.0.34 h1:q9zlQKfJ5A7x8NQNFk8x7eKUF78FMhmAbZLnFK+og7I=
storj.io/drpc v0.0.34/go.mod h1:Y9LZaa8esL1PW2IDMqJE7CFSNq7d5bQ3RI7mGPtmKMg=
storj.io/eventkit v0.0.0-20240306141230-6cb545e5f892 h1:IVzNtTR1VT+QNALCNLX/Z+0hzGo/Y2XI7umgG5PFwOk=
storj.io/eventkit v0.0.0-20240306141230-6cb545e5f892/go.mod h1:S6p41RzIBKoeGAdrziksWkiijnZXql9YcNsc23t0u+8=
storj.io/infectious v0.0.2 h1:rGIdDC/6gNYAStsxsZU79D/MqFjNyJc1tsyyj9sTl7Q=
Expand Down

0 comments on commit c8d6de6

Please sign in to comment.