Skip to content

Commit

Permalink
proto: Increase Session deadlines
Browse files Browse the repository at this point in the history
  • Loading branch information
lukechampine committed Feb 14, 2020
1 parent 98a826d commit 21dacc1
Showing 1 changed file with 37 additions and 20 deletions.
57 changes: 37 additions & 20 deletions renter/proto/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Session struct {
conn net.Conn
appendRoots []crypto.Hash

latency time.Duration
readDeadline time.Duration
writeDeadline time.Duration

Expand All @@ -64,18 +65,31 @@ func (s *Session) HostKey() hostdb.HostPublicKey { return s.host.PublicKey }
// Revision returns the most recent revision of the locked contract.
func (s *Session) Revision() ContractRevision { return s.rev }

// SetReadDeadline sets the per-byte deadline for the Read SectorRoots RPCs. For
// example, to time out after 1 minute when downloading a sector, set the
// per-byte deadline to time.Minute / renterhost.SectorSize.
func (s *Session) SetReadDeadline(d time.Duration) { s.readDeadline = d }
// SetLatency sets the latency deadline for RPCs.
func (s *Session) SetLatency(d time.Duration) { s.latency = d }

// SetWriteDeadline sets the per-byte deadline for the Write RPC. For example,
// to time out after 1 minute when uploading a sector, set the per-byte deadline
// SetReadDeadline sets the per-byte read deadline for RPCs. For example, to
// time out after 1 minute when downloading a sector, set the per-byte deadline
// to time.Minute / renterhost.SectorSize.
func (s *Session) SetReadDeadline(d time.Duration) { s.readDeadline = d }

// SetWriteDeadline sets the per-byte write deadline for RPCs. For example, to
// time out after 1 minute when uploading a sector, set the per-byte deadline to
// time.Minute / renterhost.SectorSize.
func (s *Session) SetWriteDeadline(d time.Duration) { s.writeDeadline = d }

func (s *Session) extendDeadline(d time.Duration) {
_ = s.conn.SetDeadline(time.Now().Add(d))
_ = s.conn.SetDeadline(time.Now().Add(s.latency + d))
}

func (s *Session) extendBandwidthDeadline(up, down uint64) {
if up < renterhost.MinMessageSize {
up = renterhost.MinMessageSize
}
if down < renterhost.MinMessageSize {
down = renterhost.MinMessageSize
}
s.extendDeadline(s.writeDeadline*time.Duration(up) + s.readDeadline*time.Duration(down))
}

// call is a helper method that writes a request and then reads a response.
Expand All @@ -98,7 +112,7 @@ func (s *Session) Lock(id types.FileContractID, key ed25519.PrivateKey) (err err
Signature: s.sess.SignChallenge(key),
Timeout: 10e3, // 10 seconds
}
s.extendDeadline(time.Duration(req.Timeout+5e3) * time.Millisecond)
s.extendDeadline(time.Duration(req.Timeout) * time.Millisecond)
var resp renterhost.RPCLockResponse
if err := s.call(renterhost.RPCLockID, req, &resp); err != nil {
return err
Expand Down Expand Up @@ -135,7 +149,7 @@ func (s *Session) Unlock() (err error) {
if s.key == nil {
return errors.New("no contract locked")
}
s.extendDeadline(10 * time.Second)
s.extendBandwidthDeadline(renterhost.MinMessageSize, renterhost.MinMessageSize)
if err := s.sess.WriteRequest(renterhost.RPCUnlockID, nil); err != nil {
return err
}
Expand All @@ -147,7 +161,7 @@ func (s *Session) Unlock() (err error) {
// Settings calls the Settings RPC, returning the host's reported settings.
func (s *Session) Settings() (_ hostdb.HostSettings, err error) {
defer wrapErr(&err, "Settings")
s.extendDeadline(10 * time.Second)
s.extendBandwidthDeadline(renterhost.MinMessageSize, renterhost.MinMessageSize)
var resp renterhost.RPCSettingsResponse
if err := s.call(renterhost.RPCSettingsID, nil, &resp); err != nil {
return hostdb.HostSettings{}, err
Expand All @@ -169,11 +183,11 @@ func (s *Session) SectorRoots(offset, n int) (_ []crypto.Hash, err error) {

// calculate price
proofHashes := merkle.ProofSize(s.rev.NumSectors(), offset, offset+n)
bandwidth := (proofHashes + n) * crypto.HashSize
if bandwidth < renterhost.MinMessageSize {
bandwidth = renterhost.MinMessageSize
downloadBandwidth := uint64(proofHashes+n) * crypto.HashSize
if downloadBandwidth < renterhost.MinMessageSize {
downloadBandwidth = renterhost.MinMessageSize
}
bandwidthPrice := s.host.DownloadBandwidthPrice.Mul64(uint64(bandwidth))
bandwidthPrice := s.host.DownloadBandwidthPrice.Mul64(downloadBandwidth)
price := s.host.BaseRPCPrice.Add(bandwidthPrice)
if s.rev.RenterFunds().Cmp(price) < 0 {
return nil, errors.New("contract has insufficient funds to support sector roots download")
Expand All @@ -184,8 +198,7 @@ func (s *Session) SectorRoots(offset, n int) (_ []crypto.Hash, err error) {
rev.NewRevisionNumber++
newValid, newMissed := updateRevisionOutputs(&rev, price, types.ZeroCurrency)

uploadBandwidth := 4096 // estimate
s.extendDeadline(time.Second + s.writeDeadline*time.Duration(uploadBandwidth) + s.readDeadline*time.Duration(bandwidth))
s.extendBandwidthDeadline(renterhost.MinMessageSize, downloadBandwidth)
req := &renterhost.RPCSectorRootsRequest{
RootOffset: uint64(offset),
NumRoots: uint64(n),
Expand Down Expand Up @@ -274,7 +287,7 @@ func (s *Session) Read(w io.Writer, sections []renterhost.RPCReadRequestSection)
// send request
uploadBandwidth := 4096 + 4096*uint64(len(sections))
downloadBandwidth := bandwidth + 4096*uint64(len(sections))
s.extendDeadline(time.Second + s.writeDeadline*time.Duration(uploadBandwidth) + s.readDeadline*time.Duration(downloadBandwidth))
s.extendBandwidthDeadline(uploadBandwidth, downloadBandwidth)
req := &renterhost.RPCReadRequest{
Sections: sections,
MerkleProof: true,
Expand Down Expand Up @@ -451,7 +464,7 @@ func (s *Session) Write(actions []renterhost.RPCWriteAction) (err error) {

// send request
uploadBandwidth += 4096 * uint64(len(actions))
s.extendDeadline(time.Second + s.writeDeadline*time.Duration(uploadBandwidth) + s.readDeadline*time.Duration(downloadBandwidth))
s.extendBandwidthDeadline(uploadBandwidth, downloadBandwidth)
req := &renterhost.RPCWriteRequest{
Actions: actions,
MerkleProof: true,
Expand Down Expand Up @@ -613,10 +626,12 @@ func NewUnlockedSession(hostIP modules.NetAddress, hostKey hostdb.HostPublicKey,

// same as above, but without error wrapping, since we call it from NewSession too.
func newUnlockedSession(hostIP modules.NetAddress, hostKey hostdb.HostPublicKey, currentHeight types.BlockHeight) (_ *Session, err error) {
start := time.Now()
conn, err := net.Dial("tcp", string(hostIP))
if err != nil {
return nil, err
}
latency := time.Since(start)
conn.SetDeadline(time.Now().Add(60 * time.Second))
s, err := renterhost.NewRenterSession(conn, hostKey)
if err != nil {
Expand All @@ -630,8 +645,10 @@ func newUnlockedSession(hostIP modules.NetAddress, hostKey hostdb.HostPublicKey,
host: hostdb.ScannedHost{
PublicKey: hostKey,
},
readDeadline: 20 * time.Microsecond, // 0.40 Mbps
writeDeadline: 50 * time.Microsecond, // 0.16 Mbps
// extremely generous default deadlines
latency: time.Second + latency*3,
readDeadline: time.Millisecond,
writeDeadline: time.Millisecond,
}, nil
}

Expand Down

0 comments on commit 21dacc1

Please sign in to comment.