Skip to content

Commit

Permalink
Merge pull request #60 from lukechampine:stats
Browse files Browse the repository at this point in the history
proto: Add RPCStats and RPCStatsRecorder
  • Loading branch information
Goober the Friendly Robutt committed May 22, 2020
2 parents 26af9d3 + 328b939 commit baae38e
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 19 deletions.
55 changes: 39 additions & 16 deletions renter/proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
package proto // import "lukechampine.com/us/renter/proto"

import (
"time"

"github.com/pkg/errors"
"gitlab.com/NebulousLabs/Sia/crypto"
"gitlab.com/NebulousLabs/Sia/modules"
Expand All @@ -14,22 +16,43 @@ func wrapErr(err *error, fnName string) {
*err = errors.Wrap(*err, fnName)
}

type (
// A Wallet provides addresses and outputs, and can sign transactions.
Wallet interface {
NewWalletAddress() (types.UnlockHash, error)
SignTransaction(txn *types.Transaction, toSign []crypto.Hash) error
UnspentOutputs(limbo bool) ([]modules.UnspentOutput, error)
UnconfirmedParents(txn types.Transaction) ([]types.Transaction, error)
UnlockConditions(addr types.UnlockHash) (types.UnlockConditions, error)
}
// A TransactionPool can broadcast transactions and estimate transaction
// fees.
TransactionPool interface {
AcceptTransactionSet([]types.Transaction) error
FeeEstimate() (min types.Currency, max types.Currency, err error)
}
)
// A Wallet provides addresses and outputs, and can sign transactions.
type Wallet interface {
NewWalletAddress() (types.UnlockHash, error)
SignTransaction(txn *types.Transaction, toSign []crypto.Hash) error
UnspentOutputs(limbo bool) ([]modules.UnspentOutput, error)
UnconfirmedParents(txn types.Transaction) ([]types.Transaction, error)
UnlockConditions(addr types.UnlockHash) (types.UnlockConditions, error)
}

// A TransactionPool can broadcast transactions and estimate transaction
// fees.
type TransactionPool interface {
AcceptTransactionSet([]types.Transaction) error
FeeEstimate() (min types.Currency, max types.Currency, err error)
}

// RPCStats contains various statistics related to an RPC.
type RPCStats struct {
Host hostdb.HostPublicKey
Contract types.FileContractID // empty if no contract is locked
RPC renterhost.Specifier
// Timestamp is the moment the RPC method was invoked; likewise, Elapsed is
// measured at the moment the RPC method returns. Consequently, these stats
// do *not* enable direct measurement of host throughput. However, stats may
// be compared *across* hosts in order to rank their relative performance.
Timestamp time.Time
Elapsed time.Duration
Err error
Uploaded uint64
Downloaded uint64
Cost types.Currency
}

// A RPCStatsRecorder records RPCStats, as reported by a Session.
type RPCStatsRecorder interface {
RecordRPCStats(stats RPCStats)
}

// A ContractRevision contains the most recent revision to a file contract and
// its signatures.
Expand Down
62 changes: 60 additions & 2 deletions renter/proto/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,33 @@ func wrapResponseErr(err error, readCtx, rejectCtx string) error {
return errors.Wrap(err, readCtx)
}

type statsConn struct {
net.Conn
r, w uint64
}

func (sc *statsConn) Read(p []byte) (int, error) {
n, err := sc.Conn.Read(p)
sc.r += uint64(n)
return n, err
}

func (sc *statsConn) Write(p []byte) (int, error) {
n, err := sc.Conn.Write(p)
sc.w += uint64(n)
return n, err
}

// A Session is an ongoing exchange of RPCs via the renter-host protocol.
type Session struct {
sess *renterhost.Session
conn net.Conn
conn *statsConn
appendRoots []crypto.Hash

latency time.Duration
readDeadline time.Duration
writeDeadline time.Duration
stats RPCStatsRecorder

host hostdb.ScannedHost
height types.BlockHeight
Expand Down Expand Up @@ -102,6 +120,36 @@ func (s *Session) extendBandwidthDeadline(up, down uint64) {
s.extendDeadline(s.writeDeadline*time.Duration(up) + s.readDeadline*time.Duration(down))
}

// SetRPCStatsRecorder sets the RPCStatsRecorder for the Session.
func (s *Session) SetRPCStatsRecorder(stats RPCStatsRecorder) { s.stats = stats }

func (s *Session) collectStats(id renterhost.Specifier, err *error) (record func()) {
if s.stats == nil {
return func() {}
}
stats := RPCStats{
Host: s.host.PublicKey,
Contract: s.rev.Revision.ID(),
RPC: id,
Timestamp: time.Now(),
}
var startFunds types.Currency
if s.rev.IsValid() {
startFunds = s.rev.RenterFunds()
}
oldW, oldR := s.conn.w, s.conn.r
return func() {
stats.Err = *err
stats.Elapsed = time.Since(stats.Timestamp)
stats.Uploaded = s.conn.w - oldW
stats.Downloaded = s.conn.r - oldR
if s.rev.IsValid() && startFunds.Cmp(s.rev.RenterFunds()) > 0 {
stats.Cost = startFunds.Sub(s.rev.RenterFunds())
}
s.stats.RecordRPCStats(stats)
}
}

// call is a helper method that writes a request and then reads a response.
func (s *Session) call(rpcID renterhost.Specifier, req, resp renterhost.ProtocolObject) error {
if err := s.sess.WriteRequest(rpcID, req); err != nil {
Expand All @@ -121,6 +169,7 @@ func (s *Session) call(rpcID renterhost.Specifier, req, resp renterhost.Protocol
// other RPCs may result in errors or panics.
func (s *Session) Lock(id types.FileContractID, key ed25519.PrivateKey) (err error) {
defer wrapErr(&err, "Lock")
defer s.collectStats(renterhost.RPCLockID, &err)()
req := &renterhost.RPCLockRequest{
ContractID: id,
Signature: s.sess.SignChallenge(key),
Expand Down Expand Up @@ -163,6 +212,7 @@ func (s *Session) Lock(id types.FileContractID, key ed25519.PrivateKey) (err err
// automatically unlock any locked contracts when the connection closes.
func (s *Session) Unlock() (err error) {
defer wrapErr(&err, "Unlock")
defer s.collectStats(renterhost.RPCUnlockID, &err)()
if s.key == nil {
return errors.New("no contract locked")
}
Expand All @@ -178,6 +228,7 @@ func (s *Session) Unlock() (err error) {
// Settings calls the Settings RPC, returning the host's reported settings.
func (s *Session) Settings() (_ hostdb.HostSettings, err error) {
defer wrapErr(&err, "Settings")
defer s.collectStats(renterhost.RPCSettingsID, &err)()
s.extendBandwidthDeadline(renterhost.MinMessageSize, renterhost.MinMessageSize)
var resp renterhost.RPCSettingsResponse
if err := s.call(renterhost.RPCSettingsID, nil, &resp); err != nil {
Expand All @@ -192,6 +243,8 @@ func (s *Session) Settings() (_ hostdb.HostSettings, err error) {
// sector Merkle roots of the currently-locked contract.
func (s *Session) SectorRoots(offset, n int) (_ []crypto.Hash, err error) {
defer wrapErr(&err, "SectorRoots")
defer s.collectStats(renterhost.RPCSectorRootsID, &err)()

if offset < 0 || n < 0 || offset+n > s.rev.NumSectors() {
return nil, errors.New("requested range is out-of-bounds")
} else if n == 0 {
Expand Down Expand Up @@ -269,6 +322,8 @@ func (sw *segWriter) Write(p []byte) (int, error) {
// is non-nil. Failure to do so may allow an attacker to inject malicious data.
func (s *Session) Read(w io.Writer, sections []renterhost.RPCReadRequestSection) (err error) {
defer wrapErr(&err, "Read")
defer s.collectStats(renterhost.RPCReadID, &err)()

if len(sections) == 0 {
return nil
}
Expand Down Expand Up @@ -407,6 +462,8 @@ func (s *Session) Read(w io.Writer, sections []renterhost.RPCReadRequestSection)
// always requested.
func (s *Session) Write(actions []renterhost.RPCWriteAction) (err error) {
defer wrapErr(&err, "Write")
defer s.collectStats(renterhost.RPCWriteID, &err)()

if len(actions) == 0 {
return nil
}
Expand Down Expand Up @@ -644,11 +701,12 @@ func NewUnlockedSession(hostIP modules.NetAddress, hostKey hostdb.HostPublicKey,
// same as above, but without error wrapping, since we call it from NewSession too.
func newUnlockedSession(hostIP modules.NetAddress, hostKey hostdb.HostPublicKey, currentHeight types.BlockHeight) (_ *Session, err error) {
start := time.Now()
conn, err := net.DialTimeout("tcp", string(hostIP), 60*time.Second)
tcpConn, err := net.DialTimeout("tcp", string(hostIP), 60*time.Second)
if err != nil {
return nil, err
}
latency := time.Since(start)
conn := &statsConn{Conn: tcpConn}
conn.SetDeadline(time.Now().Add(60 * time.Second))
s, err := renterhost.NewRenterSession(conn, hostKey.Ed25519())
if err != nil {
Expand Down
37 changes: 37 additions & 0 deletions renter/proto/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,46 @@ func createTestingPair(tb testing.TB) (*Session, *ghost.Host) {
return s, host
}

type testStatsRecorder struct {
stats []RPCStats
}

func (tsr *testStatsRecorder) RecordRPCStats(stats RPCStats) { tsr.stats = append(tsr.stats, stats) }

func TestSession(t *testing.T) {
renter, host := createTestingPair(t)
defer renter.Close()
defer host.Close()

var tsr testStatsRecorder
renter.SetRPCStatsRecorder(&tsr)

sector := [renterhost.SectorSize]byte{0: 1}
sectorRoot, err := renter.Append(&sector)
if err != nil {
t.Fatal(err)
}
if len(tsr.stats) != 1 {
t.Fatal("no stats collected")
} else if stats := tsr.stats[0]; stats.Host != host.PublicKey() ||
stats.RPC != renterhost.RPCWriteID ||
stats.Uploaded == 0 || stats.Downloaded == 0 {
t.Fatal("bad stats:", stats)
}

roots, err := renter.SectorRoots(0, 1)
if err != nil {
t.Fatal(err)
} else if roots[0] != sectorRoot {
t.Fatal("reported sector root does not match actual sector root")
}
if len(tsr.stats) != 2 {
t.Fatal("no stats collected")
} else if stats := tsr.stats[1]; stats.Host != host.PublicKey() ||
stats.RPC != renterhost.RPCSectorRootsID ||
stats.Uploaded == 0 || stats.Downloaded == 0 {
t.Fatal("bad stats:", stats)
}

var sectorBuf bytes.Buffer
err = renter.Read(&sectorBuf, []renterhost.RPCReadRequestSection{{
Expand All @@ -96,11 +119,25 @@ func TestSession(t *testing.T) {
if !bytes.Equal(sectorBuf.Bytes(), sector[:]) {
t.Fatal("downloaded sector does not match uploaded sector")
}
if len(tsr.stats) != 3 {
t.Fatal("no stats collected")
} else if stats := tsr.stats[2]; stats.Host != host.PublicKey() ||
stats.RPC != renterhost.RPCReadID ||
stats.Uploaded == 0 || stats.Downloaded == 0 {
t.Fatal("bad stats:", stats)
}

err = renter.Unlock()
if err != nil {
t.Fatal(err)
}
if len(tsr.stats) != 4 {
t.Fatal("no stats collected")
} else if stats := tsr.stats[3]; stats.Host != host.PublicKey() ||
stats.RPC != renterhost.RPCUnlockID ||
stats.Uploaded == 0 || stats.Downloaded != 0 {
t.Fatal("bad stats:", stats)
}
}

func TestRenewAndClear(t *testing.T) {
Expand Down
11 changes: 10 additions & 1 deletion renter/renterutil/hostset.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type HostSet struct {
sessions map[hostdb.HostPublicKey]*lockedHost
hkr renter.HostKeyResolver
currentHeight types.BlockHeight
stats proto.RPCStatsRecorder
}

// HasHost returns true if the specified host is in the set.
Expand Down Expand Up @@ -142,6 +143,10 @@ func (set *HostSet) release(host hostdb.HostPublicKey) {
lh.mu.Unlock()
}

// SetRPCStatsRecorder sets the RPCStatsRecorder for all Sessions initiated by
// the HostSet.
func (set *HostSet) SetRPCStatsRecorder(r proto.RPCStatsRecorder) { set.stats = r }

// AddHost adds a host to the set for later use.
func (set *HostSet) AddHost(c renter.Contract) {
lh := new(lockedHost)
Expand Down Expand Up @@ -176,7 +181,11 @@ func (set *HostSet) AddHost(c renter.Contract) {
return errors.Wrap(err, "could not resolve host key")
}
lh.s, err = proto.NewSession(hostIP, c.HostKey, c.ID, c.RenterKey, set.currentHeight)
return err
if err != nil {
return err
}
lh.s.SetRPCStatsRecorder(set.stats)
return nil
}
set.sessions[c.HostKey] = lh
}
Expand Down

0 comments on commit baae38e

Please sign in to comment.