Skip to content

Commit

Permalink
Merge 45909cc into 4bde4c1
Browse files Browse the repository at this point in the history
  • Loading branch information
cfromknecht committed Jun 8, 2018
2 parents 4bde4c1 + 45909cc commit 2503988
Show file tree
Hide file tree
Showing 10 changed files with 364 additions and 180 deletions.
178 changes: 109 additions & 69 deletions discovery/gossiper.go

Large diffs are not rendered by default.

167 changes: 132 additions & 35 deletions discovery/gossiper_test.go

Large diffs are not rendered by default.

20 changes: 2 additions & 18 deletions htlcswitch/interfaces.go
Expand Up @@ -2,9 +2,9 @@ package htlcswitch

import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/wire"
)

// InvoiceDatabase is an interface which represents the persistent subsystem
Expand Down Expand Up @@ -97,7 +97,7 @@ type ChannelLink interface {

// Peer returns the representation of remote peer with which we have
// the channel link opened.
Peer() Peer
Peer() lnpeer.Peer

// EligibleToForward returns a bool indicating if the channel is able
// to actively accept requests to forward HTLC's. A channel may be
Expand All @@ -116,22 +116,6 @@ type ChannelLink interface {
Stop()
}

// Peer is an interface which represents the remote lightning node inside our
// system.
type Peer interface {
// SendMessage sends message to remote peer. The second argument
// denotes if the method should block until the message has been sent
// to the remote peer.
SendMessage(msg lnwire.Message, sync bool) error

// WipeChannel removes the channel uniquely identified by its channel
// point from all indexes associated with the peer.
WipeChannel(*wire.OutPoint) error

// PubKey returns the serialize public key of the source peer.
PubKey() [33]byte
}

// ForwardingLog is an interface that represents a time series database which
// keep track of all successfully completed payment circuits. Every few
// seconds, the switch will collate and flush out all the successful payment
Expand Down
35 changes: 18 additions & 17 deletions htlcswitch/link.go
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch/hodl"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/chaincfg/chainhash"
Expand Down Expand Up @@ -163,7 +164,7 @@ type ChannelLinkConfig struct {

// Peer is a lightning network node with which we have the channel link
// opened.
Peer Peer
Peer lnpeer.Peer

// Registry is a sub-system which responsible for managing the invoices
// in thread-safe manner.
Expand Down Expand Up @@ -534,7 +535,7 @@ func (l *channelLink) syncChanStates() error {
return fmt.Errorf("unable to generate chan sync message for "+
"ChannelPoint(%v)", l.channel.ChannelPoint())
}
if err := l.cfg.Peer.SendMessage(localChanSyncMsg, false); err != nil {
if err := l.cfg.Peer.SendMessage(false, localChanSyncMsg); err != nil {
return fmt.Errorf("Unable to send chan sync message for "+
"ChannelPoint(%v)", l.channel.ChannelPoint())
}
Expand Down Expand Up @@ -576,7 +577,7 @@ func (l *channelLink) syncChanStates() error {
fundingLockedMsg := lnwire.NewFundingLocked(
l.ChanID(), nextRevocation,
)
err = l.cfg.Peer.SendMessage(fundingLockedMsg, false)
err = l.cfg.Peer.SendMessage(false, fundingLockedMsg)
if err != nil {
return fmt.Errorf("unable to re-send "+
"FundingLocked: %v", err)
Expand Down Expand Up @@ -626,7 +627,7 @@ func (l *channelLink) syncChanStates() error {
// immediately so we return to a synchronized state as soon as
// possible.
for _, msg := range msgsToReSend {
l.cfg.Peer.SendMessage(msg, false)
l.cfg.Peer.SendMessage(false, msg)
}

case <-l.quit:
Expand Down Expand Up @@ -1107,7 +1108,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
l.openedCircuits = append(l.openedCircuits, pkt.inKey())
l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())

l.cfg.Peer.SendMessage(htlc, false)
l.cfg.Peer.SendMessage(false, htlc)

case *lnwire.UpdateFulfillHTLC:
// If hodl.SettleOutgoing mode is active, we exit early to
Expand Down Expand Up @@ -1148,7 +1149,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {

// Then we send the HTLC settle message to the connected peer
// so we can continue the propagation of the settle message.
l.cfg.Peer.SendMessage(htlc, false)
l.cfg.Peer.SendMessage(false, htlc)
isSettle = true

case *lnwire.UpdateFailHTLC:
Expand Down Expand Up @@ -1189,7 +1190,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {

// Finally, we send the HTLC message to the peer which
// initially created the HTLC.
l.cfg.Peer.SendMessage(htlc, false)
l.cfg.Peer.SendMessage(false, htlc)
isSettle = true
}

Expand Down Expand Up @@ -1342,7 +1343,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
log.Errorf("unable to revoke commitment: %v", err)
return
}
l.cfg.Peer.SendMessage(nextRevocation, false)
l.cfg.Peer.SendMessage(false, nextRevocation)

// Since we just revoked our commitment, we may have a new set
// of HTLC's on our commitment, so we'll send them over our
Expand Down Expand Up @@ -1561,7 +1562,7 @@ func (l *channelLink) updateCommitTx() error {
CommitSig: theirCommitSig,
HtlcSigs: htlcSigs,
}
l.cfg.Peer.SendMessage(commitSig, false)
l.cfg.Peer.SendMessage(false, commitSig)

// We've just initiated a state transition, attempt to stop the
// logCommitTimer. If the timer already ticked, then we'll consume the
Expand All @@ -1585,7 +1586,7 @@ func (l *channelLink) updateCommitTx() error {
// channel link opened.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Peer() Peer {
func (l *channelLink) Peer() lnpeer.Peer {
return l.cfg.Peer
}

Expand Down Expand Up @@ -1852,7 +1853,7 @@ func (l *channelLink) updateChannelFee(feePerKw lnwallet.SatPerKWeight) error {
// We'll then attempt to send a new UpdateFee message, and also lock it
// in immediately by triggering a commitment update.
msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
if err := l.cfg.Peer.SendMessage(msg, false); err != nil {
if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
return err
}
return l.updateCommitTx()
Expand Down Expand Up @@ -2260,11 +2261,11 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,

// HTLC was successfully settled locally send
// notification about it remote peer.
l.cfg.Peer.SendMessage(&lnwire.UpdateFulfillHTLC{
l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
ChanID: l.ChanID(),
ID: pd.HtlcIndex,
PaymentPreimage: preimage,
}, false)
})
needUpdate = true

// There are additional channels left within this route. So
Expand Down Expand Up @@ -2550,11 +2551,11 @@ func (l *channelLink) sendHTLCError(htlcIndex uint64, failure lnwire.FailureMess
return
}

l.cfg.Peer.SendMessage(&lnwire.UpdateFailHTLC{
l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailHTLC{
ChanID: l.ChanID(),
ID: htlcIndex,
Reason: reason,
}, false)
})
}

// sendMalformedHTLCError helper function which sends the malformed HTLC update
Expand All @@ -2569,12 +2570,12 @@ func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
return
}

l.cfg.Peer.SendMessage(&lnwire.UpdateFailMalformedHTLC{
l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
ChanID: l.ChanID(),
ID: htlcIndex,
ShaOnionBlob: shaOnionBlob,
FailureCode: code,
}, false)
})
}

