Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lnpeer+brontide: reduce memory footprint using read/write pools for message encode/decode #2474

Merged
merged 9 commits into from
Feb 24, 2019
27 changes: 24 additions & 3 deletions brontide/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,34 @@ func Dial(localPriv *btcec.PrivateKey, netAddr *lnwire.NetAddress,
return b, nil
}

// ReadNextMessage uses the connection in a message-oriented instructing it to
// read the next _full_ message with the brontide stream. This function will
// block until the read succeeds.
// ReadNextMessage uses the connection in a message-oriented manner, instructing
// it to read the next _full_ message with the brontide stream. This function
// will block until the read of the header and body succeeds.
//
// NOTE: This method SHOULD NOT be used in the case that the connection may be
// adversarial and induce long delays. If the caller needs to set read deadlines
// appropriately, it is preferred that they use the split ReadNextHeader and
// ReadNextBody methods so that the deadlines can be set appropriately on each.
func (c *Conn) ReadNextMessage() ([]byte, error) {
return c.noise.ReadMessage(c.conn)
}

// ReadNextHeader uses the connection to read the next header from the brontide
// stream. This function will block until the read of the header succeeds and
// return the packet length (including MAC overhead) that is expected from the
// subsequent call to ReadNextBody.
func (c *Conn) ReadNextHeader() (uint32, error) {
return c.noise.ReadHeader(c.conn)
}

// ReadNextBody uses the connection to read the next message body from the
// brontide stream. This function will block until the read of the body succeeds
// and return the decrypted payload. The provided buffer MUST be the packet
// length returned by the preceding call to ReadNextHeader.
func (c *Conn) ReadNextBody(buf []byte) ([]byte, error) {
return c.noise.ReadBody(c.conn, buf)
}

// Read reads data from the connection. Read can be made to time out and
// return an Error with Timeout() == true after a fixed time limit; see
// SetDeadline and SetReadDeadline.
Expand Down
82 changes: 39 additions & 43 deletions brontide/noise.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@ import (
"fmt"
"io"
"math"
"runtime"
"time"

"golang.org/x/crypto/chacha20poly1305"
"golang.org/x/crypto/hkdf"

"github.com/btcsuite/btcd/btcec"
"github.com/lightningnetwork/lnd/buffer"
"github.com/lightningnetwork/lnd/pool"
)

