Skip to content

Commit

Permalink
Merge pull request #1349 from Roasbeef/server-fixes-itest-stability
Browse files Browse the repository at this point in the history
test+server: finalize inbound/outbound fixes, update itests to account for fixes
  • Loading branch information
Roasbeef committed Jun 12, 2018
2 parents 2e838ab + e60d2b7 commit c3191a7
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 125 deletions.
2 changes: 1 addition & 1 deletion discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1233,7 +1233,7 @@ func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, recvUpdates
}

log.Infof("Creating new gossipSyncer for peer=%x",
nodeID)
nodeID[:])

syncer := newGossiperSyncer(gossipSyncerCfg{
chainHash: d.cfg.ChainHash,
Expand Down
31 changes: 20 additions & 11 deletions htlcswitch/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ type Switch struct {
forwardingIndex map[lnwire.ShortChannelID]ChannelLink

// interfaceIndex maps the compressed public key of a peer to all the
// channels that the switch maintains iwht that peer.
interfaceIndex map[[33]byte]map[ChannelLink]struct{}
// channels that the switch maintains with that peer.
interfaceIndex map[[33]byte]map[lnwire.ChannelID]ChannelLink

// htlcPlex is the channel which all connected links use to coordinate
// the setup/teardown of Sphinx (onion routing) payment circuits.
Expand Down Expand Up @@ -253,7 +253,7 @@ func New(cfg Config) (*Switch, error) {
linkIndex: make(map[lnwire.ChannelID]ChannelLink),
mailOrchestrator: newMailOrchestrator(),
forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink),
interfaceIndex: make(map[[33]byte]map[ChannelLink]struct{}),
interfaceIndex: make(map[[33]byte]map[lnwire.ChannelID]ChannelLink),
pendingLinkIndex: make(map[lnwire.ChannelID]ChannelLink),
pendingPayments: make(map[uint64]*pendingPayment),
htlcPlex: make(chan *plexPacket),
Expand Down Expand Up @@ -1723,11 +1723,11 @@ func (s *Switch) AddLink(link ChannelLink) error {

chanID := link.ChanID()

// First, ensure that this link is not already active in the switch.
// If a link already exists, then remove the prior one so we can
// replace it with this fresh instance.
_, err := s.getLink(chanID)
if err == nil {
return fmt.Errorf("unable to add ChannelLink(%v), already "+
"active", chanID)
s.removeLink(chanID)
}

// Get and attach the mailbox for this link, which buffers packets in
Expand Down Expand Up @@ -1774,9 +1774,9 @@ func (s *Switch) addLiveLink(link ChannelLink) {
// quickly look up all the channels for a particular node.
peerPub := link.Peer().PubKey()
if _, ok := s.interfaceIndex[peerPub]; !ok {
s.interfaceIndex[peerPub] = make(map[ChannelLink]struct{})
s.interfaceIndex[peerPub] = make(map[lnwire.ChannelID]ChannelLink)
}
s.interfaceIndex[peerPub][link] = struct{}{}
s.interfaceIndex[peerPub][link.ChanID()] = link
}

// GetLink is used to initiate the handling of the get link command. The
Expand Down Expand Up @@ -1840,9 +1840,18 @@ func (s *Switch) removeLink(chanID lnwire.ChannelID) error {
delete(s.linkIndex, link.ChanID())
delete(s.forwardingIndex, link.ShortChanID())

// Remove the channel from channel index.
// If the link has been added to the peer index, then we'll move to
// delete the entry within the index.
peerPub := link.Peer().PubKey()
delete(s.interfaceIndex, peerPub)
if peerIndex, ok := s.interfaceIndex[peerPub]; ok {
delete(peerIndex, link.ChanID())

// If after deletion, there are no longer any links, then we'll
// remove the interface map all together.
if len(peerIndex) == 0 {
delete(s.interfaceIndex, peerPub)
}
}

link.Stop()

Expand Down Expand Up @@ -1918,7 +1927,7 @@ func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) {
}

channelLinks := make([]ChannelLink, 0, len(links))
for link := range links {
for _, link := range links {
channelLinks = append(channelLinks, link)
}

Expand Down
59 changes: 0 additions & 59 deletions htlcswitch/switch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,65 +25,6 @@ func genPreimage() ([32]byte, error) {
return preimage, nil
}

// TestSwitchAddDuplicateLink tests that the switch will reject duplicate links
// for both pending and live links. It also tests that we can successfully
// add a link after having removed it.
func TestSwitchAddDuplicateLink(t *testing.T) {
	t.Parallel()

	// Create a mock peer to stand in for Alice on the remote end of the
	// links added below.
	alicePeer, err := newMockServer(t, "alice", nil)
	if err != nil {
		t.Fatalf("unable to create alice server: %v", err)
	}

	s, err := initSwitchWithDB(nil)
	if err != nil {
		t.Fatalf("unable to init switch: %v", err)
	}
	if err := s.Start(); err != nil {
		t.Fatalf("unable to start switch: %v", err)
	}
	defer s.Stop()

	chanID1, _, aliceChanID, _ := genIDs()

	// A zero-value short channel ID marks the channel as not-yet-confirmed,
	// so the link below starts out in the switch's pending index.
	pendingChanID := lnwire.ShortChannelID{}

	aliceChannelLink := newMockChannelLink(
		s, chanID1, pendingChanID, alicePeer, false,
	)
	if err := s.AddLink(aliceChannelLink); err != nil {
		t.Fatalf("unable to add alice link: %v", err)
	}

	// Alice should have a pending link, adding again should fail.
	if err := s.AddLink(aliceChannelLink); err == nil {
		t.Fatalf("adding duplicate link should have failed")
	}

	// Update the short chan id of the channel, so that the link goes live.
	aliceChannelLink.setLiveShortChanID(aliceChanID)
	err = s.UpdateShortChanID(chanID1)
	if err != nil {
		t.Fatalf("unable to update alice short_chan_id: %v", err)
	}

	// Alice should have a live link, adding again should fail.
	if err := s.AddLink(aliceChannelLink); err == nil {
		t.Fatalf("adding duplicate link should have failed")
	}

	// Remove the live link to ensure the indexes are cleared.
	if err := s.RemoveLink(chanID1); err != nil {
		t.Fatalf("unable to remove alice link: %v", err)
	}

	// Alice has no links, adding should succeed.
	if err := s.AddLink(aliceChannelLink); err != nil {
		t.Fatalf("unable to add alice link: %v", err)
	}
}

// TestSwitchSendPending checks the inability of htlc switch to forward adds
// over pending links, and the UpdateShortChanID makes a pending link live.
func TestSwitchSendPending(t *testing.T) {
Expand Down
71 changes: 55 additions & 16 deletions lnd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5105,11 +5105,20 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness
// broadcasting his current channel state. This is actually the
// commitment transaction of a prior *revoked* state, so he'll soon
// feel the wrath of Alice's retribution.
force := true
closeUpdates, closeTxId, err := net.CloseChannel(ctxb, carol,
chanPoint, force)
var (
closeUpdates lnrpc.Lightning_CloseChannelClient
closeTxId *chainhash.Hash
closeErr error
force bool = true
)
err = lntest.WaitPredicate(func() bool {
closeUpdates, closeTxId, closeErr = net.CloseChannel(
ctxb, carol, chanPoint, force,
)
return closeErr == nil
}, time.Second*15)
if err != nil {
t.Fatalf("unable to close channel: %v", err)
t.Fatalf("unable to close channel: %v", closeErr)
}

// Query the mempool for the breaching closing transaction, this should
Expand Down Expand Up @@ -6787,6 +6796,27 @@ func assertActiveHtlcs(nodes []*lntest.HarnessNode, payHashes ...[]byte) error {
return nil
}

// assertNumActiveHtlcsChanPoint returns true if the channel identified by
// chanPoint on the given node currently reports exactly numHtlcs pending
// HTLCs. It returns false if the RPC fails or the channel isn't found.
func assertNumActiveHtlcsChanPoint(node *lntest.HarnessNode,
	chanPoint wire.OutPoint, numHtlcs int) bool {

	resp, err := node.ListChannels(
		context.Background(), &lnrpc.ListChannelsRequest{},
	)
	if err != nil {
		return false
	}

	// Compute the string form once, then scan for the matching channel.
	target := chanPoint.String()
	for _, c := range resp.Channels {
		if c.ChannelPoint == target {
			return len(c.PendingHtlcs) == numHtlcs
		}
	}

	return false
}

func assertNumActiveHtlcs(nodes []*lntest.HarnessNode, numHtlcs int) bool {
req := &lnrpc.ListChannelsRequest{}
ctxb := context.Background()
Expand Down Expand Up @@ -9079,11 +9109,11 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness
t.Fatalf("htlc mismatch: %v", err)
}

// Disconnect the two intermediaries, Alice and Dave, so that when carol
// restarts, the response will be held by Dave.
// Disconnect the two intermediaries, Alice and Dave, by shutting down
// Alice.
ctxt, _ = context.WithTimeout(ctxb, timeout)
if err := net.DisconnectNodes(ctxt, dave, net.Alice); err != nil {
t.Fatalf("unable to disconnect alice from dave: %v", err)
if err := net.StopNode(net.Alice); err != nil {
t.Fatalf("unable to shutdown alice: %v", err)
}

// Now restart carol without hodl mode, to settle back the outstanding
Expand All @@ -9101,10 +9131,12 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness
t.Fatalf("unable to reconnect dave and carol: %v", err)
}

// Wait for Carol to report no outstanding htlcs.
// Wait for Carol to report no outstanding htlcs, and also for Dave to
// receive all the settles from Carol.
carolNode := []*lntest.HarnessNode{carol}
err = lntest.WaitPredicate(func() bool {
return assertNumActiveHtlcs(carolNode, 0)
return assertNumActiveHtlcs(carolNode, 0) &&
assertNumActiveHtlcsChanPoint(dave, carolFundPoint, 0)
}, time.Second*15)
if err != nil {
t.Fatalf("htlc mismatch: %v", err)
Expand All @@ -9113,7 +9145,10 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness
// Finally, restart dave who received the settles, but was unable to
// deliver them to Alice since they were disconnected.
if err := net.RestartNode(dave, nil); err != nil {
t.Fatalf("unable to reconnect alice to dave: %v", err)
t.Fatalf("unable to restart dave: %v", err)
}
if err = net.RestartNode(net.Alice, nil); err != nil {
t.Fatalf("unable to restart alice: %v", err)
}

// Force Dave and Alice to reconnect before waiting for the htlcs to
Expand All @@ -9124,8 +9159,8 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness
t.Fatalf("unable to reconnect dave and carol: %v", err)
}

// After reconnection succeeds, the settles should be propagated all the
// way back to the sender. All nodes should report no active htlcs.
// After reconnection succeeds, the settles should be propagated all
// the way back to the sender. All nodes should report no active htlcs.
err = lntest.WaitPredicate(func() bool {
return assertNumActiveHtlcs(nodes, 0)
}, time.Second*15)
Expand Down Expand Up @@ -9410,8 +9445,8 @@ func testSwitchOfflineDeliveryOutgoingOffline(
// Disconnect the two intermediaries, Alice and Dave, by shutting down
// Alice.
ctxt, _ = context.WithTimeout(ctxb, timeout)
if err := net.DisconnectNodes(ctxt, dave, net.Alice); err != nil {
t.Fatalf("unable to disconnect alice from dave: %v", err)
if err := net.StopNode(net.Alice); err != nil {
t.Fatalf("unable to shutdown alice: %v", err)
}

// Now restart carol without hodl mode, to settle back the outstanding
Expand All @@ -9424,7 +9459,8 @@ func testSwitchOfflineDeliveryOutgoingOffline(
// Wait for Carol to report no outstanding htlcs.
carolNode := []*lntest.HarnessNode{carol}
err = lntest.WaitPredicate(func() bool {
return assertNumActiveHtlcs(carolNode, 0)
return assertNumActiveHtlcs(carolNode, 0) &&
assertNumActiveHtlcsChanPoint(dave, carolFundPoint, 0)
}, time.Second*15)
if err != nil {
t.Fatalf("htlc mismatch: %v", err)
Expand All @@ -9451,6 +9487,9 @@ func testSwitchOfflineDeliveryOutgoingOffline(
if err := net.RestartNode(dave, nil); err != nil {
t.Fatalf("unable to restart dave: %v", err)
}
if err = net.RestartNode(net.Alice, nil); err != nil {
t.Fatalf("unable to restart alice: %v", err)
}

// Ensure that Dave is reconnected to Alice before waiting for the htlcs
// to clear.
Expand Down
7 changes: 7 additions & 0 deletions lntest/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,13 @@ func (n *NetworkHarness) ShutdownNode(node *HarnessNode) error {
return nil
}

// StopNode halts the target node's process while leaving its data
// directories intact, allowing the node to be brought back up later in the
// same test via a subsequent restart.
func (n *NetworkHarness) StopNode(node *HarnessNode) error {
	return node.stop()
}

// TODO(roasbeef): add a WithChannel higher-order function?
// * python-like context manager w.r.t using a channel within a test
// * possibly adds more funds to the target wallet if the funds are not
Expand Down
3 changes: 1 addition & 2 deletions lnwallet/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,7 +1332,7 @@ type LightningChannel struct {
cowg sync.WaitGroup
wg sync.WaitGroup

quit chan struct{}
quit chan struct{}
}

// NewLightningChannel creates a new, active payment channel given an
Expand Down Expand Up @@ -3184,7 +3184,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
// revocation, but also initiate a state transition to re-sync
// them.
if !lc.FullySynced() {

commitSig, htlcSigs, err := lc.SignNextCommitment()
switch {

Expand Down
18 changes: 10 additions & 8 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {

chanID := lnwire.NewChanIDFromOutPoint(chanPoint)

p.activeChanMtx.Lock()
p.activeChannels[chanID] = lnChan
p.activeChanMtx.Unlock()

peerLog.Infof("NodeKey(%x) loading ChannelPoint(%v)",
p.PubKey(), chanPoint)

Expand Down Expand Up @@ -415,12 +411,18 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
}

// Create the link and add it to the switch.
err = p.addLink(chanPoint, lnChan, forwardingPolicy, blockEpoch,
chainEvents, currentHeight, true)
err = p.addLink(
chanPoint, lnChan, forwardingPolicy, blockEpoch,
chainEvents, currentHeight, true,
)
if err != nil {
lnChan.Stop()
return err
}

p.activeChanMtx.Lock()
p.activeChannels[chanID] = lnChan
p.activeChanMtx.Unlock()
}

return nil
Expand Down Expand Up @@ -1527,8 +1529,8 @@ out:
chainEvents, currentHeight, false)
if err != nil {
peerLog.Errorf("can't register new channel "+
"link(%v) with NodeKey(%x)", chanPoint,
p.PubKey())
"link(%v) with NodeKey(%x): %v", chanPoint,
p.PubKey(), err)
}

close(newChanReq.done)
Expand Down
Loading

0 comments on commit c3191a7

Please sign in to comment.