Skip to content

Commit

Permalink
Merge pull request #4051 from carlaKC/3000-peererrors
Browse files Browse the repository at this point in the history
peers: Track errors across connections
  • Loading branch information
carlaKC committed Mar 17, 2020
2 parents 65f5119 + 4c48d03 commit b03c3c2
Show file tree
Hide file tree
Showing 10 changed files with 1,412 additions and 820 deletions.
14 changes: 12 additions & 2 deletions cmd/lncli/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -1335,15 +1335,25 @@ var listPeersCommand = cli.Command{
Name: "listpeers",
Category: "Peers",
Usage: "List all active, currently connected peers.",
Action: actionDecorator(listPeers),
Flags: []cli.Flag{
cli.BoolFlag{
Name: "list_errors",
Usage: "list a full set of most recent errors for the peer",
},
},
Action: actionDecorator(listPeers),
}

func listPeers(ctx *cli.Context) error {
ctxb := context.Background()
client, cleanUp := getClient(ctx)
defer cleanUp()

req := &lnrpc.ListPeersRequest{}
// By default, we display a single error on the cli. If the user
// specifically requests a full error set, then we will provide it.
req := &lnrpc.ListPeersRequest{
LatestError: !ctx.IsSet("list_errors"),
}
resp, err := client.ListPeers(ctxb, req)
if err != nil {
return err
Expand Down
1,700 changes: 890 additions & 810 deletions lnrpc/rpc.pb.go

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions lnrpc/rpc.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions lnrpc/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1571,9 +1571,32 @@ message Peer {

/// Features advertised by the remote peer in their init message.
map<uint32, Feature> features = 11;

/*
The latest errors received from our peer with timestamps, limited to the 10
most recent errors. These errors are tracked across peer connections, but
are not persisted across lnd restarts. Note that these errors are only
stored for peers that we have channels open with, to prevent peers from
spamming us with errors at no cost.
*/
repeated TimestampedError errors = 12;
}

// TimestampedError pairs the string form of an error with the unix timestamp
// at which it occurred. It is the element type of the repeated errors field
// on the Peer message.
message TimestampedError {
// The unix timestamp in seconds when the error occurred.
uint64 timestamp = 1;

// The string representation of the error sent by our peer.
string error = 2;
}

// ListPeersRequest is the request message passed to the ListPeers call.
message ListPeersRequest {
/*
If true, only the last error that our peer sent us will be returned with
the peer's information, rather than the full set of historic errors we have
stored.
*/
bool latest_error = 1;
}
message ListPeersResponse {
/// The list of currently connected peers
Expand Down
31 changes: 31 additions & 0 deletions lnrpc/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1186,6 +1186,16 @@
}
}
},
"parameters": [
{
"name": "latest_error",
"description": "If true, only the last error that our peer sent us will be returned with\nthe peer's information, rather than the full set of historic errors we have\nstored.",
"in": "query",
"required": false,
"type": "boolean",
"format": "boolean"
}
],
"tags": [
"Lightning"
]
Expand Down Expand Up @@ -3819,6 +3829,13 @@
"$ref": "#/definitions/lnrpcFeature"
},
"description": "/ Features advertised by the remote peer in their init message."
},
"errors": {
"type": "array",
"items": {
"$ref": "#/definitions/lnrpcTimestampedError"
},
"description": "The latest errors received from our peer with timestamps, limited to the 10\nmost recent errors. These errors are tracked across peer connections, but\nare not persisted across lnd restarts. Note that these errors are only\nstored for peers that we have channels open with, to prevent peers from\nspamming us with errors at no cost."
}
}
},
Expand Down Expand Up @@ -4270,6 +4287,20 @@
"lnrpcStopResponse": {
"type": "object"
},
"lnrpcTimestampedError": {
"type": "object",
"properties": {
"timestamp": {
"type": "string",
"format": "uint64",
"description": "The unix timestamp in seconds when the error occurred."
},
"error": {
"type": "string",
"description": "The string representation of the error sent by our peer."
}
}
},
"lnrpcTransaction": {
"type": "object",
"properties": {
Expand Down
81 changes: 74 additions & 7 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/ticker"
)

Expand All @@ -52,6 +53,9 @@ const (
// messages to be sent across the wire, requested by objects outside
// this struct.
outgoingQueueLen = 50

// errorBufferSize is the number of historic peer errors that we store.
errorBufferSize = 10
)

// outgoingMsg packages an lnwire.Message to be sent out on the wire, along with
Expand Down Expand Up @@ -91,6 +95,13 @@ type channelCloseUpdate struct {
Success bool
}

// timestampedError is a timestamped error that is used to store the most recent
// errors we have experienced with our peers.
type timestampedError struct {
// error is the error that was recorded for the peer.
error error
// timestamp is the time at which the error was recorded.
timestamp time.Time
}

// peer is an active peer on the Lightning Network. This struct is responsible
// for managing any channel state related to this peer. To do so, it has
// several helper goroutines to handle events such as HTLC timeouts, new
Expand Down Expand Up @@ -216,6 +227,14 @@ type peer struct {
// peer's chansync message with its own over and over again.
resentChanSyncMsg map[lnwire.ChannelID]struct{}

// errorBuffer stores a set of errors related to a peer. It contains
// error messages that our peer has recently sent us over the wire and
// records of unknown messages that were sent to us, so that we can
// track a full record of the communication errors we have had with our
// peer. If we choose to disconnect from a peer, it also stores the
// reason we had for disconnecting.
errorBuffer *queue.CircularBuffer

// writePool is the task pool that manages reuse of write buffers.
// Write tasks are submitted to the pool in order to conserve the total
// number of write buffers allocated at any one time, and decouple write
Expand All @@ -233,12 +252,15 @@ type peer struct {
var _ lnpeer.Peer = (*peer)(nil)

// newPeer creates a new peer from an established connection object, and a
// pointer to the main server.
// pointer to the main server. It takes an error buffer which may contain errors
// from a previous connection with the peer if we have been connected to them
// before.
func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
addr *lnwire.NetAddress, inbound bool,
features, legacyFeatures *lnwire.FeatureVector,
chanActiveTimeout time.Duration,
outgoingCltvRejectDelta uint32) (
outgoingCltvRejectDelta uint32,
errBuffer *queue.CircularBuffer) (
*peer, error) {

nodePub := addr.IdentityKey
Expand Down Expand Up @@ -276,6 +298,8 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,

chanActiveTimeout: chanActiveTimeout,

errorBuffer: errBuffer,

writePool: server.writePool,
readPool: server.readPool,

Expand Down Expand Up @@ -338,6 +362,7 @@ func (p *peer) Start() error {
msg := <-msgChan
if msg, ok := msg.(*lnwire.Init); ok {
if err := p.handleInitMsg(msg); err != nil {
p.storeError(err)
return err
}
} else {
Expand Down Expand Up @@ -668,7 +693,10 @@ func (p *peer) Disconnect(reason error) {
return
}

peerLog.Infof("Disconnecting %s, reason: %v", p, reason)
err := fmt.Errorf("disconnecting %s, reason: %v", p, reason)
p.storeError(err)

peerLog.Infof(err.Error())

// Ensure that the TCP connection is properly closed before continuing.
p.conn.Close()
Expand Down Expand Up @@ -1026,12 +1054,17 @@ out:
peerLog.Infof("unable to read message from %v: %v",
p, err)

switch err.(type) {
// If we could not read our peer's message due to an
// unknown type or invalid alias, we continue processing
// as normal. We store unknown message and address
// types, as they may provide debugging insight.
switch e := err.(type) {
// If this is just a message we don't yet recognize,
// we'll continue processing as normal as this allows
// us to introduce new messages in a forwards
// compatible manner.
case *lnwire.UnknownMessage:
p.storeError(e)
idleTimer.Reset(idleTimeout)
continue

Expand All @@ -1040,12 +1073,15 @@ out:
// simply continue parsing the remainder of their
// messages.
case *lnwire.ErrUnknownAddrType:
p.storeError(e)
idleTimer.Reset(idleTimeout)
continue

// If the NodeAnnouncement has an invalid alias, then
// we'll log that error above and continue so we can
// continue to read messges from the peer.
// continue to read messages from the peer. We do not
// store this error because it is of little debugging
// value.
case *lnwire.ErrInvalidNodeAlias:
idleTimer.Reset(idleTimeout)
continue
Expand Down Expand Up @@ -1141,8 +1177,13 @@ out:
discStream.AddMsg(msg)

default:
peerLog.Errorf("unknown message %v received from peer "+
"%v", uint16(msg.MsgType()), p)
// If the message we received is unknown to us, store
// the type to track the failure.
err := fmt.Errorf("unknown message type %v received",
uint16(msg.MsgType()))
p.storeError(err)

peerLog.Errorf("peer: %v, %v", p, err)
}

if isLinkUpdate {
Expand Down Expand Up @@ -1181,13 +1222,39 @@ func (p *peer) isActiveChannel(chanID lnwire.ChannelID) bool {
return ok
}

// storeError records err, stamped with the current time, in the peer's buffer
// of recent errors. The error is dropped unless we have at least one active
// channel with the peer; this mitigates DoS vectors in which a peer connects
// to us at no cost and floods us with errors.
func (p *peer) storeError(err error) {
	// Snapshot whether we have any active channels while holding the read
	// lock, releasing it before touching the error buffer.
	p.activeChanMtx.RLock()
	hasChannels := len(p.activeChannels) != 0
	p.activeChanMtx.RUnlock()

	// With at least one active channel, the error is worth keeping.
	if hasChannels {
		entry := &timestampedError{
			timestamp: time.Now(),
			error:     err,
		}
		p.errorBuffer.Add(entry)

		return
	}

	// No active channels: skip storage as a DoS mitigation.
	peerLog.Tracef("no channels with peer: %v, not storing err", p)
}

// handleError processes an error message read from the remote peer. The
// returned boolean indicates whether the message should be delivered to a
// targeted peer.
// It stores the error we received from the peer in memory if we have a channel
// open with the peer.
//
// NOTE: This method should only be called from within the readHandler.
func (p *peer) handleError(msg *lnwire.Error) bool {
key := p.addr.IdentityKey

// Store the error we have received.
p.storeError(msg)

switch {

// In the case of an all-zero channel ID we want to forward the error to
Expand Down
Loading

0 comments on commit b03c3c2

Please sign in to comment.