const (
Expand Down Expand Up @@ -60,14 +57,6 @@ var (
ephemeralGen = func() (*btcec.PrivateKey, error) {
return btcec.NewPrivateKey(btcec.S256())
}

// readBufferPool is a singleton instance of a buffer pool, used to
// conserve memory allocations due to read buffers across the entire
// brontide package.
readBufferPool = pool.NewReadBuffer(
pool.DefaultReadBufferGCInterval,
pool.DefaultReadBufferExpiryInterval,
)
)

// TODO(roasbeef): free buffer pool?
Expand Down Expand Up @@ -378,15 +367,6 @@ type Machine struct {
// next ciphertext header from the wire. The header is a 2 byte length
// (of the next ciphertext), followed by a 16 byte MAC.
nextCipherHeader [lengthHeaderSize + macSize]byte

// nextCipherText is a static buffer that we'll use to read in the
// bytes of the next cipher text message. As all messages in the
// protocol MUST be below 65KB plus our macSize, this will be
// sufficient to buffer all messages from the socket when we need to
// read the next one. Having a fixed buffer that's re-used also means
// that we save on allocations as we don't need to create a new one
// each time.
nextCipherText *buffer.Read
}

// NewBrontideMachine creates a new instance of the brontide state-machine. If
Expand Down Expand Up @@ -738,43 +718,59 @@ func (b *Machine) WriteMessage(w io.Writer, p []byte) error {
// ReadMessage attempts to read the next message from the passed io.Reader. In
// the case of an authentication error, a non-nil error is returned.
func (b *Machine) ReadMessage(r io.Reader) ([]byte, error) {
if _, err := io.ReadFull(r, b.nextCipherHeader[:]); err != nil {
pktLen, err := b.ReadHeader(r)
if err != nil {
return nil, err
}

buf := make([]byte, pktLen)
return b.ReadBody(r, buf)
}

// ReadHeader attempts to read the next message header from the passed
// io.Reader. The header contains the length of the next body including
// additional overhead of the MAC. In the case of an authentication error, a
// non-nil error is returned.
//
// NOTE: This method SHOULD NOT be used in the case that the io.Reader may be
// adversarial and induce long delays. If the caller needs to set read deadlines
// appropriately, it is preferred that they use the split ReadHeader and
// ReadBody methods so that the deadlines can be set appropriately on each.
func (b *Machine) ReadHeader(r io.Reader) (uint32, error) {
_, err := io.ReadFull(r, b.nextCipherHeader[:])
if err != nil {
return 0, err
}

// Attempt to decrypt+auth the packet length present in the stream.
pktLenBytes, err := b.recvCipher.Decrypt(
nil, nil, b.nextCipherHeader[:],
)
if err != nil {
return nil, err
return 0, err
}

// If this is the first message being read, take a read buffer from the
// buffer pool. This is delayed until this point to avoid allocating
// read buffers until after the peer has successfully completed the
// handshake, and is ready to begin sending lnwire messages.
if b.nextCipherText == nil {
b.nextCipherText = readBufferPool.Take()
runtime.SetFinalizer(b, freeReadBuffer)
}
// Compute the packet length that we will need to read off the wire.
pktLen := uint32(binary.BigEndian.Uint16(pktLenBytes)) + macSize

return pktLen, nil
}

// ReadBody attempts to ready the next message body from the passed io.Reader.
// The provided buffer MUST be the length indicated by the packet length
// returned by the preceding call to ReadHeader. In the case of an
// authentication eerror, a non-nil error is returned.
func (b *Machine) ReadBody(r io.Reader, buf []byte) ([]byte, error) {
// Next, using the length read from the packet header, read the
// encrypted packet itself.
pktLen := uint32(binary.BigEndian.Uint16(pktLenBytes)) + macSize
if _, err := io.ReadFull(r, b.nextCipherText[:pktLen]); err != nil {
// encrypted packet itself into the buffer allocated by the read
// pool.
_, err := io.ReadFull(r, buf)
if err != nil {
return nil, err
}

// Finally, decrypt the message held in the buffer, and return a
// new byte slice containing the plaintext.
// TODO(roasbeef): modify to let pass in slice
return b.recvCipher.Decrypt(nil, nil, b.nextCipherText[:pktLen])
}

// freeReadBuffer returns the Machine's read buffer back to the package wide
// read buffer pool.
//
// NOTE: This method should only be called by a Machine's finalizer.
func freeReadBuffer(b *Machine) {
readBufferPool.Return(b.nextCipherText)
b.nextCipherText = nil
return b.recvCipher.Decrypt(nil, nil, buf)
}
103 changes: 66 additions & 37 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/ticker"
)

Expand All @@ -43,8 +44,12 @@ const (
// idleTimeout is the duration of inactivity before we time out a peer.
idleTimeout = 5 * time.Minute

// writeMessageTimeout is the timeout used when writing a message to peer.
writeMessageTimeout = 50 * time.Second
// writeMessageTimeout is the timeout used when writing a message to a peer.
writeMessageTimeout = 10 * time.Second
Roasbeef marked this conversation as resolved.
Show resolved Hide resolved

// readMessageTimeout is the timeout used when reading a message from a
// peer.
readMessageTimeout = 5 * time.Second

// handshakeTimeout is the timeout used when waiting for peer init message.
handshakeTimeout = 15 * time.Second
Expand Down Expand Up @@ -209,11 +214,13 @@ type peer struct {
// TODO(halseth): remove when link failure is properly handled.
failedChannels map[lnwire.ChannelID]struct{}

// writeBuf is a buffer that we'll re-use in order to encode wire
// messages to write out directly on the socket. By re-using this
// buffer, we avoid needing to allocate more memory each time a new
// message is to be sent to a peer.
writeBuf *buffer.Write
// writePool is the task pool to that manages reuse of write buffers.
// Write tasks are submitted to the pool in order to conserve the total
// number of write buffers allocated at any one time, and decouple write
// buffer allocation from the peer life cycle.
writePool *pool.Write

readPool *pool.Read

queueQuit chan struct{}
quit chan struct{}
Expand Down Expand Up @@ -258,7 +265,8 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,

chanActiveTimeout: chanActiveTimeout,

writeBuf: server.writeBufferPool.Take(),
writePool: server.writePool,
readPool: server.readPool,

queueQuit: make(chan struct{}),
quit: make(chan struct{}),
Expand Down Expand Up @@ -608,11 +616,6 @@ func (p *peer) WaitForDisconnect(ready chan struct{}) {
}

p.wg.Wait()

// Now that we are certain all active goroutines which could have been
// modifying the write buffer have exited, return the buffer to the pool
// to be reused.
p.server.writeBufferPool.Return(p.writeBuf)
}

// Disconnect terminates the connection with the remote peer. Additionally, a
Expand Down Expand Up @@ -644,11 +647,37 @@ func (p *peer) readNextMessage() (lnwire.Message, error) {
return nil, fmt.Errorf("brontide.Conn required to read messages")
}

err := noiseConn.SetReadDeadline(time.Time{})
if err != nil {
return nil, err
}

pktLen, err := noiseConn.ReadNextHeader()
if err != nil {
return nil, err
}

// First we'll read the next _full_ message. We do this rather than
// reading incrementally from the stream as the Lightning wire protocol
// is message oriented and allows nodes to pad on additional data to
// the message stream.
rawMsg, err := noiseConn.ReadNextMessage()
var rawMsg []byte
err = p.readPool.Submit(func(buf *buffer.Read) error {
// Before reading the body of the message, set the read timeout
// accordingly to ensure we don't block other readers using the
// pool. We do so only after the task has been scheduled to
// ensure the deadline doesn't expire while the message is in
// the process of being scheduled.
readDeadline := time.Now().Add(readMessageTimeout)
readErr := noiseConn.SetReadDeadline(readDeadline)
if readErr != nil {
return readErr
}

rawMsg, readErr = noiseConn.ReadNextBody(buf[:pktLen])
return readErr
})

atomic.AddUint64(&p.bytesReceived, uint64(len(rawMsg)))
if err != nil {
return nil, err
Expand Down Expand Up @@ -1359,32 +1388,32 @@ func (p *peer) writeMessage(msg lnwire.Message) error {

p.logWireMessage(msg, false)

// We'll re-slice of static write buffer to allow this new message to
// utilize all available space. We also ensure we cap the capacity of
// this new buffer to the static buffer which is sized for the largest
// possible protocol message.
b := bytes.NewBuffer(p.writeBuf[0:0:len(p.writeBuf)])

// With the temp buffer created and sliced properly (length zero, full
// capacity), we'll now encode the message directly into this buffer.
_, err := lnwire.WriteMessage(b, msg, 0)
if err != nil {
return err
}
var n int
err := p.writePool.Submit(func(buf *bytes.Buffer) error {
cfromknecht marked this conversation as resolved.
Show resolved Hide resolved
// Using a buffer allocated by the write pool, encode the
// message directly into the buffer.
_, writeErr := lnwire.WriteMessage(buf, msg, 0)
if writeErr != nil {
return writeErr
}

// Compute and set the write deadline we will impose on the remote peer.
writeDeadline := time.Now().Add(writeMessageTimeout)
err = p.conn.SetWriteDeadline(writeDeadline)
if err != nil {
return err
}
// Ensure the write deadline is set before we attempt to send
// the message.
writeDeadline := time.Now().Add(writeMessageTimeout)
writeErr = p.conn.SetWriteDeadline(writeDeadline)
if writeErr != nil {
return writeErr
}

// Finally, write the message itself in a single swoop.
n, err := p.conn.Write(b.Bytes())
// Finally, write the message itself in a single swoop.
n, writeErr = p.conn.Write(buf.Bytes())
return writeErr
})

// Regardless of the error returned, record how many bytes were written
// to the wire.
atomic.AddUint64(&p.bytesSent, uint64(n))
// Record the number of bytes written on the wire, if any.
cfromknecht marked this conversation as resolved.
Show resolved Hide resolved
if n > 0 {
atomic.AddUint64(&p.bytesSent, uint64(n))
}

return err
}
Expand Down
Loading