From ed8fe4bc82093e122a333855e5191b12cf88753c Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 22 Apr 2019 16:03:39 -0700 Subject: [PATCH 01/10] brontide/noise: add Flush method This commit adds a Flush method to the brontide.Machine, which can write out a buffered message to an io.Writer. This is a preliminary change which will allow the encryption of the plaintext to be done in a distinct method from actually writing the bytes to the wire. --- brontide/noise.go | 85 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/brontide/noise.go b/brontide/noise.go index 7b6bdb43a06..e6b9045c32c 100644 --- a/brontide/noise.go +++ b/brontide/noise.go @@ -367,6 +367,16 @@ type Machine struct { // next ciphertext header from the wire. The header is a 2 byte length // (of the next ciphertext), followed by a 16 byte MAC. nextCipherHeader [lengthHeaderSize + macSize]byte + + // nextHeaderSend holds a reference to the remaining header bytes to + // write out for a pending message. This allows us to tolerate timeout + // errors that cause partial writes. + nextHeaderSend []byte + + // nextHeaderBody holds a reference to the remaining body bytes to write + // out for a pending message. This allows us to tolerate timeout errors + // that cause partial writes. + nextBodySend []byte } // NewBrontideMachine creates a new instance of the brontide state-machine. If @@ -715,6 +725,81 @@ func (b *Machine) WriteMessage(w io.Writer, p []byte) error { return err } +// Flush attempts to write a message buffered using WriteMessage to the provided +// io.Writer. If no buffered message exists, this will result in a NOP. +// Otherwise, it will continue to write the remaining bytes, picking up where +// the byte stream left off in the event of a partial write. The number of bytes +// returned reflects the number of plaintext bytes in the payload, and does not +// account for the overhead of the header or MACs. +// +// NOTE: It is safe to call this method again iff a timeout error is returned. +func (b *Machine) Flush(w io.Writer) (int, error) { + // First, write out the pending header bytes, if any exist. Any header + // bytes written will not count towards the total amount flushed. + if len(b.nextHeaderSend) > 0 { + // Write any remaining header bytes and shift the slice to point + // to the next segment of unwritten bytes. If an error is + // encountered, we can continue to write the header from where + // we left off on a subsequent call to Flush. + n, err := w.Write(b.nextHeaderSend) + b.nextHeaderSend = b.nextHeaderSend[n:] + if err != nil { + return 0, err + } + } + + // Next, write the pending body bytes, if any exist. Only the number of + // bytes written that correspond to the ciphertext will be included in + // the total bytes written, bytes written as part of the MAC will not be + // counted. + var nn int + if len(b.nextBodySend) > 0 { + // Write out all bytes excluding the mac and shift the body + // slice depending on the number of actual bytes written. + n, err := w.Write(b.nextBodySend) + b.nextBodySend = b.nextBodySend[n:] + + // If we partially or fully wrote any of the body's MAC, we'll + // subtract that contribution from the total amount flushed to + // preserve the abstraction of returning the number of plaintext + // bytes written by the connection. + // + // There are three possible scenarios we must handle to ensure + // the returned value is correct. In the first case, the write + // straddles both payload and MAC bytes, and we must subtract + // the number of MAC bytes written from n. In the second, only + // payload bytes are written, thus we can return n unmodified. + // The final scenario pertains to the case where only MAC bytes + // are written, none of which count towards the total. + // + // |-----------Payload------------|----MAC----| + // Straddle: S---------------------------------E--------0 + // Payload-only: S------------------------E-----------------0 + // MAC-only: S-------E-0 + start, end := n+len(b.nextBodySend), len(b.nextBodySend) + switch { + + // Straddles payload and MAC bytes, subtract number of MAC bytes + // written from the actual number written. + case start > macSize && end <= macSize: + nn = n - (macSize - end) + + // Only payload bytes are written, return n directly. + case start > macSize && end > macSize: + nn = n + + // Only MAC bytes are written, return 0 bytes written. + default: + } + + if err != nil { + return nn, err + } + } + + return nn, nil +} + // ReadMessage attempts to read the next message from the passed io.Reader. In // the case of an authentication error, a non-nil error is returned. func (b *Machine) ReadMessage(r io.Reader) ([]byte, error) { From 73cf352daadfdafff317234cd5881abb86a20fd3 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 22 Apr 2019 16:03:56 -0700 Subject: [PATCH 02/10] brontide/conn: migrate to WriteMessage + Flush This commit modifies WriteMessage to only perform encryption on the passed plaintext, and buffer the ciphertext within the connection object. We then modify internal uses of WriteMessage to follow with a call to Flush, which actually writes the message to the wire. Additionally, since WriteMessage does not actually perform the write itself, the io.Writer argument is removed from the function signature and all call sites. --- brontide/conn.go | 14 +++++++++++--- brontide/noise.go | 40 ++++++++++++++++++++++++---------------- brontide/noise_test.go | 14 ++++++++------ 3 files changed, 43 insertions(+), 25 deletions(-) diff --git a/brontide/conn.go b/brontide/conn.go index 643ff860c97..7c6a16e211a 100644 --- a/brontide/conn.go +++ b/brontide/conn.go @@ -166,7 +166,11 @@ func (c *Conn) Write(b []byte) (n int, err error) { // If the message doesn't require any chunking, then we can go ahead // with a single write. if len(b) <= math.MaxUint16 { - return len(b), c.noise.WriteMessage(c.conn, b) + err = c.noise.WriteMessage(b) + if err != nil { + return 0, err + } + return c.noise.Flush(c.conn) } // If we need to split the message into fragments, then we'll write @@ -185,11 +189,15 @@ func (c *Conn) Write(b []byte) (n int, err error) { // Slice off the next chunk to be written based on our running // counter and next chunk size. chunk := b[bytesWritten : bytesWritten+chunkSize] - if err := c.noise.WriteMessage(c.conn, chunk); err != nil { + if err := c.noise.WriteMessage(chunk); err != nil { return bytesWritten, err } - bytesWritten += len(chunk) + n, err := c.noise.Flush(c.conn) + bytesWritten += n + if err != nil { + return bytesWritten, err + } } return bytesWritten, nil diff --git a/brontide/noise.go b/brontide/noise.go index e6b9045c32c..ef5ecf29d83 100644 --- a/brontide/noise.go +++ b/brontide/noise.go @@ -48,6 +48,10 @@ var ( ErrMaxMessageLengthExceeded = errors.New("the generated payload exceeds " + "the max allowed message length of (2^16)-1") + // ErrMessageNotFlushed signals that the connection cannot accept a new + // message because the prior message has not been fully flushed. + ErrMessageNotFlushed = errors.New("prior message not flushed") + // lightningPrologue is the noise prologue that is used to initialize // the brontide noise handshake. lightningPrologue = []byte("lightning") @@ -692,11 +696,13 @@ func (b *Machine) split() { } } -// WriteMessage writes the next message p to the passed io.Writer. The -// ciphertext of the message is prepended with an encrypt+auth'd length which -// must be used as the AD to the AEAD construction when being decrypted by the -// other side. -func (b *Machine) WriteMessage(w io.Writer, p []byte) error { +// WriteMessage encrypts and buffers the next message p. The ciphertext of the +// message is prepended with an encrypt+auth'd length which must be used as the +// AD to the AEAD construction when being decrypted by the other side. +// +// NOTE: This DOES NOT write the message to the wire, it should be followed by a +// call to Flush to ensure the message is written. +func (b *Machine) WriteMessage(p []byte) error { // The total length of each message payload including the MAC size // payload exceed the largest number encodable within a 16-bit unsigned // integer. @@ -704,6 +710,13 @@ func (b *Machine) WriteMessage(w io.Writer, p []byte) error { return ErrMaxMessageLengthExceeded } + // If a prior message was written but it hasn't been fully flushed, + // return an error as we only support buffering of one message at a + // time. + if len(b.nextHeaderSend) > 0 || len(b.nextBodySend) > 0 { + return ErrMessageNotFlushed + } + // The full length of the packet is only the packet length, and does // NOT include the MAC. fullLength := uint16(len(p)) @@ -711,18 +724,13 @@ func (b *Machine) WriteMessage(w io.Writer, p []byte) error { var pktLen [2]byte binary.BigEndian.PutUint16(pktLen[:], fullLength) - // First, write out the encrypted+MAC'd length prefix for the packet. - cipherLen := b.sendCipher.Encrypt(nil, nil, pktLen[:]) - if _, err := w.Write(cipherLen); err != nil { - return err - } + // First, generate the encrypted+MAC'd length prefix for the packet. + b.nextHeaderSend = b.sendCipher.Encrypt(nil, nil, pktLen[:]) - // Finally, write out the encrypted packet itself. We only write out a - // single packet, as any fragmentation should have taken place at a - // higher level. - cipherText := b.sendCipher.Encrypt(nil, nil, p) - _, err := w.Write(cipherText) - return err + // Finally, generate the encrypted packet itself. + b.nextBodySend = b.sendCipher.Encrypt(nil, nil, p) + + return nil } // Flush attempts to write a message buffered using WriteMessage to the provided diff --git a/brontide/noise_test.go b/brontide/noise_test.go index 1918a64610c..1349e6534e3 100644 --- a/brontide/noise_test.go +++ b/brontide/noise_test.go @@ -220,11 +220,9 @@ func TestMaxPayloadLength(t *testing.T) { // payload length. payloadToReject := make([]byte, math.MaxUint16+1) - var buf bytes.Buffer - // A write of the payload generated above to the state machine should // be rejected as it's over the max payload length. - err := b.WriteMessage(&buf, payloadToReject) + err := b.WriteMessage(payloadToReject) if err != ErrMaxMessageLengthExceeded { t.Fatalf("payload is over the max allowed length, the write " + "should have been rejected") @@ -233,7 +231,7 @@ func TestMaxPayloadLength(t *testing.T) { // Generate another payload which should be accepted as a valid // payload. payloadToAccept := make([]byte, math.MaxUint16-1) - if err := b.WriteMessage(&buf, payloadToAccept); err != nil { + if err := b.WriteMessage(payloadToAccept); err != nil { t.Fatalf("write for payload was rejected, should have been " + "accepted") } @@ -243,7 +241,7 @@ func TestMaxPayloadLength(t *testing.T) { payloadToReject = make([]byte, math.MaxUint16+1) // This payload should be rejected. - err = b.WriteMessage(&buf, payloadToReject) + err = b.WriteMessage(payloadToReject) if err != ErrMaxMessageLengthExceeded { t.Fatalf("payload is over the max allowed length, the write " + "should have been rejected") @@ -502,10 +500,14 @@ func TestBolt0008TestVectors(t *testing.T) { var buf bytes.Buffer for i := 0; i < 1002; i++ { - err = initiator.WriteMessage(&buf, payload) + err = initiator.WriteMessage(payload) if err != nil { t.Fatalf("could not write message %s", payload) } + _, err = initiator.Flush(&buf) + if err != nil { + t.Fatalf("could not flush message: %v", err) + } if val, ok := transportMessageVectors[i]; ok { binaryVal, err := hex.DecodeString(val) if err != nil { From 333caac09c35c9dd5efaca58a5549cb0d5aaf7eb Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 22 Apr 2019 16:04:08 -0700 Subject: [PATCH 03/10] brontide/noise_test: add timeoutWriter --- brontide/noise_test.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/brontide/noise_test.go b/brontide/noise_test.go index 1349e6534e3..c0614f50ce0 100644 --- a/brontide/noise_test.go +++ b/brontide/noise_test.go @@ -8,6 +8,7 @@ import ( "net" "sync" "testing" + "testing/iotest" "github.com/btcsuite/btcd/btcec" "github.com/lightningnetwork/lnd/lnwire" @@ -536,3 +537,27 @@ func TestBolt0008TestVectors(t *testing.T) { buf.Reset() } } + +// timeoutWriter wraps an io.Writer and throws an iotest.ErrTimeout after +// writing n bytes. +type timeoutWriter struct { + w io.Writer + n int64 +} + +func NewTimeoutWriter(w io.Writer, n int64) io.Writer { + return &timeoutWriter{w, n} +} + +func (t *timeoutWriter) Write(p []byte) (int, error) { + n := len(p) + if int64(n) > t.n { + n = int(t.n) + } + n, err := t.w.Write(p[:n]) + t.n -= int64(n) + if err == nil && t.n == 0 { + return n, iotest.ErrTimeout + } + return n, err +} From e3728da4782bceb78ec098252f9446e25c44cb8d Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 22 Apr 2019 16:04:24 -0700 Subject: [PATCH 04/10] brontide/noise_test: add TestFlush --- brontide/noise.go | 6 +- brontide/noise_test.go | 149 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 154 insertions(+), 1 deletion(-) diff --git a/brontide/noise.go b/brontide/noise.go index ef5ecf29d83..7d5f42d4ede 100644 --- a/brontide/noise.go +++ b/brontide/noise.go @@ -31,6 +31,10 @@ const ( // length of a message payload. lengthHeaderSize = 2 + // encHeaderSize is the number of bytes required to hold an encrypted + // header and it's MAC. + encHeaderSize = lengthHeaderSize + macSize + // keyRotationInterval is the number of messages sent on a single // cipher stream before the keys are rotated forwards. keyRotationInterval = 1000 @@ -370,7 +374,7 @@ type Machine struct { // nextCipherHeader is a static buffer that we'll use to read in the // next ciphertext header from the wire. The header is a 2 byte length // (of the next ciphertext), followed by a 16 byte MAC. - nextCipherHeader [lengthHeaderSize + macSize]byte + nextCipherHeader [encHeaderSize]byte // nextHeaderSend holds a reference to the remaining header bytes to // write out for a pending message. This allows us to tolerate timeout diff --git a/brontide/noise_test.go b/brontide/noise_test.go index c0614f50ce0..55ea46594fc 100644 --- a/brontide/noise_test.go +++ b/brontide/noise_test.go @@ -561,3 +561,152 @@ func (t *timeoutWriter) Write(p []byte) (int, error) { } return n, err } + +const payloadSize = 10 + +type flushChunk struct { + errAfter int64 + expN int + expErr error +} + +type flushTest struct { + name string + chunks []flushChunk +} + +var flushTests = []flushTest{ + { + name: "partial header write", + chunks: []flushChunk{ + // Write 18-byte header in two parts, 16 then 2. + { + errAfter: encHeaderSize - 2, + expN: 0, + expErr: iotest.ErrTimeout, + }, + { + errAfter: 2, + expN: 0, + expErr: iotest.ErrTimeout, + }, + // Write payload and MAC in one go. + { + errAfter: -1, + expN: payloadSize, + }, + }, + }, + { + name: "full payload then full mac", + chunks: []flushChunk{ + // Write entire header and entire payload w/o MAC. + { + errAfter: encHeaderSize + payloadSize, + expN: payloadSize, + expErr: iotest.ErrTimeout, + }, + // Write the entire MAC. + { + errAfter: -1, + expN: 0, + }, + }, + }, + { + name: "payload-only, straddle, mac-only", + chunks: []flushChunk{ + // Write header and all but last byte of payload. + { + errAfter: encHeaderSize + payloadSize - 1, + expN: payloadSize - 1, + expErr: iotest.ErrTimeout, + }, + // Write last byte of payload and first byte of MAC. + { + errAfter: 2, + expN: 1, + expErr: iotest.ErrTimeout, + }, + // Write 10 bytes of the MAC. + { + errAfter: 10, + expN: 0, + expErr: iotest.ErrTimeout, + }, + // Write the remaining 5 MAC bytes. + { + errAfter: -1, + expN: 0, + }, + }, + }, +} + +// TestFlush asserts a Machine's ability to handle timeouts during Flush that +// cause partial writes, and that the machine can properly resume writes on +// subsequent calls to Flush. +func TestFlush(t *testing.T) { + // Run each test individually, to assert that they pass in isolation. + for _, test := range flushTests { + t.Run(test.name, func(t *testing.T) { + var ( + w bytes.Buffer + b Machine + ) + b.split() + testFlush(t, test, &b, &w) + }) + } + + // Finally, run the tests serially as if all on one connection. + t.Run("flush serial", func(t *testing.T) { + var ( + w bytes.Buffer + b Machine + ) + b.split() + for _, test := range flushTests { + testFlush(t, test, &b, &w) + } + }) +} + +// testFlush buffers a message on the Machine, then flushes it to the io.Writer +// in chunks. Once complete, a final call to flush is made to assert that Write +// is not called again. +func testFlush(t *testing.T, test flushTest, b *Machine, w io.Writer) { + payload := make([]byte, payloadSize) + if err := b.WriteMessage(payload); err != nil { + t.Fatalf("unable to write message: %v", err) + } + + for _, chunk := range test.chunks { + assertFlush(t, b, w, chunk.errAfter, chunk.expN, chunk.expErr) + } + + // We should always be able to call Flush after a message has been + // successfully written, and it should result in a NOP. + assertFlush(t, b, w, 0, 0, nil) +} + +// assertFlush flushes a chunk to the passed io.Writer. If n >= 0, a +// timeoutWriter will be used the flush should stop with iotest.ErrTimeout after +// n bytes. The method asserts that the returned error matches expErr and that +// the number of bytes written by Flush matches expN. +func assertFlush(t *testing.T, b *Machine, w io.Writer, n int64, expN int, + expErr error) { + + t.Helper() + + if n >= 0 { + w = NewTimeoutWriter(w, n) + } + nn, err := b.Flush(w) + if err != expErr { + t.Fatalf("expected flush err: %v, got: %v", expErr, err) + } + if nn != expN { + t.Fatalf("expected n: %d, got: %d", expN, nn) + } +} From 12ec9992522c07de2cbf98c40b149cf2f0da94a7 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 22 Apr 2019 16:04:37 -0700 Subject: [PATCH 05/10] brontide/noise_test: defer wait group decrement If any of the checks in the goroutine fail, the test will deadlock since the waitgroup is never released. --- brontide/noise_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/brontide/noise_test.go b/brontide/noise_test.go index 55ea46594fc..50de346c391 100644 --- a/brontide/noise_test.go +++ b/brontide/noise_test.go @@ -269,6 +269,8 @@ func TestWriteMessageChunking(t *testing.T) { var wg sync.WaitGroup wg.Add(1) go func() { + defer wg.Done() + bytesWritten, err := localConn.Write(largeMessage) if err != nil { t.Fatalf("unable to write message: %v", err) @@ -280,7 +282,6 @@ func TestWriteMessageChunking(t *testing.T) { t.Fatalf("bytes not fully written!") } - wg.Done() }() // Attempt to read the entirety of the message generated above. From 6bc32871fd3ff6c0b329f47feb580c0a8eb4a636 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 22 Apr 2019 16:04:53 -0700 Subject: [PATCH 06/10] brontide/conn: expose WriteMessage and Flush on brontide.Conn This commit exposes the WriteMessage and Flush interfaces of the underlying brontide.Machine, such that callers can have greater flexibility in when blocking network operations take place. --- brontide/conn.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/brontide/conn.go b/brontide/conn.go index 7c6a16e211a..0ebed66f89b 100644 --- a/brontide/conn.go +++ b/brontide/conn.go @@ -203,6 +203,29 @@ func (c *Conn) Write(b []byte) (n int, err error) { return bytesWritten, nil } +// WriteMessage encrypts and buffers the next message p for the connection. The +// ciphertext of the message is prepended with an encrypt+auth'd length which +// must be used as the AD to the AEAD construction when being decrypted by the +// other side. +// +// NOTE: This DOES NOT write the message to the wire, it should be followed by a +// call to Flush to ensure the message is written. +func (c *Conn) WriteMessage(b []byte) error { + return c.noise.WriteMessage(b) +} + +// Flush attempts to write a message buffered using WriteMessage to the +// underlying connection. If no buffered message exists, this will result in a +// NOP. Otherwise, it will continue to write the remaining bytes, picking up +// where the byte stream left off in the event of a partial write. The number of +// bytes returned reflects the number of plaintext bytes in the payload, and +// does not account for the overhead of the header or MACs. +// +// NOTE: It is safe to call this method again iff a timeout error is returned. +func (c *Conn) Flush() (int, error) { + return c.noise.Flush(c.conn) +} + // Close closes the connection. Any blocked Read or Write operations will be // unblocked and return errors. // From f8345d38fb04ce45f5a984d0ec8eecc3f56cbb0d Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 22 Apr 2019 16:05:10 -0700 Subject: [PATCH 07/10] peer: only set ping time once As a preliminary step to integrating the separated WriteMessage and Flush calls in the peer, we'll modify the peer to only set a timestamp on Ping messages once. This makes sense for two reasons, 1) if the message has already been partially written, we have already committed to a ping time, and 2) a ciphertext containing the first ping time will already be buffered in the connection, and we will only be attempting to Flush on timeout errors. --- peer.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/peer.go b/peer.go index e227d141ddf..9a5edc057a2 100644 --- a/peer.go +++ b/peer.go @@ -1468,6 +1468,17 @@ out: for { select { case outMsg := <-p.sendQueue: + // If we're about to send a ping message, then log the + // exact time in which we send the message so we can + // use the delay as a rough estimate of latency to the + // remote peer. + if _, ok := outMsg.msg.(*lnwire.Ping); ok { + // TODO(roasbeef): do this before the write? + // possibly account for processing within func? + now := time.Now().UnixNano() + atomic.StoreInt64(&p.pingLastSend, now) + } + // Record the time at which we first attempt to send the // message. startTime := time.Now() @@ -1491,17 +1502,6 @@ out: } } - // If we're about to send a ping message, then log the - // exact time in which we send the message so we can - // use the delay as a rough estimate of latency to the - // remote peer. - if _, ok := outMsg.msg.(*lnwire.Ping); ok { - // TODO(roasbeef): do this before the write? - // possibly account for processing within func? - now := time.Now().UnixNano() - atomic.StoreInt64(&p.pingLastSend, now) - } - // Write out the message to the socket. If a timeout // error is encountered, we will catch this and retry // after backing off in case the remote peer is just From bfbf3015ada1c139cf565d0c0c71dd0e696071d8 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 22 Apr 2019 16:05:26 -0700 Subject: [PATCH 08/10] peer: replace Write with split WriteMessage then Flush methods This commit modifies the way the link writes messages to the wire, by first buffering ciphertexts to the connection using WriteMessage, and then calling Flush separately. Currently, the call to Write tries to do both, which can result in a blocking operation for up to the duration of the write timeout. Splitting these operations permits less blocking in the write pool, since now we only need to use a write worker to serialize and encrypt the plaintext. After the write pool is released, the peer then attempts to flush the message using the appropriate write timeout. If a timeout error occurs, the peer will continue to flush the message w/o serializing or encrypting the message again, until the message is fully written to the wire or the write idle timer disconnects the peer. --- peer.go | 81 +++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 62 insertions(+), 19 deletions(-) diff --git a/peer.go b/peer.go index 9a5edc057a2..4832ddcb652 100644 --- a/peer.go +++ b/peer.go @@ -1403,43 +1403,77 @@ func (p *peer) logWireMessage(msg lnwire.Message, read bool) { })) } -// writeMessage writes the target lnwire.Message to the remote peer. +// writeMessage writes and flushes the target lnwire.Message to the remote peer. +// If the passed message is nil, this method will only try to flush an existing +// message buffered on the connection. It is safe to recall this method with a +// nil message iff a timeout error is returned. This will continue to flush the +// pending message to the wire. func (p *peer) writeMessage(msg lnwire.Message) error { // Simply exit if we're shutting down. if atomic.LoadInt32(&p.disconnect) != 0 { return ErrPeerExiting } - p.logWireMessage(msg, false) + // Only log the message on the first attempt. + if msg != nil { + p.logWireMessage(msg, false) + } - var n int - err := p.writePool.Submit(func(buf *bytes.Buffer) error { - // Using a buffer allocated by the write pool, encode the - // message directly into the buffer. - _, writeErr := lnwire.WriteMessage(buf, msg, 0) - if writeErr != nil { - return writeErr - } + noiseConn, ok := p.conn.(*brontide.Conn) + if !ok { + return fmt.Errorf("brontide.Conn required to write messages") + } + flushMsg := func() error { // Ensure the write deadline is set before we attempt to send // the message. writeDeadline := time.Now().Add(writeMessageTimeout) - writeErr = p.conn.SetWriteDeadline(writeDeadline) + err := noiseConn.SetWriteDeadline(writeDeadline) + if err != nil { + return err + } + + // Flush the pending message to the wire. If an error is + // encountered, e.g. write timeout, the number of bytes written + // so far will be returned. + n, err := noiseConn.Flush() + + // Record the number of bytes written on the wire, if any. + if n > 0 { + atomic.AddUint64(&p.bytesSent, uint64(n)) + } + + return err + } + + // If the current message has already been serialized, encrypted, and + // buffered on the underlying connection we will skip straight to + // flushing it to the wire. + if msg == nil { + return flushMsg() + } + + // Otherwise, this is a new message. We'll acquire a write buffer to + // serialize the message and buffer the ciphertext on the connection. + err := p.writePool.Submit(func(buf *bytes.Buffer) error { + // Using a buffer allocated by the write pool, encode the + // message directly into the buffer. + _, writeErr := lnwire.WriteMessage(buf, msg, 0) if writeErr != nil { return writeErr } - // Finally, write the message itself in a single swoop. - n, writeErr = p.conn.Write(buf.Bytes()) - return writeErr + // Finally, write the message itself in a single swoop. This + // will buffer the ciphertext on the underlying connection. We + // will defer flushing the message until the write pool has been + // released. + return noiseConn.WriteMessage(buf.Bytes()) }) - - // Record the number of bytes written on the wire, if any. - if n > 0 { - atomic.AddUint64(&p.bytesSent, uint64(n)) + if err != nil { + return err } - return err + return flushMsg() } // writeHandler is a goroutine dedicated to reading messages off of an incoming @@ -1528,6 +1562,15 @@ out: "first attempted %v ago", p, retryDelay, time.Since(startTime)) + // If we received a timeout error, this implies + // that the message was buffered on the + // connection successfully and that a flush was + // attempted. We'll set the message to nil so + // that on a subsequent pass we only try to + // flush the buffered message, and forgo + // reserializing or reencrypting it. + outMsg.msg = nil + goto retryWithDelay } From e013577a4f07666440af54fe6982b89e0629a3a2 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 22 Apr 2019 16:05:41 -0700 Subject: [PATCH 09/10] peer: remove write backoff This commit removes the write backoff, since subsequent retries no longer need to access the write pool. Subsequent flushes will resume writing any partial writes that occurred before the timeout until the message is received or the idle write timer is triggered. In the future, we can add callbacks that execute on write events timeouts. This can be useful for mobile clients that might roam, as the timeout could indicate the connection is dead even if the OS has not reported it closed. The callback can be used then, for example, to initiate another outbound connection to test whether or not the issue is related to the connection. --- peer.go | 46 ++++------------------------------------------ 1 file changed, 4 insertions(+), 42 deletions(-) diff --git a/peer.go b/peer.go index 4832ddcb652..e92574c75c4 100644 --- a/peer.go +++ b/peer.go @@ -1493,11 +1493,6 @@ func (p *peer) writeHandler() { var exitErr error - const ( - minRetryDelay = 5 * time.Second - maxRetryDelay = time.Minute - ) - out: for { select { @@ -1517,49 +1512,16 @@ out: // message. startTime := time.Now() - // Initialize a retry delay of zero, which will be - // increased if we encounter a write timeout on the - // send. - var retryDelay time.Duration - retryWithDelay: - if retryDelay > 0 { - select { - case <-time.After(retryDelay): - case <-p.quit: - // Inform synchronous writes that the - // peer is exiting. - if outMsg.errChan != nil { - outMsg.errChan <- ErrPeerExiting - } - exitErr = ErrPeerExiting - break out - } - } - + retry: // Write out the message to the socket. If a timeout // error is encountered, we will catch this and retry // after backing off in case the remote peer is just // slow to process messages from the wire. err := p.writeMessage(outMsg.msg) if nerr, ok := err.(net.Error); ok && nerr.Timeout() { - // Increase the retry delay in the event of a - // timeout error, this prevents us from - // disconnecting if the remote party is slow to - // pull messages off the wire. We back off - // exponentially up to our max delay to prevent - // blocking the write pool. - if retryDelay == 0 { - retryDelay = minRetryDelay - } else { - retryDelay *= 2 - if retryDelay > maxRetryDelay { - retryDelay = maxRetryDelay - } - } - peerLog.Debugf("Write timeout detected for "+ - "peer %s, retrying after %v, "+ - "first attempted %v ago", p, retryDelay, + "peer %s, first write for message "+ + "attempted %v ago", p, time.Since(startTime)) // If we received a timeout error, this implies @@ -1571,7 +1533,7 @@ out: // reserializing or reencrypting it. outMsg.msg = nil - goto retryWithDelay + goto retry } // The write succeeded, reset the idle timer to prevent From e3a6de15540ab1a4f49372b300d760f9c42d26e9 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 22 Apr 2019 16:30:27 -0700 Subject: [PATCH 10/10] lncfg/workers: reduce default write pool workers to 8 Now that the write pool no longer executes blocking i/o operations, it is safe to reduce this number considerably. The write pool now only handles encoding and encryption of messages, making problem computationally bound and thus dependent on available CPUs. The descriptions of the workers configs is also updated to explain how users should set these on their on machines. --- lncfg/workers.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lncfg/workers.go b/lncfg/workers.go index bbd9960b881..136b38d7b7d 100644 --- a/lncfg/workers.go +++ b/lncfg/workers.go @@ -9,7 +9,7 @@ const ( // DefaultWriteWorkers is the default maximum number of concurrent // workers used by the daemon's write pool. - DefaultWriteWorkers = 100 + DefaultWriteWorkers = 8 // DefaultSigWorkers is the default maximum number of concurrent workers // used by the daemon's sig pool. @@ -20,13 +20,13 @@ const ( // pools. type Workers struct { // Read is the maximum number of concurrent read pool workers. - Read int `long:"read" description:"Maximum number of concurrent read pool workers."` + Read int `long:"read" description:"Maximum number of concurrent read pool workers. This number should be proportional to the number of peers."` // Write is the maximum number of concurrent write pool workers. - Write int `long:"write" description:"Maximum number of concurrent write pool workers."` + Write int `long:"write" description:"Maximum number of concurrent write pool workers. This number should be proportional to the number of CPUs on the host. "` // Sig is the maximum number of concurrent sig pool workers. - Sig int `long:"sig" description:"Maximum number of concurrent sig pool workers."` + Sig int `long:"sig" description:"Maximum number of concurrent sig pool workers. This number should be proportional to the number of CPUs on the host."` } // Validate checks the Workers configuration to ensure that the input values are