diff --git a/lnd_test.go b/lnd_test.go index 72787a934ed..e618cf75082 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -25,6 +25,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lnwire" "github.com/roasbeef/btcd/chaincfg" "github.com/roasbeef/btcd/chaincfg/chainhash" @@ -36,6 +37,10 @@ import ( "google.golang.org/grpc" ) +var ( + harnessNetParams = &chaincfg.SimNetParams +) + // harnessTest wraps a regular testing.T providing enhanced error detection // and propagation. All error will be augmented with a full stack-trace in // order to aid in debugging. Additionally, any panics caused by active @@ -70,7 +75,9 @@ func (h *harnessTest) Fatalf(format string, a ...interface{}) { // RunTestCase executes a harness test case. Any errors or panics will be // represented as fatal. -func (h *harnessTest) RunTestCase(testCase *testCase, net *networkHarness) { +func (h *harnessTest) RunTestCase(testCase *testCase, + net *lntest.NetworkHarness) { + h.testCase = testCase defer func() { h.testCase = nil @@ -110,7 +117,9 @@ func assertTxInBlock(t *harnessTest, block *wire.MsgBlock, txid *chainhash.Hash) // mineBlocks mine 'num' of blocks and check that blocks are present in // node blockchain. -func mineBlocks(t *harnessTest, net *networkHarness, num uint32) []*wire.MsgBlock { +func mineBlocks(t *harnessTest, net *lntest.NetworkHarness, num uint32, +) []*wire.MsgBlock { + blocks := make([]*wire.MsgBlock, num) blockHashes, err := net.Miner.Node.Generate(num) @@ -135,9 +144,9 @@ func mineBlocks(t *harnessTest, net *networkHarness, num uint32) []*wire.MsgBloc // after the channel is considered open: the funding transaction should be // found within a block, and that Alice can report the status of the new // channel. -func openChannelAndAssert(ctx context.Context, t *harnessTest, net *networkHarness, - alice, bob *lightningNode, fundingAmt btcutil.Amount, - pushAmt btcutil.Amount) *lnrpc.ChannelPoint { +func openChannelAndAssert(ctx context.Context, t *harnessTest, + net *lntest.NetworkHarness, alice, bob *lntest.HarnessNode, + fundingAmt btcutil.Amount, pushAmt btcutil.Amount) *lnrpc.ChannelPoint { chanOpenUpdate, err := net.OpenChannel(ctx, alice, bob, fundingAmt, pushAmt) @@ -182,8 +191,9 @@ func openChannelAndAssert(ctx context.Context, t *harnessTest, net *networkHarne // via timeout from a base parent. Additionally, once the channel has been // detected as closed, an assertion checks that the transaction is found within // a block. -func closeChannelAndAssert(ctx context.Context, t *harnessTest, net *networkHarness, - node *lightningNode, fundingChanPoint *lnrpc.ChannelPoint, force bool) *chainhash.Hash { +func closeChannelAndAssert(ctx context.Context, t *harnessTest, + net *lntest.NetworkHarness, node *lntest.HarnessNode, + fundingChanPoint *lnrpc.ChannelPoint, force bool) *chainhash.Hash { closeUpdates, _, err := net.CloseChannel(ctx, node, fundingChanPoint, force) if err != nil { @@ -234,7 +244,7 @@ func closeChannelAndAssert(ctx context.Context, t *harnessTest, net *networkHarn // numOpenChannelsPending sends an RPC request to a node to get a count of the // node's channels that are currently in a pending state (with a broadcast, but // not confirmed funding transaction). 
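For illustration, a test case built on the helpers above now follows this shape: the harness and node types come from the new lntest package, while the test name, channel amount, and comments in the sketch below are hypothetical.

// Hypothetical sketch of a test case using the refactored helpers.
func testExampleChannelLifecycle(net *lntest.NetworkHarness, t *harnessTest) {
	ctxb := context.Background()
	timeout := time.Duration(time.Second * 5)

	// Open a channel between the two seed nodes and assert that the
	// funding transaction is mined and the channel reported as open.
	ctxt, _ := context.WithTimeout(ctxb, timeout)
	chanPoint := openChannelAndAssert(
		ctxt, t, net, net.Alice, net.Bob, btcutil.Amount(100000), 0,
	)

	// ... exercise the channel: payments, restarts, assertions ...

	// Cooperatively close the channel and assert the closing transaction
	// is found within a block.
	ctxt, _ = context.WithTimeout(ctxb, timeout)
	closeChannelAndAssert(ctxt, t, net, net.Alice, chanPoint, false)
}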
-func numOpenChannelsPending(ctxt context.Context, node *lightningNode) (int, error) { +func numOpenChannelsPending(ctxt context.Context, node *lntest.HarnessNode) (int, error) { pendingChansRequest := &lnrpc.PendingChannelRequest{} resp, err := node.PendingChannels(ctxt, pendingChansRequest) if err != nil { @@ -246,7 +256,7 @@ func numOpenChannelsPending(ctxt context.Context, node *lightningNode) (int, err // assertNumOpenChannelsPending asserts that a pair of nodes have the expected // number of pending channels between them. func assertNumOpenChannelsPending(ctxt context.Context, t *harnessTest, - alice, bob *lightningNode, expected int) { + alice, bob *lntest.HarnessNode, expected int) { const nPolls = 10 @@ -257,12 +267,12 @@ func assertNumOpenChannelsPending(ctxt context.Context, t *harnessTest, aliceNumChans, err := numOpenChannelsPending(ctxt, alice) if err != nil { t.Fatalf("error fetching alice's node (%v) pending channels %v", - alice.nodeID, err) + alice.NodeID, err) } bobNumChans, err := numOpenChannelsPending(ctxt, bob) if err != nil { t.Fatalf("error fetching bob's node (%v) pending channels %v", - bob.nodeID, err) + bob.NodeID, err) } isLastIteration := i == nPolls-1 @@ -290,7 +300,7 @@ func assertNumOpenChannelsPending(ctxt context.Context, t *harnessTest, // assertNumConnections asserts number current connections between two peers. func assertNumConnections(ctxt context.Context, t *harnessTest, - alice, bob *lightningNode, expected int) { + alice, bob *lntest.HarnessNode, expected int) { const nPolls = 10 @@ -303,12 +313,12 @@ func assertNumConnections(ctxt context.Context, t *harnessTest, aNumPeers, err := alice.ListPeers(ctxt, &lnrpc.ListPeersRequest{}) if err != nil { t.Fatalf("unable to fetch alice's node (%v) list peers %v", - alice.nodeID, err) + alice.NodeID, err) } bNumPeers, err := bob.ListPeers(ctxt, &lnrpc.ListPeersRequest{}) if err != nil { t.Fatalf("unable to fetch bob's node (%v) list peers %v", - bob.nodeID, err) + bob.NodeID, err) } if len(aNumPeers.Peers) != expected { // Continue polling if this is not the final @@ -401,7 +411,7 @@ func completePaymentRequests(ctx context.Context, client lnrpc.LightningClient, // Bob, then immediately closes the channel after asserting some expected post // conditions. Finally, the chain itself is checked to ensure the closing // transaction was mined. -func testBasicChannelFunding(net *networkHarness, t *harnessTest) { +func testBasicChannelFunding(net *lntest.NetworkHarness, t *harnessTest) { timeout := time.Duration(time.Second * 5) ctxb := context.Background() @@ -458,7 +468,7 @@ func testBasicChannelFunding(net *networkHarness, t *harnessTest) { // testOpenChannelAfterReorg tests that in the case where we have an open // channel where the funding tx gets reorged out, the channel will no // longer be present in the node's routing table. 
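The connection assertions above are typically driven together with the harness's connection management methods; a sketch, assuming ctxb and timeout are declared as in the surrounding tests:

// Sketch: pair assertNumConnections with the harness's
// ConnectNodes/DisconnectNodes calls, as testDisconnectingTargetPeer does.
ctxt, _ := context.WithTimeout(ctxb, timeout)
if err := net.ConnectNodes(ctxt, net.Alice, net.Bob); err != nil {
	t.Fatalf("unable to connect alice and bob: %v", err)
}
assertNumConnections(ctxt, t, net.Alice, net.Bob, 1)

if err := net.DisconnectNodes(ctxt, net.Alice, net.Bob); err != nil {
	t.Fatalf("unable to disconnect alice and bob: %v", err)
}
assertNumConnections(ctxt, t, net.Alice, net.Bob, 0)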
-func testOpenChannelAfterReorg(net *networkHarness, t *harnessTest) { +func testOpenChannelAfterReorg(net *lntest.NetworkHarness, t *harnessTest) { timeout := time.Duration(time.Second * 5) ctxb := context.Background() @@ -639,7 +649,7 @@ func testOpenChannelAfterReorg(net *networkHarness, t *harnessTest) { // testDisconnectingTargetPeer performs a test which // disconnects Alice-peer from Bob-peer and then re-connects them again -func testDisconnectingTargetPeer(net *networkHarness, t *harnessTest) { +func testDisconnectingTargetPeer(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() @@ -768,7 +778,7 @@ func testDisconnectingTargetPeer(net *networkHarness, t *harnessTest) { // representation of channels if the system is restarted or disconnected. // testFundingPersistence mirrors testBasicChannelFunding, but adds restarts // and checks for the state of channels with unconfirmed funding transactions. -func testChannelFundingPersistence(net *networkHarness, t *harnessTest) { +func testChannelFundingPersistence(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() chanAmt := maxFundingAmount @@ -907,14 +917,14 @@ peersPoll: closeChannelAndAssert(ctxt, t, net, net.Alice, chanPoint, false) // Clean up carol's node. - if err := carol.Shutdown(); err != nil { + if err := net.ShutdownNode(carol); err != nil { t.Fatalf("unable to shutdown carol: %v", err) } } // testChannelBalance creates a new channel between Alice and Bob, then // checks channel balance to be equal amount specified while creation of channel. -func testChannelBalance(net *networkHarness, t *harnessTest) { +func testChannelBalance(net *lntest.NetworkHarness, t *harnessTest) { timeout := time.Duration(time.Second * 5) // Open a channel with 0.16 BTC between Alice and Bob, ensuring the @@ -1069,8 +1079,7 @@ func assertPendingHtlcStageAndMaturity(t *harnessTest, // process. // // TODO(roasbeef): also add an unsettled HTLC before force closing. -func testChannelForceClosure(net *networkHarness, t *harnessTest) { - +func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() const ( timeout = time.Duration(time.Second * 10) @@ -1714,7 +1723,7 @@ func testChannelForceClosure(net *networkHarness, t *harnessTest) { } } -func testSingleHopInvoice(net *networkHarness, t *harnessTest) { +func testSingleHopInvoice(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() timeout := time.Duration(time.Second * 5) @@ -1851,7 +1860,7 @@ func testSingleHopInvoice(net *networkHarness, t *harnessTest) { closeChannelAndAssert(ctxt, t, net, net.Alice, chanPoint, false) } -func testListPayments(net *networkHarness, t *harnessTest) { +func testListPayments(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() timeout := time.Duration(time.Second * 5) @@ -1986,7 +1995,7 @@ func testListPayments(net *networkHarness, t *harnessTest) { closeChannelAndAssert(ctxt, t, net, net.Alice, chanPoint, false) } -func testMultiHopPayments(net *networkHarness, t *harnessTest) { +func testMultiHopPayments(net *lntest.NetworkHarness, t *harnessTest) { const chanAmt = btcutil.Amount(100000) ctxb := context.Background() timeout := time.Duration(time.Second * 5) @@ -2115,7 +2124,7 @@ func testMultiHopPayments(net *networkHarness, t *harnessTest) { // creating the seed nodes in the network. 
const baseFee = 1 - assertAmountPaid := func(node *lightningNode, chanPoint wire.OutPoint, + assertAmountPaid := func(node *lntest.HarnessNode, chanPoint wire.OutPoint, amountSent, amountReceived int64) { channelName := "" @@ -2209,18 +2218,18 @@ func testMultiHopPayments(net *networkHarness, t *harnessTest) { ctxt, _ = context.WithTimeout(ctxb, timeout) closeChannelAndAssert(ctxt, t, net, carol, chanPointCarol, false) - // Finally, shutdown the nodes we created for the duration of the - // tests, only leaving the two seed nodes (Alice and Bob) within our - // test network. - if err := carol.Shutdown(); err != nil { + // Finally, shutdown the nodes we created for the duration of the tests, + // only leaving the two seed nodes (Alice and Bob) within our test + // network. + if err := net.ShutdownNode(carol); err != nil { t.Fatalf("unable to shutdown carol: %v", err) } - if err := dave.Shutdown(); err != nil { + if err := net.ShutdownNode(dave); err != nil { t.Fatalf("unable to shutdown dave: %v", err) } } -func testInvoiceSubscriptions(net *networkHarness, t *harnessTest) { +func testInvoiceSubscriptions(net *lntest.NetworkHarness, t *harnessTest) { const chanAmt = btcutil.Amount(500000) ctxb := context.Background() timeout := time.Duration(time.Second * 5) @@ -2313,7 +2322,7 @@ func testInvoiceSubscriptions(net *networkHarness, t *harnessTest) { } // testBasicChannelCreation test multiple channel opening and closing. -func testBasicChannelCreation(net *networkHarness, t *harnessTest) { +func testBasicChannelCreation(net *lntest.NetworkHarness, t *harnessTest) { const ( numChannels = 2 timeout = time.Duration(time.Second * 5) @@ -2340,7 +2349,7 @@ func testBasicChannelCreation(net *networkHarness, t *harnessTest) { // testMaxPendingChannels checks that error is returned from remote peer if // max pending channel number was exceeded and that '--maxpendingchannels' flag // exists and works properly. -func testMaxPendingChannels(net *networkHarness, t *harnessTest) { +func testMaxPendingChannels(net *lntest.NetworkHarness, t *harnessTest) { maxPendingChannels := defaultMaxPendingChannels + 1 amount := maxFundingAmount @@ -2447,7 +2456,7 @@ func testMaxPendingChannels(net *networkHarness, t *harnessTest) { // Finally, shutdown the node we created for the duration of the tests, // only leaving the two seed nodes (Alice and Bob) within our test // network. - if err := carol.Shutdown(); err != nil { + if err := net.ShutdownNode(carol); err != nil { t.Fatalf("unable to shutdown carol: %v", err) } } @@ -2534,7 +2543,7 @@ func waitForNTxsInMempool(miner *rpcclient.Client, n int, // testRevokedCloseRetributinPostBreachConf tests that Alice is able carry out // retribution in the event that she fails immediately after detecting Bob's // breach txn in the mempool. -func testRevokedCloseRetribution(net *networkHarness, t *harnessTest) { +func testRevokedCloseRetribution(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() const ( timeout = time.Duration(time.Second * 10) @@ -2630,8 +2639,7 @@ func testRevokedCloseRetribution(net *networkHarness, t *harnessTest) { // With the temporary file created, copy Bob's current state into the // temporary file we created above. Later after more updates, we'll // restore this state. 
- bobDbPath := filepath.Join(net.Bob.cfg.DataDir, "simnet/bitcoin/channel.db") - if err := copyFile(bobTempDbFile, bobDbPath); err != nil { + if err := copyFile(bobTempDbFile, net.Bob.DBPath()); err != nil { t.Fatalf("unable to copy database files: %v", err) } @@ -2654,7 +2662,7 @@ func testRevokedCloseRetribution(net *networkHarness, t *harnessTest) { // state. With this, we essentially force Bob to travel back in time // within the channel's history. if err = net.RestartNode(net.Bob, func() error { - return os.Rename(bobTempDbFile, bobDbPath) + return os.Rename(bobTempDbFile, net.Bob.DBPath()) }); err != nil { t.Fatalf("unable to restart node: %v", err) } @@ -2769,8 +2777,7 @@ func testRevokedCloseRetribution(net *networkHarness, t *harnessTest) { // testRevokedCloseRetributionZeroValueRemoteOutput tests that Alice is able // carry out retribution in the event that she fails in state where the remote // commitment output has zero-value. -func testRevokedCloseRetributionZeroValueRemoteOutput( - net *networkHarness, +func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() @@ -2871,8 +2878,7 @@ func testRevokedCloseRetributionZeroValueRemoteOutput( // With the temporary file created, copy Carol's current state into the // temporary file we created above. Later after more updates, we'll // restore this state. - carolDbPath := filepath.Join(carol.cfg.DataDir, "simnet/bitcoin/channel.db") - if err := copyFile(carolTempDbFile, carolDbPath); err != nil { + if err := copyFile(carolTempDbFile, carol.DBPath()); err != nil { t.Fatalf("unable to copy database files: %v", err) } @@ -2893,7 +2899,7 @@ func testRevokedCloseRetributionZeroValueRemoteOutput( // state. With this, we essentially force Carol to travel back in time // within the channel's history. if err = net.RestartNode(carol, func() error { - return os.Rename(carolTempDbFile, carolDbPath) + return os.Rename(carolTempDbFile, carol.DBPath()) }); err != nil { t.Fatalf("unable to restart node: %v", err) } @@ -3000,8 +3006,7 @@ func testRevokedCloseRetributionZeroValueRemoteOutput( // testRevokedCloseRetributionRemoteHodl tests that Alice properly responds to a // channel breach made by the remote party, specifically in the case that the // remote party breaches before settling extended HTLCs. -func testRevokedCloseRetributionRemoteHodl( - net *networkHarness, +func testRevokedCloseRetributionRemoteHodl(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() @@ -3144,8 +3149,7 @@ func testRevokedCloseRetributionRemoteHodl( // With the temporary file created, copy Carol's current state into the // temporary file we created above. Later after more updates, we'll // restore this state. - carolDbPath := filepath.Join(carol.cfg.DataDir, "simnet/bitcoin/channel.db") - if err := copyFile(carolTempDbFile, carolDbPath); err != nil { + if err := copyFile(carolTempDbFile, carol.DBPath()); err != nil { t.Fatalf("unable to copy database files: %v", err) } @@ -3167,7 +3171,7 @@ func testRevokedCloseRetributionRemoteHodl( // state. With this, we essentially force Carol to travel back in time // within the channel's history. 
if err = net.RestartNode(carol, func() error { - return os.Rename(carolTempDbFile, carolDbPath) + return os.Rename(carolTempDbFile, carol.DBPath()) }); err != nil { t.Fatalf("unable to restart node: %v", err) } @@ -3296,7 +3300,7 @@ func testRevokedCloseRetributionRemoteHodl( } } -func testHtlcErrorPropagation(net *networkHarness, t *harnessTest) { +func testHtlcErrorPropagation(net *lntest.NetworkHarness, t *harnessTest) { // In this test we wish to exercise the daemon's correct parsing, // handling, and propagation of errors that occur while processing a // multi-hop payment. @@ -3542,7 +3546,7 @@ out: // We'll attempt to complete the original invoice we created with Carol // above, but before we do so, Carol will go offline, resulting in a // failed payment. - if err := carol.Shutdown(); err != nil { + if err := net.ShutdownNode(carol); err != nil { t.Fatalf("unable to shutdown carol: %v", err) } // TODO(roasbeef): mission control @@ -3582,7 +3586,7 @@ out: } } -func testGraphTopologyNotifications(net *networkHarness, t *harnessTest) { +func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) { const chanAmt = maxFundingAmount timeout := time.Duration(time.Second * 5) ctxb := context.Background() @@ -3800,7 +3804,7 @@ func testGraphTopologyNotifications(net *networkHarness, t *harnessTest) { close(quit) // Finally, shutdown carol as our test has concluded successfully. - if err := carol.Shutdown(); err != nil { + if err := net.ShutdownNode(carol); err != nil { t.Fatalf("unable to shutdown carol: %v", err) } } @@ -3808,7 +3812,7 @@ func testGraphTopologyNotifications(net *networkHarness, t *harnessTest) { // testNodeAnnouncement ensures that when a node is started with one or more // external IP addresses specified on the command line, that those addresses // announced to the network and reported in the network graph. -func testNodeAnnouncement(net *networkHarness, t *harnessTest) { +func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() ipAddresses := map[string]bool{ @@ -3878,12 +3882,12 @@ func testNodeAnnouncement(net *networkHarness, t *harnessTest) { ctxt, _ = context.WithTimeout(ctxb, timeout) closeChannelAndAssert(ctxt, t, net, net.Bob, chanPoint, false) - if err := dave.Shutdown(); err != nil { + if err := net.ShutdownNode(dave); err != nil { t.Fatalf("unable to shutdown dave: %v", err) } } -func testNodeSignVerify(net *networkHarness, t *harnessTest) { +func testNodeSignVerify(net *lntest.NetworkHarness, t *harnessTest) { timeout := time.Duration(time.Second * 5) ctxb := context.Background() @@ -3949,7 +3953,7 @@ func testNodeSignVerify(net *networkHarness, t *harnessTest) { } // Clean up carol's node. - if err := carol.Shutdown(); err != nil { + if err := net.ShutdownNode(carol); err != nil { t.Fatalf("unable to shutdown carol: %v", err) } @@ -3961,12 +3965,12 @@ func testNodeSignVerify(net *networkHarness, t *harnessTest) { // testAsyncPayments tests the performance of the async payments, and also // checks that balances of both sides can't be become negative under stress // payment strikes. -func testAsyncPayments(net *networkHarness, t *harnessTest) { +func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() // As we'll be querying the channels state frequently we'll // create a closure helper function for the purpose. 
- getChanInfo := func(node *lightningNode) (*lnrpc.ActiveChannel, error) { + getChanInfo := func(node *lntest.HarnessNode) (*lnrpc.ActiveChannel, error) { req := &lnrpc.ListChannelsRequest{} channelInfo, err := node.ListChannels(ctxb, req) if err != nil { @@ -4138,12 +4142,12 @@ func testAsyncPayments(net *networkHarness, t *harnessTest) { // testBidirectionalAsyncPayments tests that nodes are able to send the // payments to each other in async manner without blocking. -func testBidirectionalAsyncPayments(net *networkHarness, t *harnessTest) { +func testBidirectionalAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() // As we'll be querying the channels state frequently we'll // create a closure helper function for the purpose. - getChanInfo := func(node *lightningNode) (*lnrpc.ActiveChannel, error) { + getChanInfo := func(node *lntest.HarnessNode) (*lnrpc.ActiveChannel, error) { req := &lnrpc.ListChannelsRequest{} channelInfo, err := node.ListChannels(ctxb, req) if err != nil { @@ -4375,7 +4379,7 @@ func testBidirectionalAsyncPayments(net *networkHarness, t *harnessTest) { type testCase struct { name string - test func(net *networkHarness, t *harnessTest) + test func(net *lntest.NetworkHarness, t *harnessTest) } var testsCases = []*testCase{ @@ -4473,29 +4477,33 @@ var testsCases = []*testCase{ func TestLightningNetworkDaemon(t *testing.T) { ht := newHarnessTest(t) + var lndHarness *lntest.NetworkHarness + + // First create an instance of the btcd's rpctest.Harness. This will be + // used to fund the wallets of the nodes within the test network and to + // drive blockchain related events within the network. Revert the default + // setting of accepting non-standard transactions on simnet to reject them. + // Transactions on the lightning network should always be standard to get + // better guarantees of getting included in to blocks. + args := []string{"--rejectnonstd"} + handlers := &rpcclient.NotificationHandlers{ + OnTxAccepted: func(hash *chainhash.Hash, amt btcutil.Amount) { + lndHarness.OnTxAccepted(hash) + }, + } + btcdHarness, err := rpctest.New(harnessNetParams, handlers, args) + if err != nil { + ht.Fatalf("unable to create mining node: %v", err) + } + defer btcdHarness.TearDown() + // First create the network harness to gain access to its // 'OnTxAccepted' call back. - lndHarness, err := newNetworkHarness() + lndHarness, err = lntest.NewNetworkHarness(btcdHarness) if err != nil { ht.Fatalf("unable to create lightning network harness: %v", err) } - - // Set up teardowns. While it's easier to set up the lnd harness before - // the btcd harness, they should be torn down in reverse order to - // prevent certain types of hangs. - var btcdHarness *rpctest.Harness - defer func() { - if lndHarness != nil { - lndHarness.TearDownAll() - } - if btcdHarness != nil { - btcdHarness.TearDown() - } - }() - - handlers := &rpcclient.NotificationHandlers{ - OnTxAccepted: lndHarness.OnTxAccepted, - } + defer lndHarness.TearDownAll() // Spawn a new goroutine to watch for any fatal errors that any of the // running lnd processes encounter. If an error occurs, then the test @@ -4513,18 +4521,6 @@ func TestLightningNetworkDaemon(t *testing.T) { } }() - // First create an instance of the btcd's rpctest.Harness. This will be - // used to fund the wallets of the nodes within the test network and to - // drive blockchain related events within the network. Revert the default - // setting of accepting non-standard transactions on simnet to reject them. 
- // Transactions on the lightning network should always be standard to get - // better guarantees of getting included in to blocks. - args := []string{"--rejectnonstd"} - btcdHarness, err = rpctest.New(harnessNetParams, handlers, args) - if err != nil { - ht.Fatalf("unable to create mining node: %v", err) - } - // Turn off the btcd rpc logging, otherwise it will lead to panic. // TODO(andrew.shvv|roasbeef) Remove the hack after re-work the way the log // rotator os work. @@ -4548,10 +4544,7 @@ func TestLightningNetworkDaemon(t *testing.T) { // initialization of the network. args - list of lnd arguments, // example: "--debuglevel=debug" // TODO(roasbeef): create master balanced channel with all the monies? - if err := lndHarness.InitializeSeedNodes(btcdHarness, nil); err != nil { - ht.Fatalf("unable to initialize seed nodes: %v", err) - } - if err = lndHarness.SetUp(); err != nil { + if err = lndHarness.SetUp(nil); err != nil { ht.Fatalf("unable to set up test lightning network: %v", err) } diff --git a/lntest/doc.go b/lntest/doc.go new file mode 100644 index 00000000000..38ef4089346 --- /dev/null +++ b/lntest/doc.go @@ -0,0 +1,10 @@ +/* +Package lntest provides testing utilities for the lnd repository. + +This package contains infrastructure for integration tests that launch full lnd +nodes in a controlled environment and interact with them via RPC. Using a +NetworkHarness, a test can launch multiple lnd nodes, open channels between +them, create defined network topologies, and anything else that is possible with +RPC commands. +*/ +package lntest diff --git a/lntest/harness.go b/lntest/harness.go new file mode 100644 index 00000000000..2631e9fbde5 --- /dev/null +++ b/lntest/harness.go @@ -0,0 +1,838 @@ +package lntest + +import ( + "fmt" + "io/ioutil" + "sync" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc/grpclog" + + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/roasbeef/btcd/chaincfg" + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcd/integration/rpctest" + "github.com/roasbeef/btcd/rpcclient" + "github.com/roasbeef/btcd/txscript" + "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" +) + +// NetworkHarness is an integration testing harness for the lightning network. +// The harness by default is created with two active nodes on the network: +// Alice and Bob. +type NetworkHarness struct { + rpcConfig rpcclient.ConnConfig + netParams *chaincfg.Params + + // Miner is a reference to a running full node that can be used to create + // new blocks on the network. + Miner *rpctest.Harness + + activeNodes map[int]*HarnessNode + + // Alice and Bob are the initial seeder nodes that are automatically + // created to be the initial participants of the test network. + Alice *HarnessNode + Bob *HarnessNode + + seenTxns chan *chainhash.Hash + bitcoinWatchRequests chan *txWatchRequest + + // Channel for transmitting stderr output from failed lightning node + // to main process. + lndErrorChan chan error + + quit chan struct{} + + mtx sync.Mutex +} + +// NewNetworkHarness creates a new network test harness. +// TODO(roasbeef): add option to use golang's build library to a binary of the +// current repo. 
This'll save developers from having to manually `go install` +// within the repo each time before changes +func NewNetworkHarness(r *rpctest.Harness) (*NetworkHarness, error) { + n := NetworkHarness{ + activeNodes: make(map[int]*HarnessNode), + seenTxns: make(chan *chainhash.Hash), + bitcoinWatchRequests: make(chan *txWatchRequest), + lndErrorChan: make(chan error), + netParams: r.ActiveNet, + Miner: r, + rpcConfig: r.RPCConfig(), + quit: make(chan struct{}), + } + go n.networkWatcher() + return &n, nil +} + +// ProcessErrors returns a channel used for reporting any fatal process errors. +// If any of the active nodes within the harness' test network incur a fatal +// error, that error is sent over this channel. +func (n *NetworkHarness) ProcessErrors() <-chan error { + return n.lndErrorChan +} + +// fakeLogger is a fake grpclog.Logger implementation. This is used to stop +// grpc's logger from printing directly to stdout. +type fakeLogger struct{} + +func (f *fakeLogger) Fatal(args ...interface{}) {} +func (f *fakeLogger) Fatalf(format string, args ...interface{}) {} +func (f *fakeLogger) Fatalln(args ...interface{}) {} +func (f *fakeLogger) Print(args ...interface{}) {} +func (f *fakeLogger) Printf(format string, args ...interface{}) {} +func (f *fakeLogger) Println(args ...interface{}) {} + +// SetUp starts the initial seeder nodes within the test harness. The initial +// node's wallets will be funded wallets with ten 1 BTC outputs each. Finally +// rpc clients capable of communicating with the initial seeder nodes are +// created. Nodes are initialized with the given extra command line flags, which +// should be formatted properly - "--arg=value". +func (n *NetworkHarness) SetUp(lndArgs []string) error { + // Swap out grpc's default logger with out fake logger which drops the + // statements on the floor. + grpclog.SetLogger(&fakeLogger{}) + + // Start the initial seeder nodes within the test network, then connect + // their respective RPC clients. + var wg sync.WaitGroup + errChan := make(chan error, 2) + wg.Add(2) + go func() { + defer wg.Done() + node, err := n.NewNode(lndArgs) + if err != nil { + errChan <- err + return + } + n.Alice = node + }() + go func() { + defer wg.Done() + node, err := n.NewNode(lndArgs) + if err != nil { + errChan <- err + return + } + n.Bob = node + }() + wg.Wait() + select { + case err := <-errChan: + return err + default: + } + + // Load up the wallets of the seeder nodes with 10 outputs of 1 BTC + // each. + ctxb := context.Background() + addrReq := &lnrpc.NewAddressRequest{ + Type: lnrpc.NewAddressRequest_WITNESS_PUBKEY_HASH, + } + clients := []lnrpc.LightningClient{n.Alice, n.Bob} + for _, client := range clients { + for i := 0; i < 10; i++ { + resp, err := client.NewAddress(ctxb, addrReq) + if err != nil { + return err + } + addr, err := btcutil.DecodeAddress(resp.Address, n.netParams) + if err != nil { + return err + } + addrScript, err := txscript.PayToAddrScript(addr) + if err != nil { + return err + } + + output := &wire.TxOut{ + PkScript: addrScript, + Value: btcutil.SatoshiPerBitcoin, + } + if _, err := n.Miner.SendOutputs([]*wire.TxOut{output}, 30); err != nil { + return err + } + } + } + + // We generate several blocks in order to give the outputs created + // above a good number of confirmations. + if _, err := n.Miner.Node.Generate(10); err != nil { + return err + } + + // Finally, make a connection between both of the nodes. 
+ if err := n.ConnectNodes(ctxb, n.Alice, n.Bob); err != nil { + return err + } + + // Now block until both wallets have fully synced up. + expectedBalance := int64(btcutil.SatoshiPerBitcoin * 10) + balReq := &lnrpc.WalletBalanceRequest{} + balanceTicker := time.Tick(time.Millisecond * 50) + balanceTimeout := time.After(time.Second * 30) +out: + for { + select { + case <-balanceTicker: + aliceResp, err := n.Alice.WalletBalance(ctxb, balReq) + if err != nil { + return err + } + bobResp, err := n.Bob.WalletBalance(ctxb, balReq) + if err != nil { + return err + } + + if aliceResp.ConfirmedBalance == expectedBalance && + bobResp.ConfirmedBalance == expectedBalance { + break out + } + case <-balanceTimeout: + return fmt.Errorf("balances not synced after deadline") + } + } + + return nil +} + +// TearDownAll tears down all active nodes within the test lightning network. +func (n *NetworkHarness) TearDownAll() error { + for _, node := range n.activeNodes { + if err := n.ShutdownNode(node); err != nil { + return err + } + } + + close(n.lndErrorChan) + close(n.quit) + + return nil +} + +// NewNode fully initializes a returns a new HarnessNode binded to the +// current instance of the network harness. The created node is running, but +// not yet connected to other nodes within the network. +func (n *NetworkHarness) NewNode(extraArgs []string) (*HarnessNode, error) { + node, err := newNode(nodeConfig{ + RPCConfig: &n.rpcConfig, + NetParams: n.netParams, + ExtraArgs: extraArgs, + }) + if err != nil { + return nil, err + } + + // Put node in activeNodes to ensure Shutdown is called even if Start + // returns an error. + n.mtx.Lock() + n.activeNodes[node.NodeID] = node + n.mtx.Unlock() + + if err := node.start(n.lndErrorChan); err != nil { + return nil, err + } + + return node, nil +} + +// ConnectNodes establishes an encrypted+authenticated p2p connection from node +// a towards node b. The function will return a non-nil error if the connection +// was unable to be established. +// +// NOTE: This function may block for up to 15-seconds as it will not return +// until the new connection is detected as being known to both nodes. +func (n *NetworkHarness) ConnectNodes(ctx context.Context, a, b *HarnessNode) error { + bobInfo, err := b.GetInfo(ctx, &lnrpc.GetInfoRequest{}) + if err != nil { + return err + } + + req := &lnrpc.ConnectPeerRequest{ + Addr: &lnrpc.LightningAddress{ + Pubkey: bobInfo.IdentityPubkey, + Host: b.cfg.P2PAddr(), + }, + } + if _, err := a.ConnectPeer(ctx, req); err != nil { + return err + } + + timeout := time.After(time.Second * 15) + for { + + select { + case <-timeout: + return fmt.Errorf("peers not connected within 15 seconds") + default: + } + + // If node B is seen in the ListPeers response from node A, + // then we can exit early as the connection has been fully + // established. 
+ resp, err := a.ListPeers(ctx, &lnrpc.ListPeersRequest{}) + if err != nil { + return err + } + for _, peer := range resp.Peers { + if peer.PubKey == b.PubKeyStr { + return nil + } + } + } +} + +// DisconnectNodes disconnects node a from node b by sending RPC message +// from a node to b node +func (n *NetworkHarness) DisconnectNodes(ctx context.Context, a, b *HarnessNode) error { + bobInfo, err := b.GetInfo(ctx, &lnrpc.GetInfoRequest{}) + if err != nil { + return err + } + + req := &lnrpc.DisconnectPeerRequest{ + PubKey: bobInfo.IdentityPubkey, + } + + if _, err := a.DisconnectPeer(ctx, req); err != nil { + return err + } + + return nil +} + +// RestartNode attempts to restart a lightning node by shutting it down +// cleanly, then restarting the process. This function is fully blocking. Upon +// restart, the RPC connection to the node will be re-attempted, continuing iff +// the connection attempt is successful. If the callback parameter is non-nil, +// then the function will be executed after the node shuts down, but *before* +// the process has been started up again. +// +// This method can be useful when testing edge cases such as a node broadcast +// and invalidated prior state, or persistent state recovery, simulating node +// crashes, etc. +func (n *NetworkHarness) RestartNode(node *HarnessNode, callback func() error) error { + if err := node.stop(); err != nil { + return err + } + + if callback != nil { + if err := callback(); err != nil { + return err + } + } + + return node.start(n.lndErrorChan) +} + +// ShutdownNode stops an active lnd process and returns when the process has +// exited and any temporary directories have been cleaned up. +func (n *NetworkHarness) ShutdownNode(node *HarnessNode) error { + if err := node.shutdown(); err != nil { + return err + } + + delete(n.activeNodes, node.NodeID) + return nil +} + +// TODO(roasbeef): add a WithChannel higher-order function? +// * python-like context manager w.r.t using a channel within a test +// * possibly adds more funds to the target wallet if the funds are not +// enough + +// txWatchRequest encapsulates a request to the harness' Bitcoin network +// watcher to dispatch a notification once a transaction with the target txid +// is seen within the test network. +type txWatchRequest struct { + txid chainhash.Hash + eventChan chan struct{} +} + +// bitcoinNetworkWatcher is a goroutine which accepts async notification +// requests for the broadcast of a target transaction, and then dispatches the +// transaction once its seen on the Bitcoin network. +func (n *NetworkHarness) networkWatcher() { + seenTxns := make(map[chainhash.Hash]struct{}) + clients := make(map[chainhash.Hash][]chan struct{}) + + for { + + select { + case <-n.quit: + return + + case req := <-n.bitcoinWatchRequests: + // If we've already seen this transaction, then + // immediately dispatch the request. Otherwise, append + // to the list of clients who are watching for the + // broadcast of this transaction. + if _, ok := seenTxns[req.txid]; ok { + close(req.eventChan) + } else { + clients[req.txid] = append(clients[req.txid], req.eventChan) + } + case txid := <-n.seenTxns: + // Add this txid to our set of "seen" transactions. So + // we're able to dispatch any notifications for this + // txid which arrive *after* it's seen within the + // network. + seenTxns[*txid] = struct{}{} + + // If there isn't a registered notification for this + // transaction then ignore it. 
+ txClients, ok := clients[*txid] + if !ok { + continue + } + + // Otherwise, dispatch the notification to all clients, + // cleaning up the now un-needed state. + for _, client := range txClients { + close(client) + } + delete(clients, *txid) + } + } +} + +// OnTxAccepted is a callback to be called each time a new transaction has been +// broadcast on the network. +func (n *NetworkHarness) OnTxAccepted(hash *chainhash.Hash) { + select { + case n.seenTxns <- hash: + case <-n.quit: + return + } +} + +// WaitForTxBroadcast blocks until the target txid is seen on the network. If +// the transaction isn't seen within the network before the passed timeout, +// then an error is returned. +// TODO(roasbeef): add another method which creates queue of all seen transactions +func (n *NetworkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash.Hash) error { + // Return immediately if harness has been torn down. + select { + case <-n.quit: + return fmt.Errorf("NetworkHarness has been torn down") + default: + } + + eventChan := make(chan struct{}) + + n.bitcoinWatchRequests <- &txWatchRequest{ + txid: txid, + eventChan: eventChan, + } + + select { + case <-eventChan: + return nil + case <-n.quit: + return fmt.Errorf("NetworkHarness has been torn down") + case <-ctx.Done(): + return fmt.Errorf("tx not seen before context timeout") + } +} + +// OpenChannel attempts to open a channel between srcNode and destNode with the +// passed channel funding parameters. If the passed context has a timeout, then +// if the timeout is reached before the channel pending notification is +// received, an error is returned. +func (n *NetworkHarness) OpenChannel(ctx context.Context, + srcNode, destNode *HarnessNode, amt btcutil.Amount, + pushAmt btcutil.Amount) (lnrpc.Lightning_OpenChannelClient, error) { + + // Wait until srcNode and destNode have the latest chain synced. + // Otherwise, we may run into a check within the funding manager that + // prevents any funding workflows from being kicked off if the chain + // isn't yet synced. + if err := srcNode.WaitForBlockchainSync(ctx); err != nil { + return nil, fmt.Errorf("Unable to sync srcNode chain: %v", err) + } + if err := destNode.WaitForBlockchainSync(ctx); err != nil { + return nil, fmt.Errorf("Unable to sync destNode chain: %v", err) + } + + openReq := &lnrpc.OpenChannelRequest{ + NodePubkey: destNode.PubKey[:], + LocalFundingAmount: int64(amt), + PushSat: int64(pushAmt), + } + + respStream, err := srcNode.OpenChannel(ctx, openReq) + if err != nil { + return nil, fmt.Errorf("unable to open channel between "+ + "alice and bob: %v", err) + } + + chanOpen := make(chan struct{}) + errChan := make(chan error) + go func() { + // Consume the "channel pending" update. This waits until the node + // notifies us that the final message in the channel funding workflow + // has been sent to the remote node. + resp, err := respStream.Recv() + if err != nil { + errChan <- err + return + } + if _, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanPending); !ok { + errChan <- fmt.Errorf("expected channel pending update, "+ + "instead got %v", resp) + return + } + + close(chanOpen) + }() + + select { + case <-ctx.Done(): + return nil, fmt.Errorf("timeout reached before chan pending "+ + "update sent: %v", err) + case err := <-errChan: + return nil, err + case <-chanOpen: + return respStream, nil + } +} + +// OpenPendingChannel attempts to open a channel between srcNode and destNode with the +// passed channel funding parameters. 
If the passed context has a timeout, then +// if the timeout is reached before the channel pending notification is +// received, an error is returned. +func (n *NetworkHarness) OpenPendingChannel(ctx context.Context, + srcNode, destNode *HarnessNode, amt btcutil.Amount, + pushAmt btcutil.Amount) (*lnrpc.PendingUpdate, error) { + + // Wait until srcNode and destNode have blockchain synced + if err := srcNode.WaitForBlockchainSync(ctx); err != nil { + return nil, fmt.Errorf("Unable to sync srcNode chain: %v", err) + } + if err := destNode.WaitForBlockchainSync(ctx); err != nil { + return nil, fmt.Errorf("Unable to sync destNode chain: %v", err) + } + + openReq := &lnrpc.OpenChannelRequest{ + NodePubkey: destNode.PubKey[:], + LocalFundingAmount: int64(amt), + PushSat: int64(pushAmt), + } + + respStream, err := srcNode.OpenChannel(ctx, openReq) + if err != nil { + return nil, fmt.Errorf("unable to open channel between "+ + "alice and bob: %v", err) + } + + chanPending := make(chan *lnrpc.PendingUpdate) + errChan := make(chan error) + go func() { + // Consume the "channel pending" update. This waits until the node + // notifies us that the final message in the channel funding workflow + // has been sent to the remote node. + resp, err := respStream.Recv() + if err != nil { + errChan <- err + return + } + pendingResp, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanPending) + if !ok { + errChan <- fmt.Errorf("expected channel pending update, "+ + "instead got %v", resp) + return + } + + chanPending <- pendingResp.ChanPending + }() + + select { + case <-ctx.Done(): + return nil, fmt.Errorf("timeout reached before chan pending " + + "update sent") + case err := <-errChan: + return nil, err + case pendingChan := <-chanPending: + return pendingChan, nil + } +} + +// WaitForChannelOpen waits for a notification that a channel is open by +// consuming a message from the past open channel stream. If the passed context +// has a timeout, then if the timeout is reached before the channel has been +// opened, then an error is returned. +func (n *NetworkHarness) WaitForChannelOpen(ctx context.Context, + openChanStream lnrpc.Lightning_OpenChannelClient) (*lnrpc.ChannelPoint, error) { + + errChan := make(chan error) + respChan := make(chan *lnrpc.ChannelPoint) + go func() { + resp, err := openChanStream.Recv() + if err != nil { + errChan <- fmt.Errorf("unable to read rpc resp: %v", err) + return + } + fundingResp, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanOpen) + if !ok { + errChan <- fmt.Errorf("expected channel open update, "+ + "instead got %v", resp) + return + } + + respChan <- fundingResp.ChanOpen.ChannelPoint + }() + + select { + case <-ctx.Done(): + return nil, fmt.Errorf("timeout reached while waiting for " + + "channel open") + case err := <-errChan: + return nil, err + case chanPoint := <-respChan: + return chanPoint, nil + } +} + +// CloseChannel close channel attempts to close the channel indicated by the +// passed channel point, initiated by the passed lnNode. If the passed context +// has a timeout, then if the timeout is reached before the channel close is +// pending, then an error is returned. +func (n *NetworkHarness) CloseChannel(ctx context.Context, + lnNode *HarnessNode, cp *lnrpc.ChannelPoint, + force bool) (lnrpc.Lightning_CloseChannelClient, *chainhash.Hash, error) { + + // Create a channel outpoint that we can use to compare to channels + // from the ListChannelsResponse. 
+ fundingTxID, err := chainhash.NewHash(cp.FundingTxid) + if err != nil { + return nil, nil, err + } + chanPoint := wire.OutPoint{ + Hash: *fundingTxID, + Index: cp.OutputIndex, + } + + // If we are not force closing the channel, wait for channel to become + // active before attempting to close it. + numTries := 10 +CheckActive: + for i := 0; !force && i < numTries; i++ { + listReq := &lnrpc.ListChannelsRequest{} + listResp, err := lnNode.ListChannels(ctx, listReq) + if err != nil { + return nil, nil, fmt.Errorf("unable fetch node's "+ + "channels: %v", err) + } + + for _, c := range listResp.Channels { + if c.ChannelPoint == chanPoint.String() && c.Active { + break CheckActive + } + } + + if i == numTries-1 { + // Last iteration, and channel is still not active. + return nil, nil, fmt.Errorf("channel did not become " + + "active") + } + + // Sleep, and try again. + time.Sleep(300 * time.Millisecond) + } + + closeReq := &lnrpc.CloseChannelRequest{ + ChannelPoint: cp, + Force: force, + } + closeRespStream, err := lnNode.CloseChannel(ctx, closeReq) + if err != nil { + return nil, nil, fmt.Errorf("unable to close channel: %v", err) + } + + errChan := make(chan error) + fin := make(chan *chainhash.Hash) + go func() { + // Consume the "channel close" update in order to wait for the closing + // transaction to be broadcast, then wait for the closing tx to be seen + // within the network. + closeResp, err := closeRespStream.Recv() + if err != nil { + errChan <- err + return + } + pendingClose, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ClosePending) + if !ok { + errChan <- fmt.Errorf("expected channel close update, "+ + "instead got %v", pendingClose) + return + } + + closeTxid, err := chainhash.NewHash(pendingClose.ClosePending.Txid) + if err != nil { + errChan <- err + return + } + if err := n.WaitForTxBroadcast(ctx, *closeTxid); err != nil { + errChan <- err + return + } + fin <- closeTxid + }() + + // Wait until either the deadline for the context expires, an error + // occurs, or the channel close update is received. + select { + case <-ctx.Done(): + return nil, nil, fmt.Errorf("timeout reached before channel close " + + "initiated") + case err := <-errChan: + return nil, nil, err + case closeTxid := <-fin: + return closeRespStream, closeTxid, nil + } +} + +// WaitForChannelClose waits for a notification from the passed channel close +// stream that the node has deemed the channel has been fully closed. If the +// passed context has a timeout, then if the timeout is reached before the +// notification is received then an error is returned. +func (n *NetworkHarness) WaitForChannelClose(ctx context.Context, + closeChanStream lnrpc.Lightning_CloseChannelClient) (*chainhash.Hash, error) { + + errChan := make(chan error) + updateChan := make(chan *lnrpc.CloseStatusUpdate_ChanClose) + go func() { + closeResp, err := closeChanStream.Recv() + if err != nil { + errChan <- err + return + } + + closeFin, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ChanClose) + if !ok { + errChan <- fmt.Errorf("expected channel close update, "+ + "instead got %v", closeFin) + return + } + + updateChan <- closeFin + }() + + // Wait until either the deadline for the context expires, an error + // occurs, or the channel close update is received. 
+ select { + case <-ctx.Done(): + return nil, fmt.Errorf("timeout reached before update sent") + case err := <-errChan: + return nil, err + case update := <-updateChan: + return chainhash.NewHash(update.ChanClose.ClosingTxid) + } +} + +// AssertChannelExists asserts that an active channel identified by +// channelPoint is known to exist from the point-of-view of node.. +func (n *NetworkHarness) AssertChannelExists(ctx context.Context, + node *HarnessNode, chanPoint *wire.OutPoint) error { + + req := &lnrpc.ListChannelsRequest{} + resp, err := node.ListChannels(ctx, req) + if err != nil { + return fmt.Errorf("unable fetch node's channels: %v", err) + } + + for _, channel := range resp.Channels { + if channel.ChannelPoint == chanPoint.String() { + return nil + } + } + + return fmt.Errorf("channel not found") +} + +// DumpLogs reads the current logs generated by the passed node, and returns +// the logs as a single string. This function is useful for examining the logs +// of a particular node in the case of a test failure. +// Logs from lightning node being generated with delay - you should +// add time.Sleep() in order to get all logs. +func (n *NetworkHarness) DumpLogs(node *HarnessNode) (string, error) { + logFile := fmt.Sprintf("%v/simnet/lnd.log", node.cfg.LogDir) + + buf, err := ioutil.ReadFile(logFile) + if err != nil { + return "", err + } + + return string(buf), nil +} + +// SendCoins attempts to send amt satoshis from the internal mining node to the +// targeted lightning node. +func (n *NetworkHarness) SendCoins(ctx context.Context, amt btcutil.Amount, + target *HarnessNode) error { + + balReq := &lnrpc.WalletBalanceRequest{} + initialBalance, err := target.WalletBalance(ctx, balReq) + if err != nil { + return err + } + + // First, obtain an address from the target lightning node, preferring + // to receive a p2wkh address s.t the output can immediately be used as + // an input to a funding transaction. + addrReq := &lnrpc.NewAddressRequest{ + Type: lnrpc.NewAddressRequest_WITNESS_PUBKEY_HASH, + } + resp, err := target.NewAddress(ctx, addrReq) + if err != nil { + return err + } + addr, err := btcutil.DecodeAddress(resp.Address, n.netParams) + if err != nil { + return err + } + addrScript, err := txscript.PayToAddrScript(addr) + if err != nil { + return err + } + + // Generate a transaction which creates an output to the target + // pkScript of the desired amount. + output := &wire.TxOut{ + PkScript: addrScript, + Value: int64(amt), + } + if _, err := n.Miner.SendOutputs([]*wire.TxOut{output}, 30); err != nil { + return err + } + + // Finally, generate 6 new blocks to ensure the output gains a + // sufficient number of confirmations. + if _, err := n.Miner.Node.Generate(6); err != nil { + return err + } + + // Pause until the nodes current wallet balances reflects the amount + // sent to it above. 
+ // TODO(roasbeef): factor out into helper func + balanceTicker := time.Tick(time.Millisecond * 50) + balanceTimeout := time.After(time.Second * 30) + for { + select { + case <-balanceTicker: + currentBal, err := target.WalletBalance(ctx, balReq) + if err != nil { + return err + } + + if currentBal.ConfirmedBalance == initialBalance.ConfirmedBalance+int64(amt) { + return nil + } + case <-balanceTimeout: + return fmt.Errorf("balances not synced after deadline") + } + } +} diff --git a/lntest/node.go b/lntest/node.go new file mode 100644 index 00000000000..9242835cda3 --- /dev/null +++ b/lntest/node.go @@ -0,0 +1,681 @@ +package lntest + +import ( + "bytes" + "encoding/hex" + "flag" + "fmt" + "io" + "io/ioutil" + "net" + "os" + "os/exec" + "path/filepath" + "strconv" + "sync" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + macaroon "gopkg.in/macaroon.v1" + + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/macaroons" + "github.com/roasbeef/btcd/chaincfg" + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcd/rpcclient" + "github.com/roasbeef/btcd/wire" +) + +var ( + // numActiveNodes is the number of active nodes within the test network. + numActiveNodes = 0 + + // defaultNodePort is the initial p2p port which will be used by the + // first created lightning node to listen on for incoming p2p + // connections. Subsequent allocated ports for future lighting nodes + // instances will be monotonically increasing odd numbers calculated as + // such: defaultP2pPort + (2 * harness.nodeNum). + defaultNodePort = 19555 + + // defaultClientPort is the initial rpc port which will be used by the + // first created lightning node to listen on for incoming rpc + // connections. Subsequent allocated ports for future rpc harness + // instances will be monotonically increasing even numbers calculated + // as such: defaultP2pPort + (2 * harness.nodeNum). + defaultClientPort = 19556 + + // logOutput is a flag that can be set to append the output from the + // seed nodes to log files. + logOutput = flag.Bool("logoutput", false, + "log output from node n to file outputn.log") + + // trickleDelay is the amount of time in milliseconds between each + // release of announcements by AuthenticatedGossiper to the network. + trickleDelay = 50 +) + +// generateListeningPorts returns two strings representing ports to listen on +// designated for the current lightning network test. If there haven't been any +// test instances created, the default ports are used. Otherwise, in order to +// support multiple test nodes running at once, the p2p and rpc port are +// incremented after each initialization. 
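The per-node configuration below is what lets a test spin up throwaway nodes on fresh ports alongside the seed pair. A sketch of that lifecycle from the test side, where the node name, extra argument, and funding amount are placeholders:

// Sketch: create an extra node, fund and connect it, then shut it down
// through the harness rather than through the node itself.
carol, err := net.NewNode([]string{"--debuglevel=debug"})
if err != nil {
	t.Fatalf("unable to create new node: %v", err)
}
if err := net.SendCoins(ctxb, btcutil.SatoshiPerBitcoin, carol); err != nil {
	t.Fatalf("unable to send coins to carol: %v", err)
}
if err := net.ConnectNodes(ctxb, net.Alice, carol); err != nil {
	t.Fatalf("unable to connect alice to carol: %v", err)
}

// ... run the test against carol ...

if err := net.ShutdownNode(carol); err != nil {
	t.Fatalf("unable to shutdown carol: %v", err)
}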
+func generateListeningPorts() (int, int) { + var p2p, rpc int + if numActiveNodes == 0 { + p2p = defaultNodePort + rpc = defaultClientPort + } else { + p2p = defaultNodePort + (2 * numActiveNodes) + rpc = defaultClientPort + (2 * numActiveNodes) + } + + return p2p, rpc +} + +type nodeConfig struct { + RPCConfig *rpcclient.ConnConfig + NetParams *chaincfg.Params + BaseDir string + ExtraArgs []string + + DataDir string + LogDir string + TLSCertPath string + TLSKeyPath string + AdminMacPath string + ReadMacPath string + P2PPort int + RPCPort int +} + +func (cfg nodeConfig) P2PAddr() string { + return net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.P2PPort)) +} + +func (cfg nodeConfig) RPCAddr() string { + return net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.RPCPort)) +} + +func (cfg nodeConfig) DBPath() string { + return filepath.Join(cfg.DataDir, cfg.NetParams.Name, "bitcoin/channel.db") +} + +// genArgs generates a slice of command line arguments from the lightning node +// config struct. +func (cfg nodeConfig) genArgs() []string { + var args []string + + switch cfg.NetParams { + case &chaincfg.TestNet3Params: + args = append(args, "--bitcoin.testnet") + case &chaincfg.SimNetParams: + args = append(args, "--bitcoin.simnet") + case &chaincfg.RegressionNetParams: + args = append(args, "--bitcoin.regtest") + } + + encodedCert := hex.EncodeToString(cfg.RPCConfig.Certificates) + args = append(args, "--bitcoin.active") + args = append(args, "--nobootstrap") + args = append(args, "--noencryptwallet") + args = append(args, "--debuglevel=debug") + args = append(args, "--defaultchanconfs=1") + args = append(args, fmt.Sprintf("--bitcoin.rpchost=%v", cfg.RPCConfig.Host)) + args = append(args, fmt.Sprintf("--bitcoin.rpcuser=%v", cfg.RPCConfig.User)) + args = append(args, fmt.Sprintf("--bitcoin.rpcpass=%v", cfg.RPCConfig.Pass)) + args = append(args, fmt.Sprintf("--bitcoin.rawrpccert=%v", encodedCert)) + args = append(args, fmt.Sprintf("--rpcport=%v", cfg.RPCPort)) + args = append(args, fmt.Sprintf("--peerport=%v", cfg.P2PPort)) + args = append(args, fmt.Sprintf("--logdir=%v", cfg.LogDir)) + args = append(args, fmt.Sprintf("--datadir=%v", cfg.DataDir)) + args = append(args, fmt.Sprintf("--tlscertpath=%v", cfg.TLSCertPath)) + args = append(args, fmt.Sprintf("--tlskeypath=%v", cfg.TLSKeyPath)) + args = append(args, fmt.Sprintf("--configfile=%v", cfg.DataDir)) + args = append(args, fmt.Sprintf("--adminmacaroonpath=%v", cfg.AdminMacPath)) + args = append(args, fmt.Sprintf("--readonlymacaroonpath=%v", cfg.ReadMacPath)) + args = append(args, fmt.Sprintf("--externalip=%s", cfg.P2PAddr())) + args = append(args, fmt.Sprintf("--trickledelay=%v", trickleDelay)) + + if cfg.ExtraArgs != nil { + args = append(args, cfg.ExtraArgs...) + } + + return args +} + +// HarnessNode represents an instance of lnd running within our test network +// harness. Each HarnessNode instance also fully embedds an RPC client in +// order to pragmatically drive the node. +type HarnessNode struct { + cfg *nodeConfig + + // NodeID is a unique identifier for the node within a NetworkHarness. + NodeID int + + // PubKey is the serialized compressed identity public key of the node. + // This field will only be populated once the node itself has been + // started via the start() method. + PubKey [33]byte + PubKeyStr string + + cmd *exec.Cmd + pidFile string + + // processExit is a channel that's closed once it's detected that the + // process this instance of HarnessNode is bound to has exited. 
+ processExit chan struct{} + + chanWatchRequests chan *chanWatchRequest + + quit chan struct{} + wg sync.WaitGroup + + lnrpc.LightningClient +} + +// Assert *HarnessNode implements the lnrpc.LightningClient interface. +var _ lnrpc.LightningClient = (*HarnessNode)(nil) + +// newNode creates a new test lightning node instance from the passed config. +func newNode(cfg nodeConfig) (*HarnessNode, error) { + if cfg.BaseDir == "" { + var err error + cfg.BaseDir, err = ioutil.TempDir("", "lndtest-node") + if err != nil { + return nil, err + } + } + cfg.DataDir = filepath.Join(cfg.BaseDir, "data") + cfg.LogDir = filepath.Join(cfg.BaseDir, "log") + cfg.TLSCertPath = filepath.Join(cfg.DataDir, "tls.cert") + cfg.TLSKeyPath = filepath.Join(cfg.DataDir, "tls.key") + cfg.AdminMacPath = filepath.Join(cfg.DataDir, "admin.macaroon") + cfg.ReadMacPath = filepath.Join(cfg.DataDir, "readonly.macaroon") + + cfg.P2PPort, cfg.RPCPort = generateListeningPorts() + + nodeNum := numActiveNodes + numActiveNodes++ + + return &HarnessNode{ + cfg: &cfg, + NodeID: nodeNum, + chanWatchRequests: make(chan *chanWatchRequest), + }, nil +} + +// DBPath returns the filepath to the channeldb database file for this node. +func (hn *HarnessNode) DBPath() string { + return hn.cfg.DBPath() +} + +// Start launches a new process running lnd. Additionally, the PID of the +// launched process is saved in order to possibly kill the process forcibly +// later. +// +// This may not clean up properly if an error is returned, so the caller should +// call shutdown() regardless of the return value. +func (hn *HarnessNode) start(lndError chan<- error) error { + hn.quit = make(chan struct{}) + + args := hn.cfg.genArgs() + hn.cmd = exec.Command("lnd", args...) + + // Redirect stderr output to buffer + var errb bytes.Buffer + hn.cmd.Stderr = &errb + + // If the logoutput flag is passed, redirect output from the nodes to + // log files. + if *logOutput { + logFile := fmt.Sprintf("output%d.log", hn.NodeID) + + // Create file if not exists, otherwise append. + file, err := os.OpenFile(logFile, + os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) + if err != nil { + return err + } + + // Pass node's stderr to both errb and the file. + w := io.MultiWriter(&errb, file) + hn.cmd.Stderr = w + + // Pass the node's stdout only to the file. + hn.cmd.Stdout = file + } + + if err := hn.cmd.Start(); err != nil { + return err + } + + // Launch a new goroutine which that bubbles up any potential fatal + // process errors to the goroutine running the tests. + hn.processExit = make(chan struct{}) + go func() { + err := hn.cmd.Wait() + + if err != nil { + lndError <- errors.Errorf("%v\n%v\n", err, errb.String()) + } + + // Signal any onlookers that this process has exited. + close(hn.processExit) + }() + + // Write process ID to a file. + if err := hn.writePidFile(); err != nil { + hn.cmd.Process.Kill() + return err + } + + // Since Stop uses the LightningClient to stop the node, if we fail to get a + // connected client, we have to kill the process. + conn, err := hn.connectRPC() + if err != nil { + hn.cmd.Process.Kill() + return err + } + hn.LightningClient = lnrpc.NewLightningClient(conn) + + // Obtain the lnid of this node for quick identification purposes. 
+	ctxb := context.Background()
+	info, err := hn.GetInfo(ctxb, &lnrpc.GetInfoRequest{})
+	if err != nil {
+		return err
+	}
+
+	hn.PubKeyStr = info.IdentityPubkey
+
+	pubkey, err := hex.DecodeString(info.IdentityPubkey)
+	if err != nil {
+		return err
+	}
+	copy(hn.PubKey[:], pubkey)
+
+	// Launch the watcher that'll hook into graph-related topology changes
+	// from the PoV of this node.
+	hn.wg.Add(1)
+	go hn.lightningNetworkWatcher()
+
+	return nil
+}
+
+// writePidFile writes the process ID of the running lnd process to a .pid file.
+func (hn *HarnessNode) writePidFile() error {
+	filePath := filepath.Join(hn.cfg.BaseDir, fmt.Sprintf("%v.pid", hn.NodeID))
+
+	pid, err := os.Create(filePath)
+	if err != nil {
+		return err
+	}
+	defer pid.Close()
+
+	_, err = fmt.Fprintf(pid, "%v\n", hn.cmd.Process.Pid)
+	if err != nil {
+		return err
+	}
+
+	hn.pidFile = filePath
+	return nil
+}
+
+// connectRPC uses the TLS certificate and admin macaroon files written by the
+// lnd node to create a gRPC client connection.
+func (hn *HarnessNode) connectRPC() (*grpc.ClientConn, error) {
+	// Wait until TLS certificate and admin macaroon are created before
+	// using them, up to 30 sec.
+	tlsTimeout := time.After(30 * time.Second)
+	for !fileExists(hn.cfg.TLSCertPath) || !fileExists(hn.cfg.AdminMacPath) {
+		select {
+		case <-tlsTimeout:
+			return nil, fmt.Errorf("timeout waiting for TLS cert file " +
+				"and admin macaroon file to be created after " +
+				"30 seconds")
+		case <-time.After(100 * time.Millisecond):
+		}
+	}
+
+	tlsCreds, err := credentials.NewClientTLSFromFile(hn.cfg.TLSCertPath, "")
+	if err != nil {
+		return nil, err
+	}
+	macBytes, err := ioutil.ReadFile(hn.cfg.AdminMacPath)
+	if err != nil {
+		return nil, err
+	}
+	mac := &macaroon.Macaroon{}
+	if err = mac.UnmarshalBinary(macBytes); err != nil {
+		return nil, err
+	}
+	opts := []grpc.DialOption{
+		grpc.WithTransportCredentials(tlsCreds),
+		grpc.WithPerRPCCredentials(macaroons.NewMacaroonCredential(mac)),
+		grpc.WithBlock(),
+		grpc.WithTimeout(time.Second * 20),
+	}
+	return grpc.Dial(hn.cfg.RPCAddr(), opts...)
+}
+
+// cleanup cleans up all the temporary files created by the node's process.
+func (hn *HarnessNode) cleanup() error {
+	return os.RemoveAll(hn.cfg.BaseDir)
+}
+
+// stop attempts to stop the active lnd process.
+func (hn *HarnessNode) stop() error {
+	// Do nothing if the process is not running.
+	if hn.processExit == nil {
+		return nil
+	}
+
+	// If start() failed before creating a client, we will just wait for the
+	// child process to die.
+	if hn.LightningClient != nil {
+		// Don't watch for error because sometimes the RPC connection gets
+		// closed before a response is returned.
+		req := lnrpc.StopRequest{}
+		ctx := context.Background()
+		hn.LightningClient.StopDaemon(ctx, &req)
+	}
+
+	// Wait for lnd process and other goroutines to exit.
+	<-hn.processExit
+	close(hn.quit)
+	hn.wg.Wait()
+
+	hn.quit = nil
+	hn.processExit = nil
+	hn.LightningClient = nil
+	return nil
+}
+
+// shutdown stops the active lnd process and cleans up any temporary directories
+// created along the way.
+func (hn *HarnessNode) shutdown() error {
+	if err := hn.stop(); err != nil {
+		return err
+	}
+	if err := hn.cleanup(); err != nil {
+		return err
+	}
+	return nil
+}
+
+// chanWatchRequest is a request to the lightningNetworkWatcher to be notified
+// once it's detected within the test Lightning Network that a channel has
+// either been added or closed.
+type chanWatchRequest struct {
+	chanPoint wire.OutPoint
+
+	chanOpen bool
+
+	eventChan chan struct{}
+}
+
+// lightningNetworkWatcher is a goroutine which is able to dispatch
+// notifications once it has been observed that a target channel has been
+// closed or opened within the network. In order to dispatch these
+// notifications, the GraphTopologySubscription client exposed as part of the
+// gRPC interface is used.
+func (hn *HarnessNode) lightningNetworkWatcher() {
+	defer hn.wg.Done()
+
+	graphUpdates := make(chan *lnrpc.GraphTopologyUpdate)
+	hn.wg.Add(1)
+	go func() {
+		defer hn.wg.Done()
+
+		ctxb := context.Background()
+		req := &lnrpc.GraphTopologySubscription{}
+		topologyClient, err := hn.SubscribeChannelGraph(ctxb, req)
+		if err != nil {
+			// We panic here in case of an error as failure to
+			// create the topology client will cause all subsequent
+			// tests to fail.
+			panic(fmt.Errorf("unable to create topology "+
+				"client: %v", err))
+		}
+
+		for {
+			update, err := topologyClient.Recv()
+			if err == io.EOF {
+				return
+			} else if err != nil {
+				return
+			}
+
+			select {
+			case graphUpdates <- update:
+			case <-hn.quit:
+				return
+			}
+		}
+	}()
+
+	// For each outpoint, we'll track an integer which denotes the number
+	// of edges seen for that channel within the network. When this number
+	// reaches 2, then it means that both edge advertisements have
+	// propagated through the network.
+	openChans := make(map[wire.OutPoint]int)
+	openClients := make(map[wire.OutPoint][]chan struct{})
+
+	closedChans := make(map[wire.OutPoint]struct{})
+	closeClients := make(map[wire.OutPoint][]chan struct{})
+
+	for {
+		select {
+
+		// A new graph update has just been received, so we'll examine
+		// the current set of registered clients to see if we can
+		// dispatch any requests.
+		case graphUpdate := <-graphUpdates:
+			// For each new channel, we'll increment the number of
+			// edges seen by one.
+			for _, newChan := range graphUpdate.ChannelUpdates {
+				txid, _ := chainhash.NewHash(newChan.ChanPoint.FundingTxid)
+				op := wire.OutPoint{
+					Hash:  *txid,
+					Index: newChan.ChanPoint.OutputIndex,
+				}
+				openChans[op]++
+
+				// For this new channel, if the number of edges
+				// seen is less than two, then the channel
+				// hasn't been fully announced yet.
+				if numEdges := openChans[op]; numEdges < 2 {
+					continue
+				}
+
+				// Otherwise, we'll notify all the registered
+				// clients and remove the dispatched clients.
+				for _, eventChan := range openClients[op] {
+					close(eventChan)
+				}
+				delete(openClients, op)
+			}
+
+			// For each channel closed, we'll mark that we've
+			// detected a channel closure while lnd was pruning the
+			// channel graph.
+			for _, closedChan := range graphUpdate.ClosedChans {
+				txid, _ := chainhash.NewHash(closedChan.ChanPoint.FundingTxid)
+				op := wire.OutPoint{
+					Hash:  *txid,
+					Index: closedChan.ChanPoint.OutputIndex,
+				}
+				closedChans[op] = struct{}{}
+
+				// As the channel has been closed, we'll notify
+				// all registered clients.
+				for _, eventChan := range closeClients[op] {
+					close(eventChan)
+				}
+				delete(closeClients, op)
+			}
+
+		// A new watch request has just arrived. We'll either be able
+		// to dispatch immediately, or need to add the client for
+		// processing later.
+		case watchRequest := <-hn.chanWatchRequests:
+			targetChan := watchRequest.chanPoint
+
+			// TODO(roasbeef): add update type also, checks for
+			// multiple of 2
+			if watchRequest.chanOpen {
+				// If this is an open request, then it can be
+				// dispatched if the number of edges seen for
+				// the channel is at least two.
+				if numEdges := openChans[targetChan]; numEdges >= 2 {
+					close(watchRequest.eventChan)
+					continue
+				}
+
+				// Otherwise, we'll add this to the list of
+				// watch open clients for this out point.
+				openClients[targetChan] = append(openClients[targetChan],
+					watchRequest.eventChan)
+				continue
+			}
+
+			// If this is a close request, then it can be
+			// immediately dispatched if we've already seen a
+			// channel closure for this channel.
+			if _, ok := closedChans[targetChan]; ok {
+				close(watchRequest.eventChan)
+				continue
+			}
+
+			// Otherwise, we'll add this to the list of close watch
+			// clients for this out point.
+			closeClients[targetChan] = append(closeClients[targetChan],
+				watchRequest.eventChan)
+
+		case <-hn.quit:
+			return
+		}
+	}
+}
+
+// WaitForNetworkChannelOpen will block until a channel with the target
+// outpoint is seen as being fully advertised within the network. A channel is
+// considered "fully advertised" once both of its directional edges have been
+// advertised within the test Lightning Network.
+func (hn *HarnessNode) WaitForNetworkChannelOpen(ctx context.Context,
+	op *lnrpc.ChannelPoint) error {
+
+	eventChan := make(chan struct{})
+
+	txid, err := chainhash.NewHash(op.FundingTxid)
+	if err != nil {
+		return err
+	}
+
+	hn.chanWatchRequests <- &chanWatchRequest{
+		chanPoint: wire.OutPoint{
+			Hash:  *txid,
+			Index: op.OutputIndex,
+		},
+		eventChan: eventChan,
+		chanOpen:  true,
+	}
+
+	select {
+	case <-eventChan:
+		return nil
+	case <-ctx.Done():
+		return fmt.Errorf("channel not opened before timeout")
+	}
+}
+
+// WaitForNetworkChannelClose will block until a channel with the target
+// outpoint is seen as closed within the network. A channel is considered
+// closed once a transaction spending the funding outpoint is seen within a
+// confirmed block.
+func (hn *HarnessNode) WaitForNetworkChannelClose(ctx context.Context,
+	op *lnrpc.ChannelPoint) error {
+
+	eventChan := make(chan struct{})
+
+	txid, err := chainhash.NewHash(op.FundingTxid)
+	if err != nil {
+		return err
+	}
+
+	hn.chanWatchRequests <- &chanWatchRequest{
+		chanPoint: wire.OutPoint{
+			Hash:  *txid,
+			Index: op.OutputIndex,
+		},
+		eventChan: eventChan,
+		chanOpen:  false,
+	}
+
+	select {
+	case <-eventChan:
+		return nil
+	case <-ctx.Done():
+		return fmt.Errorf("channel not closed before timeout")
+	}
+}
+
+// WaitForBlockchainSync will block until the target node has fully
+// synchronized with the blockchain. If the passed context object has a set
+// timeout, then the goroutine will continually poll until the timeout has
+// elapsed. In the case that the chain isn't synced before the timeout is up,
+// then this function will return an error.
+func (hn *HarnessNode) WaitForBlockchainSync(ctx context.Context) error {
+	errChan := make(chan error, 1)
+	retryDelay := time.Millisecond * 100
+
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+			case <-hn.quit:
+				return
+			default:
+			}
+
+			getInfoReq := &lnrpc.GetInfoRequest{}
+			getInfoResp, err := hn.GetInfo(ctx, getInfoReq)
+			if err != nil {
+				errChan <- err
+				return
+			}
+			if getInfoResp.SyncedToChain {
+				errChan <- nil
+				return
+			}
+
+			select {
+			case <-ctx.Done():
+				return
+			case <-time.After(retryDelay):
+			}
+		}
+	}()
+
+	select {
+	case <-hn.quit:
+		return nil
+	case err := <-errChan:
+		return err
+	case <-ctx.Done():
+		return fmt.Errorf("timeout while waiting for blockchain sync")
+	}
+}
+
+// fileExists reports whether the named file or directory exists.
+// This function is taken from https://github.com/btcsuite/btcd +func fileExists(name string) bool { + if _, err := os.Stat(name); err != nil { + if os.IsNotExist(err) { + return false + } + } + return true +} diff --git a/networktest.go b/networktest.go deleted file mode 100644 index be9be8d9c17..00000000000 --- a/networktest.go +++ /dev/null @@ -1,1505 +0,0 @@ -package main - -import ( - "bytes" - "encoding/hex" - "flag" - "fmt" - "io" - "io/ioutil" - "log" - "net" - "os" - "path/filepath" - "strconv" - "sync" - "time" - - macaroon "gopkg.in/macaroon.v1" - - "golang.org/x/net/context" - - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/grpclog" - - "os/exec" - - "github.com/go-errors/errors" - "github.com/lightningnetwork/lnd/lnrpc" - "github.com/lightningnetwork/lnd/macaroons" - "github.com/roasbeef/btcd/chaincfg" - "github.com/roasbeef/btcd/chaincfg/chainhash" - "github.com/roasbeef/btcd/integration/rpctest" - "github.com/roasbeef/btcd/rpcclient" - "github.com/roasbeef/btcd/txscript" - "github.com/roasbeef/btcd/wire" - "github.com/roasbeef/btcutil" -) - -var ( - // numActiveNodes is the number of active nodes within the test network. - numActiveNodes = 0 - - // defaultNodePort is the initial p2p port which will be used by the - // first created lightning node to listen on for incoming p2p - // connections. Subsequent allocated ports for future lighting nodes - // instances will be monotonically increasing odd numbers calculated as - // such: defaultP2pPort + (2 * harness.nodeNum). - defaultNodePort = 19555 - - // defaultClientPort is the initial rpc port which will be used by the - // first created lightning node to listen on for incoming rpc - // connections. Subsequent allocated ports for future rpc harness - // instances will be monotonically increasing even numbers calculated - // as such: defaultP2pPort + (2 * harness.nodeNum). - defaultClientPort = 19556 - - harnessNetParams = &chaincfg.SimNetParams - - // logOutput is a flag that can be set to append the output from the - // seed nodes to log files. - logOutput = flag.Bool("logoutput", false, - "log output from node n to file outputn.log") - - // trickleDelay is the amount of time in milliseconds between each - // release of announcements by AuthenticatedGossiper to the network. - trickleDelay = 50 -) - -// generateListeningPorts returns two strings representing ports to listen on -// designated for the current lightning network test. If there haven't been any -// test instances created, the default ports are used. Otherwise, in order to -// support multiple test nodes running at once, the p2p and rpc port are -// incremented after each initialization. -func generateListeningPorts() (int, int) { - var p2p, rpc int - if numActiveNodes == 0 { - p2p = defaultNodePort - rpc = defaultClientPort - } else { - p2p = defaultNodePort + (2 * numActiveNodes) - rpc = defaultClientPort + (2 * numActiveNodes) - } - - return p2p, rpc -} - -// lightningNode represents an instance of lnd running within our test network -// harness. Each lightningNode instance also fully embedds an RPC client in -// order to pragmatically drive the node. -type lightningNode struct { - cfg *config - - rpcAddr string - p2pAddr string - rpcCert []byte - - nodeID int - - // PubKey is the serialized compressed identity public key of the node. - // This field will only be populated once the node itself has been - // started via the start() method. 
- PubKey [33]byte - PubKeyStr string - - cmd *exec.Cmd - pidFile string - - // processExit is a channel that's closed once it's detected that the - // process this instance of lightningNode is bound to has exited. - processExit chan struct{} - - extraArgs []string - - chanWatchRequests chan *chanWatchRequest - - quit chan struct{} - wg sync.WaitGroup - - lnrpc.LightningClient -} - -// newLightningNode creates a new test lightning node instance from the passed -// rpc config and slice of extra arguments. -func newLightningNode(btcrpcConfig *rpcclient.ConnConfig, lndArgs []string) (*lightningNode, error) { - var err error - - cfg := &config{ - Bitcoin: &chainConfig{ - RPCHost: btcrpcConfig.Host, - RPCUser: btcrpcConfig.User, - RPCPass: btcrpcConfig.Pass, - }, - } - - nodeNum := numActiveNodes - numActiveNodes++ - - cfg.DataDir, err = ioutil.TempDir("", "lndtest-data") - if err != nil { - return nil, err - } - cfg.LogDir, err = ioutil.TempDir("", "lndtest-log") - if err != nil { - return nil, err - } - cfg.TLSCertPath = filepath.Join(cfg.DataDir, "tls.cert") - cfg.TLSKeyPath = filepath.Join(cfg.DataDir, "tls.key") - cfg.AdminMacPath = filepath.Join(cfg.DataDir, "admin.macaroon") - cfg.ReadMacPath = filepath.Join(cfg.DataDir, "readonly.macaroon") - - cfg.PeerPort, cfg.RPCPort = generateListeningPorts() - - lndArgs = append(lndArgs, "--externalip=127.0.0.1:"+ - strconv.Itoa(cfg.PeerPort)) - lndArgs = append(lndArgs, "--noencryptwallet") - - return &lightningNode{ - cfg: cfg, - p2pAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.PeerPort)), - rpcAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.RPCPort)), - rpcCert: btcrpcConfig.Certificates, - nodeID: nodeNum, - chanWatchRequests: make(chan *chanWatchRequest), - processExit: make(chan struct{}), - quit: make(chan struct{}), - extraArgs: lndArgs, - }, nil -} - -// genArgs generates a slice of command line arguments from the lightningNode's -// current config struct. -func (l *lightningNode) genArgs() []string { - var args []string - - encodedCert := hex.EncodeToString(l.rpcCert) - args = append(args, "--bitcoin.active") - args = append(args, "--bitcoin.simnet") - args = append(args, "--nobootstrap") - args = append(args, "--debuglevel=debug") - args = append(args, "--defaultchanconfs=1") - args = append(args, fmt.Sprintf("--bitcoin.rpchost=%v", l.cfg.Bitcoin.RPCHost)) - args = append(args, fmt.Sprintf("--bitcoin.rpcuser=%v", l.cfg.Bitcoin.RPCUser)) - args = append(args, fmt.Sprintf("--bitcoin.rpcpass=%v", l.cfg.Bitcoin.RPCPass)) - args = append(args, fmt.Sprintf("--bitcoin.rawrpccert=%v", encodedCert)) - args = append(args, fmt.Sprintf("--rpcport=%v", l.cfg.RPCPort)) - args = append(args, fmt.Sprintf("--peerport=%v", l.cfg.PeerPort)) - args = append(args, fmt.Sprintf("--logdir=%v", l.cfg.LogDir)) - args = append(args, fmt.Sprintf("--datadir=%v", l.cfg.DataDir)) - args = append(args, fmt.Sprintf("--tlscertpath=%v", l.cfg.TLSCertPath)) - args = append(args, fmt.Sprintf("--tlskeypath=%v", l.cfg.TLSKeyPath)) - args = append(args, fmt.Sprintf("--configfile=%v", l.cfg.DataDir)) - args = append(args, fmt.Sprintf("--adminmacaroonpath=%v", l.cfg.AdminMacPath)) - args = append(args, fmt.Sprintf("--readonlymacaroonpath=%v", l.cfg.ReadMacPath)) - args = append(args, fmt.Sprintf("--trickledelay=%v", trickleDelay)) - - if l.extraArgs != nil { - args = append(args, l.extraArgs...) - } - - return args -} - -// Start launches a new process running lnd. 
Additionally, the PID of the -// launched process is saved in order to possibly kill the process forcibly -// later. -func (l *lightningNode) Start(lndError chan<- error) error { - args := l.genArgs() - - l.cmd = exec.Command("lnd", args...) - - // Redirect stderr output to buffer - var errb bytes.Buffer - l.cmd.Stderr = &errb - - // If the logoutput flag is passed, redirect output from the nodes to - // log files. - if *logOutput { - logFile := fmt.Sprintf("output%d.log", l.nodeID) - - // Create file if not exists, otherwise append. - file, err := os.OpenFile(logFile, - os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) - if err != nil { - return err - } - - // Pass node's stderr to both errb and the file. - w := io.MultiWriter(&errb, file) - l.cmd.Stderr = w - - // Pass the node's stdout only to the file. - l.cmd.Stdout = file - } - - if err := l.cmd.Start(); err != nil { - return err - } - - // Launch a new goroutine which that bubbles up any potential fatal - // process errors to the goroutine running the tests. - go func() { - err := l.cmd.Wait() - if err != nil { - lndError <- errors.Errorf("%v\n%v\n", err, errb.String()) - } - - // Signal any onlookers that this process has exited. - close(l.processExit) - }() - - // Write process ID to a file. - if err := l.writePidFile(); err != nil { - l.cmd.Process.Kill() - return err - } - - // Since Stop uses the LightningClient to stop the node, if we fail to get a - // connected client, we have to kill the process. - conn, err := l.connectRPC() - if err != nil { - l.cmd.Process.Kill() - return err - } - l.LightningClient = lnrpc.NewLightningClient(conn) - - // Obtain the lnid of this node for quick identification purposes. - ctxb := context.Background() - info, err := l.GetInfo(ctxb, &lnrpc.GetInfoRequest{}) - if err != nil { - return err - } - - l.PubKeyStr = info.IdentityPubkey - - pubkey, err := hex.DecodeString(info.IdentityPubkey) - if err != nil { - return err - } - copy(l.PubKey[:], pubkey) - - // Launch the watcher that'll hook into graph related topology change - // from the PoV of this node. - l.wg.Add(1) - go l.lightningNetworkWatcher() - - return nil -} - -// writePidFile writes the process ID of the running lnd process to a .pid file. -func (l *lightningNode) writePidFile() error { - filePath := filepath.Join(l.cfg.DataDir, fmt.Sprintf("%v.pid", l.nodeID)) - - pid, err := os.Create(filePath) - if err != nil { - return err - } - defer pid.Close() - - _, err = fmt.Fprintf(pid, "%v\n", l.cmd.Process.Pid) - if err != nil { - return err - } - - l.pidFile = filePath - return nil -} - -// connectRPC uses the TLS certificate and admin macaroon files written by the -// lnd node to create a gRPC client connection. -func (l *lightningNode) connectRPC() (*grpc.ClientConn, error) { - // Wait until TLS certificate and admin macaroon are created before - // using them, up to 20 sec. 
- tlsTimeout := time.After(30 * time.Second) - for !fileExists(l.cfg.TLSCertPath) || !fileExists(l.cfg.AdminMacPath) { - select { - case <-tlsTimeout: - return nil, fmt.Errorf("timeout waiting for TLS cert file " + - "and admin macaroon file to be created after " + - "20 seconds") - case <-time.After(100 * time.Millisecond): - } - } - - tlsCreds, err := credentials.NewClientTLSFromFile(l.cfg.TLSCertPath, "") - if err != nil { - return nil, err - } - macBytes, err := ioutil.ReadFile(l.cfg.AdminMacPath) - if err != nil { - return nil, err - } - mac := &macaroon.Macaroon{} - if err = mac.UnmarshalBinary(macBytes); err != nil { - return nil, err - } - opts := []grpc.DialOption{ - grpc.WithTransportCredentials(tlsCreds), - grpc.WithPerRPCCredentials(macaroons.NewMacaroonCredential(mac)), - grpc.WithBlock(), - grpc.WithTimeout(time.Second * 20), - } - return grpc.Dial(l.rpcAddr, opts...) -} - -// cleanup cleans up all the temporary files created by the node's process. -func (l *lightningNode) cleanup() error { - dirs := []string{ - l.cfg.LogDir, - l.cfg.DataDir, - } - - var err error - for _, dir := range dirs { - if removeErr := os.RemoveAll(dir); removeErr != nil { - log.Printf("Cannot remove dir %s: %v", dir, removeErr) - err = removeErr - } - } - return err -} - -// Stop attempts to stop the active lnd process. -func (l *lightningNode) Stop() error { - // Do nothing if the process never started successfully. - if l.LightningClient == nil { - return nil - } - - // Do nothing if the process already finished. - select { - case <-l.quit: - return nil - case <-l.processExit: - return nil - default: - } - - // Don't watch for error because sometimes the RPC connection gets - // closed before a response is returned. - req := lnrpc.StopRequest{} - ctx := context.Background() - l.LightningClient.StopDaemon(ctx, &req) - - close(l.quit) - l.wg.Wait() - return nil -} - -// Restart attempts to restart a lightning node by shutting it down cleanly, -// then restarting the process. This function is fully blocking. Upon restart, -// the RPC connection to the node will be re-attempted, continuing iff the -// connection attempt is successful. Additionally, if a callback is passed, the -// closure will be executed after the node has been shutdown, but before the -// process has been started up again. -func (l *lightningNode) Restart(errChan chan error, callback func() error) error { - if err := l.Stop(); err != nil { - return err - } - - <-l.processExit - - l.LightningClient = nil - l.processExit = make(chan struct{}) - l.quit = make(chan struct{}) - l.wg = sync.WaitGroup{} - - if callback != nil { - if err := callback(); err != nil { - return err - } - } - - return l.Start(errChan) -} - -// Shutdown stops the active lnd process and clean up any temporary directories -// created along the way. -func (l *lightningNode) Shutdown() error { - if err := l.Stop(); err != nil { - return err - } - if err := l.cleanup(); err != nil { - return err - } - return nil -} - -// closeChanWatchRequest is a request to the lightningNetworkWatcher to be -// notified once it's detected within the test Lightning Network, that a -// channel has either been added or closed. -type chanWatchRequest struct { - chanPoint wire.OutPoint - - chanOpen bool - - eventChan chan struct{} -} - -// lightningNetworkWatcher is a goroutine which is able to dispatch -// notifications once it has been observed that a target channel has been -// closed or opened within the network. 
In order to dispatch these -// notifications, the GraphTopologySubscription client exposed as part of the -// gRPC interface is used. -func (l *lightningNode) lightningNetworkWatcher() { - defer l.wg.Done() - - graphUpdates := make(chan *lnrpc.GraphTopologyUpdate) - l.wg.Add(1) - go func() { - defer l.wg.Done() - - ctxb := context.Background() - req := &lnrpc.GraphTopologySubscription{} - topologyClient, err := l.SubscribeChannelGraph(ctxb, req) - if err != nil { - // We panic here in case of an error as failure to - // create the topology client will cause all subsequent - // tests to fail. - panic(fmt.Errorf("unable to create topology "+ - "client: %v", err)) - } - - for { - update, err := topologyClient.Recv() - if err == io.EOF { - return - } else if err != nil { - return - } - - select { - case graphUpdates <- update: - case <-l.quit: - return - } - } - }() - - // For each outpoint, we'll track an integer which denotes the number - // of edges seen for that channel within the network. When this number - // reaches 2, then it means that both edge advertisements has - // propagated through the network. - openChans := make(map[wire.OutPoint]int) - openClients := make(map[wire.OutPoint][]chan struct{}) - - closedChans := make(map[wire.OutPoint]struct{}) - closeClients := make(map[wire.OutPoint][]chan struct{}) - - for { - select { - - // A new graph update has just been received, so we'll examine - // the current set of registered clients to see if we can - // dispatch any requests. - case graphUpdate := <-graphUpdates: - // For each new channel, we'll increment the number of - // edges seen by one. - for _, newChan := range graphUpdate.ChannelUpdates { - txid, _ := chainhash.NewHash(newChan.ChanPoint.FundingTxid) - op := wire.OutPoint{ - Hash: *txid, - Index: newChan.ChanPoint.OutputIndex, - } - openChans[op]++ - - // For this new channel, if the number of edges - // seen is less than two, then the channel - // hasn't been fully announced yet. - if numEdges := openChans[op]; numEdges < 2 { - continue - } - - // Otherwise, we'll notify all the registered - // clients and remove the dispatched clients. - for _, eventChan := range openClients[op] { - close(eventChan) - } - delete(openClients, op) - } - - // For each channel closed, we'll mark that we've - // detected a channel closure while lnd was pruning the - // channel graph. - for _, closedChan := range graphUpdate.ClosedChans { - txid, _ := chainhash.NewHash(closedChan.ChanPoint.FundingTxid) - op := wire.OutPoint{ - Hash: *txid, - Index: closedChan.ChanPoint.OutputIndex, - } - closedChans[op] = struct{}{} - - // As the channel has been closed, we'll notify - // all register clients. - for _, eventChan := range closeClients[op] { - close(eventChan) - } - delete(closeClients, op) - } - - // A new watch request, has just arrived. We'll either be able - // to dispatch immediately, or need to add the client for - // processing later. - case watchRequest := <-l.chanWatchRequests: - targetChan := watchRequest.chanPoint - - // TODO(roasbeef): add update type also, checks for - // multiple of 2 - if watchRequest.chanOpen { - // If this is a open request, then it can be - // dispatched if the number of edges seen for - // the channel is at least two. - if numEdges := openChans[targetChan]; numEdges >= 2 { - close(watchRequest.eventChan) - continue - } - - // Otherwise, we'll add this to the list of - // watch open clients for this out point. 
- openClients[targetChan] = append(openClients[targetChan], - watchRequest.eventChan) - continue - } - - // If this is a close request, then it can be - // immediately dispatched if we've already seen a - // channel closure for this channel. - if _, ok := closedChans[targetChan]; ok { - close(watchRequest.eventChan) - continue - } - - // Otherwise, we'll add this to the list of close watch - // clients for this out point. - closeClients[targetChan] = append(closeClients[targetChan], - watchRequest.eventChan) - - case <-l.quit: - return - } - } -} - -// WaitForNetworkChannelOpen will block until a channel with the target -// outpoint is seen as being fully advertised within the network. A channel is -// considered "fully advertised" once both of its directional edges has been -// advertised within the test Lightning Network. -func (l *lightningNode) WaitForNetworkChannelOpen(ctx context.Context, - op *lnrpc.ChannelPoint) error { - - eventChan := make(chan struct{}) - - txid, err := chainhash.NewHash(op.FundingTxid) - if err != nil { - return err - } - - l.chanWatchRequests <- &chanWatchRequest{ - chanPoint: wire.OutPoint{ - Hash: *txid, - Index: op.OutputIndex, - }, - eventChan: eventChan, - chanOpen: true, - } - - select { - case <-eventChan: - return nil - case <-ctx.Done(): - return fmt.Errorf("channel not opened before timeout") - } -} - -// WaitForNetworkChannelClose will block until a channel with the target -// outpoint is seen as closed within the network. A channel is considered -// closed once a transaction spending the funding outpoint is seen within a -// confirmed block. -func (l *lightningNode) WaitForNetworkChannelClose(ctx context.Context, - op *lnrpc.ChannelPoint) error { - - eventChan := make(chan struct{}) - - txid, err := chainhash.NewHash(op.FundingTxid) - if err != nil { - return err - } - - l.chanWatchRequests <- &chanWatchRequest{ - chanPoint: wire.OutPoint{ - Hash: *txid, - Index: op.OutputIndex, - }, - eventChan: eventChan, - chanOpen: false, - } - - select { - case <-eventChan: - return nil - case <-ctx.Done(): - return fmt.Errorf("channel not closed before timeout") - } -} - -// WaitForBlockchainSync will block until the target nodes has fully -// synchronized with the blockchain. If the passed context object has a set -// timeout, then the goroutine will continually poll until the timeout has -// elapsed. In the case that the chain isn't synced before the timeout is up, -// then this function will return an error. -func (l *lightningNode) WaitForBlockchainSync(ctx context.Context) error { - errChan := make(chan error, 1) - retryDelay := time.Millisecond * 100 - - go func() { - for { - select { - case <-ctx.Done(): - case <-l.quit: - return - default: - } - - getInfoReq := &lnrpc.GetInfoRequest{} - getInfoResp, err := l.GetInfo(ctx, getInfoReq) - if err != nil { - errChan <- err - return - } - if getInfoResp.SyncedToChain { - errChan <- nil - return - } - - select { - case <-ctx.Done(): - return - case <-time.After(retryDelay): - } - } - }() - - select { - case <-l.quit: - return nil - case err := <-errChan: - return err - case <-ctx.Done(): - return fmt.Errorf("Timeout while waiting for blockchain sync") - } -} - -// networkHarness is an integration testing harness for the lightning network. -// The harness by default is created with two active nodes on the network: -// Alice and Bob. 
-type networkHarness struct { - rpcConfig rpcclient.ConnConfig - netParams *chaincfg.Params - Miner *rpctest.Harness - - activeNodes map[int]*lightningNode - - // Alice and Bob are the initial seeder nodes that are automatically - // created to be the initial participants of the test network. - Alice *lightningNode - Bob *lightningNode - - seenTxns chan chainhash.Hash - bitcoinWatchRequests chan *txWatchRequest - - // Channel for transmitting stderr output from failed lightning node - // to main process. - lndErrorChan chan error - - quit chan struct{} - - sync.Mutex -} - -// newNetworkHarness creates a new network test harness. -// TODO(roasbeef): add option to use golang's build library to a binary of the -// current repo. This'll save developers from having to manually `go install` -// within the repo each time before changes -func newNetworkHarness() (*networkHarness, error) { - return &networkHarness{ - activeNodes: make(map[int]*lightningNode), - seenTxns: make(chan chainhash.Hash), - bitcoinWatchRequests: make(chan *txWatchRequest), - lndErrorChan: make(chan error), - quit: make(chan struct{}), - }, nil -} - -// InitializeSeedNodes initializes alice and bob nodes given an already -// running instance of btcd's rpctest harness and extra command line flags, -// which should be formatted properly - "--arg=value". -func (n *networkHarness) InitializeSeedNodes(r *rpctest.Harness, lndArgs []string) error { - nodeConfig := r.RPCConfig() - - n.netParams = r.ActiveNet - n.Miner = r - n.rpcConfig = nodeConfig - - var err error - n.Alice, err = newLightningNode(&nodeConfig, lndArgs) - if err != nil { - return err - } - n.Bob, err = newLightningNode(&nodeConfig, lndArgs) - if err != nil { - return err - } - - n.activeNodes[n.Alice.nodeID] = n.Alice - n.activeNodes[n.Bob.nodeID] = n.Bob - - return err -} - -// ProcessErrors returns a channel used for reporting any fatal process errors. -// If any of the active nodes within the harness' test network incur a fatal -// error, that error is sent over this channel. -func (n *networkHarness) ProcessErrors() <-chan error { - return n.lndErrorChan -} - -// fakeLogger is a fake grpclog.Logger implementation. This is used to stop -// grpc's logger from printing directly to stdout. -type fakeLogger struct{} - -func (f *fakeLogger) Fatal(args ...interface{}) {} -func (f *fakeLogger) Fatalf(format string, args ...interface{}) {} -func (f *fakeLogger) Fatalln(args ...interface{}) {} -func (f *fakeLogger) Print(args ...interface{}) {} -func (f *fakeLogger) Printf(format string, args ...interface{}) {} -func (f *fakeLogger) Println(args ...interface{}) {} - -// SetUp starts the initial seeder nodes within the test harness. The initial -// node's wallets will be funded wallets with ten 1 BTC outputs each. Finally -// rpc clients capable of communicating with the initial seeder nodes are -// created. -func (n *networkHarness) SetUp() error { - // Swap out grpc's default logger with out fake logger which drops the - // statements on the floor. - grpclog.SetLogger(&fakeLogger{}) - - // Start the initial seeder nodes within the test network, then connect - // their respective RPC clients. 
- var wg sync.WaitGroup - errChan := make(chan error, 2) - wg.Add(2) - go func() { - defer wg.Done() - if err := n.Alice.Start(n.lndErrorChan); err != nil { - errChan <- err - } - }() - go func() { - defer wg.Done() - if err := n.Bob.Start(n.lndErrorChan); err != nil { - errChan <- err - } - }() - wg.Wait() - select { - case err := <-errChan: - return err - default: - } - - // Load up the wallets of the seeder nodes with 10 outputs of 1 BTC - // each. - ctxb := context.Background() - addrReq := &lnrpc.NewAddressRequest{ - Type: lnrpc.NewAddressRequest_WITNESS_PUBKEY_HASH, - } - clients := []lnrpc.LightningClient{n.Alice, n.Bob} - for _, client := range clients { - for i := 0; i < 10; i++ { - resp, err := client.NewAddress(ctxb, addrReq) - if err != nil { - return err - } - addr, err := btcutil.DecodeAddress(resp.Address, n.netParams) - if err != nil { - return err - } - addrScript, err := txscript.PayToAddrScript(addr) - if err != nil { - return err - } - - output := &wire.TxOut{ - PkScript: addrScript, - Value: btcutil.SatoshiPerBitcoin, - } - if _, err := n.Miner.SendOutputs([]*wire.TxOut{output}, 30); err != nil { - return err - } - } - } - - // We generate several blocks in order to give the outputs created - // above a good number of confirmations. - if _, err := n.Miner.Node.Generate(10); err != nil { - return err - } - - // Finally, make a connection between both of the nodes. - if err := n.ConnectNodes(ctxb, n.Alice, n.Bob); err != nil { - return err - } - - // Now block until both wallets have fully synced up. - expectedBalance := int64(btcutil.SatoshiPerBitcoin * 10) - balReq := &lnrpc.WalletBalanceRequest{} - balanceTicker := time.Tick(time.Millisecond * 50) - balanceTimeout := time.After(time.Second * 30) -out: - for { - select { - case <-balanceTicker: - aliceResp, err := n.Alice.WalletBalance(ctxb, balReq) - if err != nil { - return err - } - bobResp, err := n.Bob.WalletBalance(ctxb, balReq) - if err != nil { - return err - } - - if aliceResp.ConfirmedBalance == expectedBalance && - bobResp.ConfirmedBalance == expectedBalance { - break out - } - case <-balanceTimeout: - return fmt.Errorf("balances not synced after deadline") - } - } - - // Now that the initial test network has been initialized, launch the - // network watcher. - go n.networkWatcher() - - return nil -} - -// TearDownAll tears down all active nodes within the test lightning network. -func (n *networkHarness) TearDownAll() error { - for _, node := range n.activeNodes { - if err := node.Shutdown(); err != nil { - return err - } - } - - close(n.lndErrorChan) - close(n.quit) - - return nil -} - -// NewNode fully initializes a returns a new lightningNode binded to the -// current instance of the network harness. The created node is running, but -// not yet connected to other nodes within the network. -func (n *networkHarness) NewNode(extraArgs []string) (*lightningNode, error) { - n.Lock() - defer n.Unlock() - - node, err := newLightningNode(&n.rpcConfig, extraArgs) - if err != nil { - return nil, err - } - - // Put node in activeNodes to ensure Shutdown is called even if Start - // returns an error. - n.activeNodes[node.nodeID] = node - - if err := node.Start(n.lndErrorChan); err != nil { - return nil, err - } - - return node, nil -} - -// ConnectNodes establishes an encrypted+authenticated p2p connection from node -// a towards node b. The function will return a non-nil error if the connection -// was unable to be established. 
-// -// NOTE: This function may block for up to 15-seconds as it will not return -// until the new connection is detected as being known to both nodes. -func (n *networkHarness) ConnectNodes(ctx context.Context, a, b *lightningNode) error { - bobInfo, err := b.GetInfo(ctx, &lnrpc.GetInfoRequest{}) - if err != nil { - return err - } - - req := &lnrpc.ConnectPeerRequest{ - Addr: &lnrpc.LightningAddress{ - Pubkey: bobInfo.IdentityPubkey, - Host: b.p2pAddr, - }, - } - if _, err := a.ConnectPeer(ctx, req); err != nil { - return err - } - - timeout := time.After(time.Second * 15) - for { - - select { - case <-timeout: - return fmt.Errorf("peers not connected within 15 seconds") - default: - } - - // If node B is seen in the ListPeers response from node A, - // then we can exit early as the connection has been fully - // established. - resp, err := a.ListPeers(ctx, &lnrpc.ListPeersRequest{}) - if err != nil { - return err - } - for _, peer := range resp.Peers { - if peer.PubKey == b.PubKeyStr { - return nil - } - } - } -} - -// DisconnectNodes disconnects node a from node b by sending RPC message -// from a node to b node -func (n *networkHarness) DisconnectNodes(ctx context.Context, a, b *lightningNode) error { - bobInfo, err := b.GetInfo(ctx, &lnrpc.GetInfoRequest{}) - if err != nil { - return err - } - - req := &lnrpc.DisconnectPeerRequest{ - PubKey: bobInfo.IdentityPubkey, - } - - if _, err := a.DisconnectPeer(ctx, req); err != nil { - return err - } - - return nil -} - -// RestartNode attempts to restart a lightning node by shutting it down -// cleanly, then restarting the process. This function is fully blocking. Upon -// restart, the RPC connection to the node will be re-attempted, continuing iff -// the connection attempt is successful. If the callback parameter is non-nil, -// then the function will be executed after the node shuts down, but *before* -// the process has been started up again. -// -// This method can be useful when testing edge cases such as a node broadcast -// and invalidated prior state, or persistent state recovery, simulating node -// crashes, etc. -func (n *networkHarness) RestartNode(node *lightningNode, callback func() error) error { - return node.Restart(n.lndErrorChan, callback) -} - -// TODO(roasbeef): add a WithChannel higher-order function? -// * python-like context manager w.r.t using a channel within a test -// * possibly adds more funds to the target wallet if the funds are not -// enough - -// txWatchRequest encapsulates a request to the harness' Bitcoin network -// watcher to dispatch a notification once a transaction with the target txid -// is seen within the test network. -type txWatchRequest struct { - txid chainhash.Hash - eventChan chan struct{} -} - -// bitcoinNetworkWatcher is a goroutine which accepts async notification -// requests for the broadcast of a target transaction, and then dispatches the -// transaction once its seen on the Bitcoin network. -func (n *networkHarness) networkWatcher() { - seenTxns := make(map[chainhash.Hash]struct{}) - clients := make(map[chainhash.Hash][]chan struct{}) - - for { - - select { - case <-n.quit: - return - - case req := <-n.bitcoinWatchRequests: - // If we've already seen this transaction, then - // immediately dispatch the request. Otherwise, append - // to the list of clients who are watching for the - // broadcast of this transaction. 
- if _, ok := seenTxns[req.txid]; ok { - close(req.eventChan) - } else { - clients[req.txid] = append(clients[req.txid], req.eventChan) - } - case txid := <-n.seenTxns: - // Add this txid to our set of "seen" transactions. So - // we're able to dispatch any notifications for this - // txid which arrive *after* it's seen within the - // network. - seenTxns[txid] = struct{}{} - - // If there isn't a registered notification for this - // transaction then ignore it. - txClients, ok := clients[txid] - if !ok { - continue - } - - // Otherwise, dispatch the notification to all clients, - // cleaning up the now un-needed state. - for _, client := range txClients { - close(client) - } - delete(clients, txid) - } - } -} - -// OnTxAccepted is a callback to be called each time a new transaction has been -// broadcast on the network. -func (n *networkHarness) OnTxAccepted(hash *chainhash.Hash, amt btcutil.Amount) { - // Return immediately if harness has been torn down. - select { - case <-n.quit: - return - default: - } - - go func() { - n.seenTxns <- *hash - }() -} - -// WaitForTxBroadcast blocks until the target txid is seen on the network. If -// the transaction isn't seen within the network before the passed timeout, -// then an error is returned. -// TODO(roasbeef): add another method which creates queue of all seen transactions -func (n *networkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash.Hash) error { - // Return immediately if harness has been torn down. - select { - case <-n.quit: - return fmt.Errorf("networkHarness has been torn down") - default: - } - - eventChan := make(chan struct{}) - - n.bitcoinWatchRequests <- &txWatchRequest{ - txid: txid, - eventChan: eventChan, - } - - select { - case <-eventChan: - return nil - case <-n.quit: - return fmt.Errorf("networkHarness has been torn down") - case <-ctx.Done(): - return fmt.Errorf("tx not seen before context timeout") - } -} - -// OpenChannel attempts to open a channel between srcNode and destNode with the -// passed channel funding parameters. If the passed context has a timeout, then -// if the timeout is reached before the channel pending notification is -// received, an error is returned. -func (n *networkHarness) OpenChannel(ctx context.Context, - srcNode, destNode *lightningNode, amt btcutil.Amount, - pushAmt btcutil.Amount) (lnrpc.Lightning_OpenChannelClient, error) { - - // Wait until srcNode and destNode have the latest chain synced. - // Otherwise, we may run into a check within the funding manager that - // prevents any funding workflows from being kicked off if the chain - // isn't yet synced. - if err := srcNode.WaitForBlockchainSync(ctx); err != nil { - return nil, fmt.Errorf("Unable to sync srcNode chain: %v", err) - } - if err := destNode.WaitForBlockchainSync(ctx); err != nil { - return nil, fmt.Errorf("Unable to sync destNode chain: %v", err) - } - - openReq := &lnrpc.OpenChannelRequest{ - NodePubkey: destNode.PubKey[:], - LocalFundingAmount: int64(amt), - PushSat: int64(pushAmt), - } - - respStream, err := srcNode.OpenChannel(ctx, openReq) - if err != nil { - return nil, fmt.Errorf("unable to open channel between "+ - "alice and bob: %v", err) - } - - chanOpen := make(chan struct{}) - errChan := make(chan error) - go func() { - // Consume the "channel pending" update. This waits until the node - // notifies us that the final message in the channel funding workflow - // has been sent to the remote node. 
- resp, err := respStream.Recv() - if err != nil { - errChan <- err - return - } - if _, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanPending); !ok { - errChan <- fmt.Errorf("expected channel pending update, "+ - "instead got %v", resp) - return - } - - close(chanOpen) - }() - - select { - case <-ctx.Done(): - return nil, fmt.Errorf("timeout reached before chan pending "+ - "update sent: %v", err) - case err := <-errChan: - return nil, err - case <-chanOpen: - return respStream, nil - } -} - -// OpenPendingChannel attempts to open a channel between srcNode and destNode with the -// passed channel funding parameters. If the passed context has a timeout, then -// if the timeout is reached before the channel pending notification is -// received, an error is returned. -func (n *networkHarness) OpenPendingChannel(ctx context.Context, - srcNode, destNode *lightningNode, amt btcutil.Amount, - pushAmt btcutil.Amount) (*lnrpc.PendingUpdate, error) { - - // Wait until srcNode and destNode have blockchain synced - if err := srcNode.WaitForBlockchainSync(ctx); err != nil { - return nil, fmt.Errorf("Unable to sync srcNode chain: %v", err) - } - if err := destNode.WaitForBlockchainSync(ctx); err != nil { - return nil, fmt.Errorf("Unable to sync destNode chain: %v", err) - } - - openReq := &lnrpc.OpenChannelRequest{ - NodePubkey: destNode.PubKey[:], - LocalFundingAmount: int64(amt), - PushSat: int64(pushAmt), - } - - respStream, err := srcNode.OpenChannel(ctx, openReq) - if err != nil { - return nil, fmt.Errorf("unable to open channel between "+ - "alice and bob: %v", err) - } - - chanPending := make(chan *lnrpc.PendingUpdate) - errChan := make(chan error) - go func() { - // Consume the "channel pending" update. This waits until the node - // notifies us that the final message in the channel funding workflow - // has been sent to the remote node. - resp, err := respStream.Recv() - if err != nil { - errChan <- err - return - } - pendingResp, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanPending) - if !ok { - errChan <- fmt.Errorf("expected channel pending update, "+ - "instead got %v", resp) - return - } - - chanPending <- pendingResp.ChanPending - }() - - select { - case <-ctx.Done(): - return nil, fmt.Errorf("timeout reached before chan pending " + - "update sent") - case err := <-errChan: - return nil, err - case pendingChan := <-chanPending: - return pendingChan, nil - } -} - -// WaitForChannelOpen waits for a notification that a channel is open by -// consuming a message from the past open channel stream. If the passed context -// has a timeout, then if the timeout is reached before the channel has been -// opened, then an error is returned. 
-func (n *networkHarness) WaitForChannelOpen(ctx context.Context, - openChanStream lnrpc.Lightning_OpenChannelClient) (*lnrpc.ChannelPoint, error) { - - errChan := make(chan error) - respChan := make(chan *lnrpc.ChannelPoint) - go func() { - resp, err := openChanStream.Recv() - if err != nil { - errChan <- fmt.Errorf("unable to read rpc resp: %v", err) - return - } - fundingResp, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanOpen) - if !ok { - errChan <- fmt.Errorf("expected channel open update, "+ - "instead got %v", resp) - return - } - - respChan <- fundingResp.ChanOpen.ChannelPoint - }() - - select { - case <-ctx.Done(): - return nil, fmt.Errorf("timeout reached while waiting for " + - "channel open") - case err := <-errChan: - return nil, err - case chanPoint := <-respChan: - return chanPoint, nil - } -} - -// CloseChannel close channel attempts to close the channel indicated by the -// passed channel point, initiated by the passed lnNode. If the passed context -// has a timeout, then if the timeout is reached before the channel close is -// pending, then an error is returned. -func (n *networkHarness) CloseChannel(ctx context.Context, - lnNode *lightningNode, cp *lnrpc.ChannelPoint, - force bool) (lnrpc.Lightning_CloseChannelClient, *chainhash.Hash, error) { - - // Create a channel outpoint that we can use to compare to channels - // from the ListChannelsResponse. - fundingTxID, err := chainhash.NewHash(cp.FundingTxid) - if err != nil { - return nil, nil, err - } - chanPoint := wire.OutPoint{ - Hash: *fundingTxID, - Index: cp.OutputIndex, - } - - // If we are not force closing the channel, wait for channel to become - // active before attempting to close it. - numTries := 10 -CheckActive: - for i := 0; !force && i < numTries; i++ { - listReq := &lnrpc.ListChannelsRequest{} - listResp, err := lnNode.ListChannels(ctx, listReq) - if err != nil { - return nil, nil, fmt.Errorf("unable fetch node's "+ - "channels: %v", err) - } - - for _, c := range listResp.Channels { - if c.ChannelPoint == chanPoint.String() && c.Active { - break CheckActive - } - } - - if i == numTries-1 { - // Last iteration, and channel is still not active. - return nil, nil, fmt.Errorf("channel did not become " + - "active") - } - - // Sleep, and try again. - time.Sleep(300 * time.Millisecond) - } - - closeReq := &lnrpc.CloseChannelRequest{ - ChannelPoint: cp, - Force: force, - } - closeRespStream, err := lnNode.CloseChannel(ctx, closeReq) - if err != nil { - return nil, nil, fmt.Errorf("unable to close channel: %v", err) - } - - errChan := make(chan error) - fin := make(chan *chainhash.Hash) - go func() { - // Consume the "channel close" update in order to wait for the closing - // transaction to be broadcast, then wait for the closing tx to be seen - // within the network. - closeResp, err := closeRespStream.Recv() - if err != nil { - errChan <- err - return - } - pendingClose, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ClosePending) - if !ok { - errChan <- fmt.Errorf("expected channel close update, "+ - "instead got %v", pendingClose) - return - } - - closeTxid, err := chainhash.NewHash(pendingClose.ClosePending.Txid) - if err != nil { - errChan <- err - return - } - if err := n.WaitForTxBroadcast(ctx, *closeTxid); err != nil { - errChan <- err - return - } - fin <- closeTxid - }() - - // Wait until either the deadline for the context expires, an error - // occurs, or the channel close update is received. 
- select { - case <-ctx.Done(): - return nil, nil, fmt.Errorf("timeout reached before channel close " + - "initiated") - case err := <-errChan: - return nil, nil, err - case closeTxid := <-fin: - return closeRespStream, closeTxid, nil - } -} - -// WaitForChannelClose waits for a notification from the passed channel close -// stream that the node has deemed the channel has been fully closed. If the -// passed context has a timeout, then if the timeout is reached before the -// notification is received then an error is returned. -func (n *networkHarness) WaitForChannelClose(ctx context.Context, - closeChanStream lnrpc.Lightning_CloseChannelClient) (*chainhash.Hash, error) { - - errChan := make(chan error) - updateChan := make(chan *lnrpc.CloseStatusUpdate_ChanClose) - go func() { - closeResp, err := closeChanStream.Recv() - if err != nil { - errChan <- err - return - } - - closeFin, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ChanClose) - if !ok { - errChan <- fmt.Errorf("expected channel close update, "+ - "instead got %v", closeFin) - return - } - - updateChan <- closeFin - }() - - // Wait until either the deadline for the context expires, an error - // occurs, or the channel close update is received. - select { - case <-ctx.Done(): - return nil, fmt.Errorf("timeout reached before update sent") - case err := <-errChan: - return nil, err - case update := <-updateChan: - return chainhash.NewHash(update.ChanClose.ClosingTxid) - } -} - -// AssertChannelExists asserts that an active channel identified by -// channelPoint is known to exist from the point-of-view of node.. -func (n *networkHarness) AssertChannelExists(ctx context.Context, - node *lightningNode, chanPoint *wire.OutPoint) error { - - req := &lnrpc.ListChannelsRequest{} - resp, err := node.ListChannels(ctx, req) - if err != nil { - return fmt.Errorf("unable fetch node's channels: %v", err) - } - - for _, channel := range resp.Channels { - if channel.ChannelPoint == chanPoint.String() { - return nil - } - } - - return fmt.Errorf("channel not found") -} - -// DumpLogs reads the current logs generated by the passed node, and returns -// the logs as a single string. This function is useful for examining the logs -// of a particular node in the case of a test failure. -// Logs from lightning node being generated with delay - you should -// add time.Sleep() in order to get all logs. -func (n *networkHarness) DumpLogs(node *lightningNode) (string, error) { - logFile := fmt.Sprintf("%v/simnet/lnd.log", node.cfg.LogDir) - - buf, err := ioutil.ReadFile(logFile) - if err != nil { - return "", err - } - - return string(buf), nil -} - -// SendCoins attempts to send amt satoshis from the internal mining node to the -// targeted lightning node. -func (n *networkHarness) SendCoins(ctx context.Context, amt btcutil.Amount, - target *lightningNode) error { - - balReq := &lnrpc.WalletBalanceRequest{} - initialBalance, err := target.WalletBalance(ctx, balReq) - if err != nil { - return err - } - - // First, obtain an address from the target lightning node, preferring - // to receive a p2wkh address s.t the output can immediately be used as - // an input to a funding transaction. 
- addrReq := &lnrpc.NewAddressRequest{ - Type: lnrpc.NewAddressRequest_WITNESS_PUBKEY_HASH, - } - resp, err := target.NewAddress(ctx, addrReq) - if err != nil { - return err - } - addr, err := btcutil.DecodeAddress(resp.Address, n.netParams) - if err != nil { - return err - } - addrScript, err := txscript.PayToAddrScript(addr) - if err != nil { - return err - } - - // Generate a transaction which creates an output to the target - // pkScript of the desired amount. - output := &wire.TxOut{ - PkScript: addrScript, - Value: int64(amt), - } - if _, err := n.Miner.SendOutputs([]*wire.TxOut{output}, 30); err != nil { - return err - } - - // Finally, generate 6 new blocks to ensure the output gains a - // sufficient number of confirmations. - if _, err := n.Miner.Node.Generate(6); err != nil { - return err - } - - // Pause until the nodes current wallet balances reflects the amount - // sent to it above. - // TODO(roasbeef): factor out into helper func - balanceTicker := time.Tick(time.Millisecond * 50) - balanceTimeout := time.After(time.Second * 30) - for { - select { - case <-balanceTicker: - currentBal, err := target.WalletBalance(ctx, balReq) - if err != nil { - return err - } - - if currentBal.ConfirmedBalance == initialBalance.ConfirmedBalance+int64(amt) { - return nil - } - case <-balanceTimeout: - return fmt.Errorf("balances not synced after deadline") - } - } -} diff --git a/networktest_test.go b/networktest_test.go deleted file mode 100644 index 06ab7d0f9a3..00000000000 --- a/networktest_test.go +++ /dev/null @@ -1 +0,0 @@ -package main
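
For reference, below is a minimal sketch (not part of this diff) of how a test could drive the HarnessNode helpers introduced above once they live in the lntest package. Only OpenChannel, WaitForChannelOpen, WaitForNetworkChannelOpen, WaitForBlockchainSync, and Miner are taken from this change; the package name, the use of *testing.T, and the helper name openChannelOrFail are assumptions made purely for illustration.

package lntestexample // hypothetical package for this sketch only

import (
	"testing"
	"time"

	"golang.org/x/net/context"

	"github.com/lightningnetwork/lnd/lnrpc"
	"github.com/lightningnetwork/lnd/lntest"
	"github.com/roasbeef/btcutil"
)

// openChannelOrFail is an illustrative helper: it syncs both nodes to the
// chain, kicks off the funding flow, mines a confirmation, and waits until
// the channel is announced from Alice's point of view.
func openChannelOrFail(t *testing.T, net *lntest.NetworkHarness,
	alice, bob *lntest.HarnessNode, amt btcutil.Amount) *lnrpc.ChannelPoint {

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	// Both nodes must be synced to the chain before the funding manager
	// will allow a funding workflow to start.
	if err := alice.WaitForBlockchainSync(ctx); err != nil {
		t.Fatalf("alice not synced: %v", err)
	}
	if err := bob.WaitForBlockchainSync(ctx); err != nil {
		t.Fatalf("bob not synced: %v", err)
	}

	// Initiate the funding flow; OpenChannel returns once the channel
	// pending update has been received.
	stream, err := net.OpenChannel(ctx, alice, bob, amt, 0)
	if err != nil {
		t.Fatalf("unable to open channel: %v", err)
	}

	// Mine a block to confirm the funding transaction (the harness runs
	// lnd with --defaultchanconfs=1), then wait for the open notification
	// and for the channel to be fully advertised to Alice.
	if _, err := net.Miner.Node.Generate(1); err != nil {
		t.Fatalf("unable to mine block: %v", err)
	}
	chanPoint, err := net.WaitForChannelOpen(ctx, stream)
	if err != nil {
		t.Fatalf("channel never became open: %v", err)
	}
	if err := alice.WaitForNetworkChannelOpen(ctx, chanPoint); err != nil {
		t.Fatalf("channel not announced: %v", err)
	}

	return chanPoint
}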