// fail is a function which is used to encapsulate the action necessary for
Expand Down
12 changes: 8 additions & 4 deletions htlcswitch/link_test.go
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch/hodl"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec"
Expand Down Expand Up @@ -1396,14 +1397,14 @@ type mockPeer struct {
quit chan struct{}
}

var _ Peer = (*mockPeer)(nil)
var _ lnpeer.Peer = (*mockPeer)(nil)

func (m *mockPeer) SendMessage(msg lnwire.Message, sync bool) error {
func (m *mockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error {
if m.disconnected {
return fmt.Errorf("disconnected")
}
select {
case m.sentMsgs <- msg:
case m.sentMsgs <- msgs[0]:
case <-m.quit:
return fmt.Errorf("mockPeer shutting down")
}
Expand All @@ -1415,8 +1416,11 @@ func (m *mockPeer) WipeChannel(*wire.OutPoint) error {
func (m *mockPeer) PubKey() [33]byte {
return [33]byte{}
}
func (m *mockPeer) IdentityKey() *btcec.PublicKey {
return nil
}

var _ Peer = (*mockPeer)(nil)
var _ lnpeer.Peer = (*mockPeer)(nil)

func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
ChannelLink, *lnwallet.LightningChannel, chan time.Time, func() error,
Expand Down
26 changes: 17 additions & 9 deletions htlcswitch/mock.go
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec"
Expand Down Expand Up @@ -119,7 +120,7 @@ type mockServer struct {
interceptorFuncs []messageInterceptor
}

var _ Peer = (*mockServer)(nil)
var _ lnpeer.Peer = (*mockServer)(nil)

func initSwitchWithDB(db *channeldb.DB) (*Switch, error) {
if db == nil {
Expand Down Expand Up @@ -450,12 +451,14 @@ func (s *mockServer) intersect(f messageInterceptor) {
s.interceptorFuncs = append(s.interceptorFuncs, f)
}

func (s *mockServer) SendMessage(message lnwire.Message, sync bool) error {
func (s *mockServer) SendMessage(sync bool, msgs ...lnwire.Message) error {

select {
case s.messages <- message:
case <-s.quit:
return errors.New("server is stopped")
for _, msg := range msgs {
select {
case s.messages <- msg:
case <-s.quit:
return errors.New("server is stopped")
}
}

return nil
Expand Down Expand Up @@ -506,6 +509,11 @@ func (s *mockServer) PubKey() [33]byte {
return s.id
}

func (s *mockServer) IdentityKey() *btcec.PublicKey {
pubkey, _ := btcec.ParsePubKey(s.id[:], btcec.S256())
return pubkey
}

func (s *mockServer) WipeChannel(*wire.OutPoint) error {
return nil
}
Expand All @@ -532,7 +540,7 @@ type mockChannelLink struct {

chanID lnwire.ChannelID

peer Peer
peer lnpeer.Peer

startMailBox bool

Expand Down Expand Up @@ -579,7 +587,7 @@ func (f *mockChannelLink) deleteCircuit(pkt *htlcPacket) error {
}

func newMockChannelLink(htlcSwitch *Switch, chanID lnwire.ChannelID,
shortChanID lnwire.ShortChannelID, peer Peer, eligible bool,
shortChanID lnwire.ShortChannelID, peer lnpeer.Peer, eligible bool,
) *mockChannelLink {

return &mockChannelLink{
Expand Down Expand Up @@ -624,7 +632,7 @@ func (f *mockChannelLink) Start() error {
func (f *mockChannelLink) ChanID() lnwire.ChannelID { return f.chanID }
func (f *mockChannelLink) ShortChanID() lnwire.ShortChannelID { return f.shortChanID }
func (f *mockChannelLink) Bandwidth() lnwire.MilliSatoshi { return 99999999 }
func (f *mockChannelLink) Peer() Peer { return f.peer }
func (f *mockChannelLink) Peer() lnpeer.Peer { return f.peer }
func (f *mockChannelLink) Stop() {}
func (f *mockChannelLink) EligibleToForward() bool { return f.eligible }
func (f *mockChannelLink) setLiveShortChanID(sid lnwire.ShortChannelID) { f.shortChanID = sid }
Expand Down
3 changes: 2 additions & 1 deletion htlcswitch/test_utils.go
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/shachain"
Expand Down Expand Up @@ -672,7 +673,7 @@ func (r *paymentResponse) Wait(d time.Duration) (chainhash.Hash, error) {
// * from Alice to Bob
// * from Alice to Carol through the Bob
// * from Alice to some another peer through the Bob
func (n *threeHopNetwork) makePayment(sendingPeer, receivingPeer Peer,
func (n *threeHopNetwork) makePayment(sendingPeer, receivingPeer lnpeer.Peer,
firstHopPub [33]byte, hops []ForwardingInfo,
invoiceAmt, htlcAmt lnwire.MilliSatoshi,
timelock uint32) *paymentResponse {
Expand Down
26 changes: 26 additions & 0 deletions lnpeer/peer.go
@@ -0,0 +1,26 @@
package lnpeer

import (
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/wire"
)

// Peer is an interface which represents the remote lightning node inside our
// system.
type Peer interface {
// SendMessage sends a variadic number of message to remote peer. The
// first argument denotes if the method should block until the message
// has been sent to the remote peer.
SendMessage(sync bool, msg ...lnwire.Message) error

// WipeChannel removes the channel uniquely identified by its channel
// point from all indexes associated with the peer.
WipeChannel(*wire.OutPoint) error

// PubKey returns the serialized public key of the remote peer.
PubKey() [33]byte

// IdentityKey returns the public key of the remote peer.
IdentityKey() *btcec.PublicKey
}

0 comments on commit 2503988

Please sign in to comment.