test+server: finalize inbound/outbound fixes, update itests to account for fixes #1349

Merged
71 changes: 55 additions & 16 deletions lnd_test.go
@@ -5105,11 +5105,20 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness
// broadcasting his current channel state. This is actually the
// commitment transaction of a prior *revoked* state, so he'll soon
// feel the wrath of Alice's retribution.
force := true
closeUpdates, closeTxId, err := net.CloseChannel(ctxb, carol,
chanPoint, force)
var (
closeUpdates lnrpc.Lightning_CloseChannelClient
closeTxId *chainhash.Hash
closeErr error
force bool = true
)
err = lntest.WaitPredicate(func() bool {
closeUpdates, closeTxId, closeErr = net.CloseChannel(
ctxb, carol, chanPoint, force,
)
return closeErr == nil
}, time.Second*15)
if err != nil {
t.Fatalf("unable to close channel: %v", err)
t.Fatalf("unable to close channel: %v", closeErr)
}

// Query the mempool for the breaching closing transaction, this should
@@ -6787,6 +6796,27 @@ func assertActiveHtlcs(nodes []*lntest.HarnessNode, payHashes ...[]byte) error {
return nil
}

func assertNumActiveHtlcsChanPoint(node *lntest.HarnessNode,
chanPoint wire.OutPoint, numHtlcs int) bool {

req := &lnrpc.ListChannelsRequest{}
ctxb := context.Background()
nodeChans, err := node.ListChannels(ctxb, req)
if err != nil {
return false
}

for _, channel := range nodeChans.Channels {
if channel.ChannelPoint != chanPoint.String() {
continue
}

return len(channel.PendingHtlcs) == numHtlcs
}

return false
}

func assertNumActiveHtlcs(nodes []*lntest.HarnessNode, numHtlcs int) bool {
req := &lnrpc.ListChannelsRequest{}
ctxb := context.Background()
@@ -9079,11 +9109,11 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness
t.Fatalf("htlc mismatch: %v", err)
}

// Disconnect the two intermediaries, Alice and Dave, so that when carol
// restarts, the response will be held by Dave.
// Disconnect the two intermediaries, Alice and Dave, by shutting down
// Alice.
ctxt, _ = context.WithTimeout(ctxb, timeout)
if err := net.DisconnectNodes(ctxt, dave, net.Alice); err != nil {
t.Fatalf("unable to disconnect alice from dave: %v", err)
if err := net.StopNode(net.Alice); err != nil {
t.Fatalf("unable to shutdown alice: %v", err)
}

// Now restart carol without hodl mode, to settle back the outstanding
t.Fatalf("unable to reconnect dave and carol: %v", err)
}

// Wait for Carol to report no outstanding htlcs.
	// Wait for Carol to report no outstanding htlcs, and also for Dave to
// receive all the settles from Carol.
carolNode := []*lntest.HarnessNode{carol}
err = lntest.WaitPredicate(func() bool {
return assertNumActiveHtlcs(carolNode, 0)
return assertNumActiveHtlcs(carolNode, 0) &&
assertNumActiveHtlcsChanPoint(dave, carolFundPoint, 0)
}, time.Second*15)
if err != nil {
t.Fatalf("htlc mismatch: %v", err)
// Finally, restart dave who received the settles, but was unable to
// deliver them to Alice since they were disconnected.
if err := net.RestartNode(dave, nil); err != nil {
t.Fatalf("unable to reconnect alice to dave: %v", err)
t.Fatalf("unable to restart dave: %v", err)
}
if err = net.RestartNode(net.Alice, nil); err != nil {
t.Fatalf("unable to restart alice: %v", err)
}

// Force Dave and Alice to reconnect before waiting for the htlcs to
t.Fatalf("unable to reconnect dave and carol: %v", err)
}

// After reconnection succeeds, the settles should be propagated all the
// way back to the sender. All nodes should report no active htlcs.
// After reconnection succeeds, the settles should be propagated all
// the way back to the sender. All nodes should report no active htlcs.
err = lntest.WaitPredicate(func() bool {
return assertNumActiveHtlcs(nodes, 0)
}, time.Second*15)
@@ -9410,8 +9445,8 @@ func testSwitchOfflineDeliveryOutgoingOffline(
	// Disconnect the two intermediaries, Alice and Dave, by shutting down
	// Alice.
ctxt, _ = context.WithTimeout(ctxb, timeout)
if err := net.DisconnectNodes(ctxt, dave, net.Alice); err != nil {
t.Fatalf("unable to disconnect alice from dave: %v", err)
if err := net.StopNode(net.Alice); err != nil {
t.Fatalf("unable to shutdown alice: %v", err)
}

// Now restart carol without hodl mode, to settle back the outstanding
// Wait for Carol to report no outstanding htlcs.
carolNode := []*lntest.HarnessNode{carol}
err = lntest.WaitPredicate(func() bool {
return assertNumActiveHtlcs(carolNode, 0)
return assertNumActiveHtlcs(carolNode, 0) &&
assertNumActiveHtlcsChanPoint(dave, carolFundPoint, 0)
}, time.Second*15)
if err != nil {
t.Fatalf("htlc mismatch: %v", err)
if err := net.RestartNode(dave, nil); err != nil {
t.Fatalf("unable to restart dave: %v", err)
}
if err = net.RestartNode(net.Alice, nil); err != nil {
t.Fatalf("unable to restart alice: %v", err)
}

// Ensure that Dave is reconnected to Alice before waiting for the htlcs
// to clear.
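The lnd_test.go change above wraps net.CloseChannel in lntest.WaitPredicate so the breach close is retried until the RPC succeeds, rather than failing the test on a transient error. A minimal sketch of that retry pattern factored into a helper — closeWithRetry is illustrative only and not part of this PR; the WaitPredicate and CloseChannel signatures are the ones already used above, and chanPoint is assumed to be the *lnrpc.ChannelPoint used throughout these tests:

	// closeWithRetry repeatedly attempts to close the channel until the RPC
	// succeeds or the timeout expires, returning the closing txid on success
	// and the last RPC error otherwise.
	func closeWithRetry(net *lntest.NetworkHarness, node *lntest.HarnessNode,
		chanPoint *lnrpc.ChannelPoint, force bool) (*chainhash.Hash, error) {

		ctxb := context.Background()

		var (
			closeTxId *chainhash.Hash
			closeErr  error
		)
		err := lntest.WaitPredicate(func() bool {
			_, closeTxId, closeErr = net.CloseChannel(
				ctxb, node, chanPoint, force,
			)
			return closeErr == nil
		}, time.Second*15)
		if err != nil {
			return nil, closeErr
		}

		return closeTxId, nil
	}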
7 changes: 7 additions & 0 deletions lntest/harness.go
@@ -566,6 +566,13 @@ func (n *NetworkHarness) ShutdownNode(node *HarnessNode) error {
return nil
}

// StopNode stops the target node, but doesn't yet clean up its directories.
// This can be used to temporarily bring a node down during a test, to be later
// started up again.
func (n *NetworkHarness) StopNode(node *HarnessNode) error {
return node.stop()
}

// TODO(roasbeef): add a WithChannel higher-order function?
// * python-like context manager w.r.t using a channel within a test
// * possibly adds more funds to the target wallet if the funds are not
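For context on the new helper: a minimal sketch of how StopNode is meant to pair with the existing RestartNode inside a test, mirroring the calls added to lnd_test.go in this PR (Alice and the t/net variables are the usual itest fixtures):

	// Take Alice fully offline without removing her data directories, so her
	// channel and forwarding state survive the outage.
	if err := net.StopNode(net.Alice); err != nil {
		t.Fatalf("unable to shutdown alice: %v", err)
	}

	// ... exercise the behavior under test while Alice is offline ...

	// Bring Alice back up again; her previous state is reloaded from disk.
	if err := net.RestartNode(net.Alice, nil); err != nil {
		t.Fatalf("unable to restart alice: %v", err)
	}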
3 changes: 1 addition & 2 deletions lnwallet/channel.go
@@ -1332,7 +1332,7 @@ type LightningChannel struct {
cowg sync.WaitGroup
wg sync.WaitGroup

quit chan struct{}
quit chan struct{}
}

// NewLightningChannel creates a new, active payment channel given an
@@ -3184,7 +3184,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
// revocation, but also initiate a state transition to re-sync
// them.
if !lc.FullySynced() {

commitSig, htlcSigs, err := lc.SignNextCommitment()
switch {

70 changes: 42 additions & 28 deletions server.go
@@ -1384,9 +1384,11 @@ func (s *server) peerTerminationWatcher(p *peer) {
// available for use.
s.fundingMgr.CancelPeerReservations(p.PubKey())

pubKey := p.addr.IdentityKey

// We'll also inform the gossiper that this peer is no longer active,
// so we don't need to maintain sync state for it any longer.
s.authGossiper.PruneSyncState(p.addr.IdentityKey)
s.authGossiper.PruneSyncState(pubKey)

// Tell the switch to remove all links associated with this peer.
// Passing nil as the target link indicates that all links associated
@@ -1435,7 +1437,7 @@ func (s *server) peerTerminationWatcher(p *peer) {
s.removePeer(p)

// Next, check to see if this is a persistent peer or not.
pubStr := string(p.addr.IdentityKey.SerializeCompressed())
pubStr := string(pubKey.SerializeCompressed())
_, ok := s.persistentPeers[pubStr]
if ok {
// We'll only need to re-launch a connection request if one
return
}

// We'll ensure that we locate an advertised address to use
// within the peer's address for reconnection purposes.
//
// TODO(roasbeef): use them all?
if p.inbound {
advertisedAddr, err := s.fetchNodeAdvertisedAddr(
pubKey,
)
if err != nil {
srvrLog.Errorf("Unable to retrieve advertised "+
"address for node %x: %v",
pubKey.SerializeCompressed(), err)
} else {
p.addr.Address = advertisedAddr
}
}

// Otherwise, we'll launch a new connection request in order to
// attempt to maintain a persistent connection with this peer.
connReq := &connmgr.ConnReq{
@@ -1526,20 +1545,8 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
addr := conn.RemoteAddr()
pubKey := brontideConn.RemotePub()

// We'll ensure that we locate an advertised address to use within the
// peer's address for reconnection purposes.
//
// TODO: leave the address field empty if there aren't any?
if inbound {
advertisedAddr, err := s.fetchNodeAdvertisedAddr(pubKey)
if err != nil {
srvrLog.Errorf("Unable to retrieve advertised address "+
"for node %x: %v", pubKey.SerializeCompressed(),
err)
} else {
addr = advertisedAddr
}
}
srvrLog.Infof("finalizing connection to %x, inbound=%v",
pubKey.SerializeCompressed(), inbound)

peerAddr := &lnwire.NetAddress{
IdentityKey: pubKey,
@@ -1618,10 +1625,12 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
s.mu.Lock()
defer s.mu.Unlock()

// If we already have an inbound connection to this peer, then ignore
// If we already have an outbound connection to this peer, then ignore
// this new connection.
if _, ok := s.inboundPeers[pubStr]; ok {
srvrLog.Debugf("Ignoring duplicate inbound connection")
if _, ok := s.outboundPeers[pubStr]; ok {
srvrLog.Debugf("Already have outbound connection for %v, "+
"ignoring inbound connection", nodePub.SerializeCompressed())

conn.Close()
return
}

srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr())

// Cancel all pending connection requests, we either already have an
// outbound connection, or this incoming connection will become our
// primary connection. The incoming connection will not have an
// associated connection request, so we pass nil.
s.cancelConnReqs(pubStr, nil)

// Check to see if we already have a connection with this peer. If so,
// we may need to drop our existing connection. This prevents us from
// having duplicate connections to the same peer. We forgo adding a
case ErrPeerNotConnected:
// We were unable to locate an existing connection with the
// target peer, proceed to connect.
s.cancelConnReqs(pubStr, nil)
s.peerConnected(conn, nil, true)

case nil:
srvrLog.Debugf("Disconnecting stale connection to %v",
connectedPeer)

s.cancelConnReqs(pubStr, nil)

// Remove the current peer from the server's internal state and
// signal that the peer termination watcher does not need to
// execute for this peer.
@@ -1701,10 +1707,13 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
s.mu.Lock()
defer s.mu.Unlock()

// If we already have an outbound connection to this peer, then ignore
// If we already have an inbound connection to this peer, then ignore
// this new connection.
if _, ok := s.outboundPeers[pubStr]; ok {
srvrLog.Debugf("Ignoring duplicate outbound connection")
if _, ok := s.inboundPeers[pubStr]; ok {
srvrLog.Debugf("Already have inbound connection for %v, "+
"ignoring outbound connection",
nodePub.SerializeCompressed())

if connReq != nil {
s.connMgr.Remove(connReq.ID())
}
// ignore this connection.
if _, ok := s.scheduledPeerConnection[pubStr]; ok {
srvrLog.Debugf("Ignoring connection, peer already scheduled")

if connReq != nil {
s.connMgr.Remove(connReq.ID())
}

conn.Close()
return
}
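Taken together, the two handlers now apply a symmetric rule: a new inbound connection is ignored when an outbound connection to that peer already exists, and a new outbound connection is ignored when an inbound one already exists. A condensed sketch of just that decision — shouldDropConn is illustrative and not part of this PR; the real handlers above additionally deal with stale connections, scheduled reconnections, and connReq cleanup:

	// shouldDropConn reports whether a freshly established connection should
	// be dropped because we already hold a connection to this peer in the
	// opposite direction. The caller must hold s.mu, as the handlers do.
	func (s *server) shouldDropConn(pubStr string, inbound bool) bool {
		if inbound {
			// Keep the existing outbound connection over a new inbound one.
			_, haveOutbound := s.outboundPeers[pubStr]
			return haveOutbound
		}

		// Keep the existing inbound connection over a new outbound one.
		_, haveInbound := s.inboundPeers[pubStr]
		return haveInbound
	}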