Skip to content

Commit

Permalink
peer: Add machinery to track the state and validity of remote pongs
Browse files Browse the repository at this point in the history
This commit refactors some of the bookkeeping around the ping logic
inside of the Brontide. If the pong response is noncompliant with
the spec or if it times out, we disconnect from the peer.
  • Loading branch information
ProofOfKeags committed Oct 19, 2023
1 parent 1eab782 commit 99226e3
Show file tree
Hide file tree
Showing 3 changed files with 437 additions and 89 deletions.
172 changes: 83 additions & 89 deletions peer/brontide.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"container/list"
"errors"
"fmt"
"math/rand"
"net"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -50,6 +51,12 @@ const (
// pingInterval is the interval at which ping messages are sent.
pingInterval = 1 * time.Minute

// pingTimeout is the amount of time we will wait for a pong response
// before considering the peer to be unresponsive.
//
// This MUST be a smaller value than the pingInterval.
pingTimeout = 30 * time.Second

// idleTimeout is the duration of inactivity before we time out a peer.
idleTimeout = 5 * time.Minute

Expand Down Expand Up @@ -379,15 +386,7 @@ type Brontide struct {
bytesReceived uint64
bytesSent uint64

// pingTime is a rough estimate of the RTT (round-trip-time) between us
// and the connected peer. This time is expressed in microseconds.
// To be used atomically.
// TODO(roasbeef): also use a WMA or EMA?
pingTime int64

// pingLastSend is the Unix time expressed in nanoseconds when we sent
// our last ping message. To be used atomically.
pingLastSend int64
pingManager *PingManager

// lastPingPayload stores an unsafe pointer wrapped as an atomic
// variable which points to the last payload the remote party sent us
Expand Down Expand Up @@ -525,6 +524,66 @@ func NewBrontide(cfg Config) *Brontide {
log: build.NewPrefixLog(logPrefix, peerLog),
}

var (
lastBlockHeader *wire.BlockHeader
lastSerializedBlockHeader [wire.MaxBlockHeaderPayload]byte
)
newPingPayload := func() []byte {
// We query the BestBlockHeader from our BestBlockView each time
// this is called, and update our serialized block header if
// they differ. Over time, we'll use this to disseminate the
// latest block header between all our peers, which can later be
// used to cross-check our own view of the network to mitigate
// various types of eclipse attacks.
header, err := p.cfg.BestBlockView.BestBlockHeader()
if err != nil && header == lastBlockHeader {
return lastSerializedBlockHeader[:]
}

buf := bytes.NewBuffer(lastSerializedBlockHeader[0:0])
err = header.Serialize(buf)
if err == nil {
lastBlockHeader = header
} else {
p.log.Warn("unable to serialize current block" +
"header for ping payload generation." +
"This should be impossible and means" +
"there is an implementation bug.")
}

return lastSerializedBlockHeader[:]
}

// TODO(roasbeef): make dynamic in order to
// create fake cover traffic
// NOTE(proofofkeags): this was changed to be
// dynamic to allow better pong identification,
// however, more thought is needed to make this
// actually usable as a traffic decoy
randPongSize := func() uint16 {
return uint16(
// We don't need cryptographic randomness here.
/* #nosec */
rand.Intn(lnwire.MaxPongBytes + 1),
)
}

p.pingManager = NewPingManager(&PingManagerConfig{
NewPingPayload: newPingPayload,
NewPongSize: randPongSize,
IntervalDuration: pingInterval,
TimeoutDuration: pingTimeout,
SendPing: func(ping *lnwire.Ping) {
p.queueMsg(ping, nil)
},
OnPongFailure: func(err error) {
eStr := "pong response failure for %s: %v " +
"-- disconnecting"
p.log.Warnf(eStr, p, err)
p.Disconnect(fmt.Errorf(eStr, p, err))
},
})

return p
}

Expand Down Expand Up @@ -644,40 +703,16 @@ func (p *Brontide) Start() error {

p.startTime = time.Now()

p.wg.Add(5)
err = p.pingManager.Start()
if err != nil {
return fmt.Errorf("could not start ping manager %w", err)
}

p.wg.Add(4)
go p.queueHandler()
go p.writeHandler()
go p.readHandler()
go p.channelManager()

var (
lastBlockHeader *wire.BlockHeader
lastSerializedBlockHeader [wire.MaxBlockHeaderPayload]byte
)
newPingPayload := func() []byte {
// We query the BestBlockHeader from our BestBlockView each time
// this is called, and update our serialized block header if
// they differ. Over time, we'll use this to disseminate the
// latest block header between all our peers, which can later be
// used to cross-check our own view of the network to mitigate
// various types of eclipse attacks.
header, err := p.cfg.BestBlockView.BestBlockHeader()
if err == nil && header != lastBlockHeader {
buf := bytes.NewBuffer(lastSerializedBlockHeader[0:0])
err := header.Serialize(buf)
if err == nil {
lastBlockHeader = header
} else {
p.log.Warn("unable to serialize current block" +
"header for ping payload generation." +
"This should be impossible and means" +
"there is an implementation bug.")
}
}

return lastSerializedBlockHeader[:]
}
go p.pingHandler(newPingPayload)
go p.readHandler()

// Signal to any external processes that the peer is now active.
close(p.activeSignal)
Expand Down Expand Up @@ -1159,6 +1194,11 @@ func (p *Brontide) Disconnect(reason error) {
p.cfg.Conn.Close()

close(p.quit)

if err := p.pingManager.Stop(); err != nil {
p.log.Errorf("couldn't stop pingManager during disconnect: %v",
err)
}
}

// String returns the string representation of this peer.
Expand Down Expand Up @@ -1606,12 +1646,8 @@ out:
switch msg := nextMsg.(type) {
case *lnwire.Pong:
// When we receive a Pong message in response to our
// last ping message, we'll use the time in which we
// sent the ping message to measure a rough estimate of
// round trip time.
pingSendTime := atomic.LoadInt64(&p.pingLastSend)
delay := (time.Now().UnixNano() - pingSendTime) / 1000
atomic.StoreInt64(&p.pingTime, delay)
// last ping message, we send it to the pingManager
p.pingManager.ReceivedPong(msg)

case *lnwire.Ping:
// First, we'll store their latest ping payload within
Expand Down Expand Up @@ -2137,17 +2173,6 @@ out:
for {
select {
case outMsg := <-p.sendQueue:
// If we're about to send a ping message, then log the
// exact time in which we send the message so we can
// use the delay as a rough estimate of latency to the
// remote peer.
if _, ok := outMsg.msg.(*lnwire.Ping); ok {
// TODO(roasbeef): do this before the write?
// possibly account for processing within func?
now := time.Now().UnixNano()
atomic.StoreInt64(&p.pingLastSend, now)
}

// Record the time at which we first attempt to send the
// message.
startTime := time.Now()
Expand Down Expand Up @@ -2280,40 +2305,9 @@ func (p *Brontide) queueHandler() {
}
}

// pingHandler is responsible for periodically sending ping messages to the
// remote peer in order to keep the connection alive and/or determine if the
// connection is still active.
//
// NOTE: This method MUST be run as a goroutine.
func (p *Brontide) pingHandler(newPayload func() []byte) {
defer p.wg.Done()

pingTicker := time.NewTicker(pingInterval)
defer pingTicker.Stop()

// TODO(roasbeef): make dynamic in order to create fake cover traffic
const numPongBytes = 16

out:
for {
select {
case <-pingTicker.C:

pingMsg := &lnwire.Ping{
NumPongBytes: numPongBytes,
PaddingBytes: newPayload(),
}

p.queueMsg(pingMsg, nil)
case <-p.quit:
break out
}
}
}

// PingTime returns the estimated ping time to the peer in microseconds.
func (p *Brontide) PingTime() int64 {
return atomic.LoadInt64(&p.pingTime)
return p.pingManager.GetPingTimeMicroSeconds()
}

// queueMsg adds the lnwire.Message to the back of the high priority send queue.
Expand Down

0 comments on commit 99226e3

Please sign in to comment.