From fdd9e1ff48c0d8def83d1c8dfd1ebfb2657e6a41 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Thu, 16 Nov 2017 15:37:08 -0800 Subject: [PATCH 1/7] lntest: MOVEONLY networktest.go -> lntest package. --- networktest.go => lntest/harness.go | 692 +--------------------------- lntest/node.go | 653 ++++++++++++++++++++++++++ networktest_test.go | 1 - 3 files changed, 654 insertions(+), 692 deletions(-) rename networktest.go => lntest/harness.go (55%) create mode 100644 lntest/node.go delete mode 100644 networktest_test.go diff --git a/networktest.go b/lntest/harness.go similarity index 55% rename from networktest.go rename to lntest/harness.go index be9be8d9c17..079b9cb6a96 100644 --- a/networktest.go +++ b/lntest/harness.go @@ -1,694 +1,4 @@ -package main - -import ( - "bytes" - "encoding/hex" - "flag" - "fmt" - "io" - "io/ioutil" - "log" - "net" - "os" - "path/filepath" - "strconv" - "sync" - "time" - - macaroon "gopkg.in/macaroon.v1" - - "golang.org/x/net/context" - - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/grpclog" - - "os/exec" - - "github.com/go-errors/errors" - "github.com/lightningnetwork/lnd/lnrpc" - "github.com/lightningnetwork/lnd/macaroons" - "github.com/roasbeef/btcd/chaincfg" - "github.com/roasbeef/btcd/chaincfg/chainhash" - "github.com/roasbeef/btcd/integration/rpctest" - "github.com/roasbeef/btcd/rpcclient" - "github.com/roasbeef/btcd/txscript" - "github.com/roasbeef/btcd/wire" - "github.com/roasbeef/btcutil" -) - -var ( - // numActiveNodes is the number of active nodes within the test network. - numActiveNodes = 0 - - // defaultNodePort is the initial p2p port which will be used by the - // first created lightning node to listen on for incoming p2p - // connections. Subsequent allocated ports for future lighting nodes - // instances will be monotonically increasing odd numbers calculated as - // such: defaultP2pPort + (2 * harness.nodeNum). - defaultNodePort = 19555 - - // defaultClientPort is the initial rpc port which will be used by the - // first created lightning node to listen on for incoming rpc - // connections. Subsequent allocated ports for future rpc harness - // instances will be monotonically increasing even numbers calculated - // as such: defaultP2pPort + (2 * harness.nodeNum). - defaultClientPort = 19556 - - harnessNetParams = &chaincfg.SimNetParams - - // logOutput is a flag that can be set to append the output from the - // seed nodes to log files. - logOutput = flag.Bool("logoutput", false, - "log output from node n to file outputn.log") - - // trickleDelay is the amount of time in milliseconds between each - // release of announcements by AuthenticatedGossiper to the network. - trickleDelay = 50 -) - -// generateListeningPorts returns two strings representing ports to listen on -// designated for the current lightning network test. If there haven't been any -// test instances created, the default ports are used. Otherwise, in order to -// support multiple test nodes running at once, the p2p and rpc port are -// incremented after each initialization. -func generateListeningPorts() (int, int) { - var p2p, rpc int - if numActiveNodes == 0 { - p2p = defaultNodePort - rpc = defaultClientPort - } else { - p2p = defaultNodePort + (2 * numActiveNodes) - rpc = defaultClientPort + (2 * numActiveNodes) - } - - return p2p, rpc -} - -// lightningNode represents an instance of lnd running within our test network -// harness. 
Each lightningNode instance also fully embedds an RPC client in -// order to pragmatically drive the node. -type lightningNode struct { - cfg *config - - rpcAddr string - p2pAddr string - rpcCert []byte - - nodeID int - - // PubKey is the serialized compressed identity public key of the node. - // This field will only be populated once the node itself has been - // started via the start() method. - PubKey [33]byte - PubKeyStr string - - cmd *exec.Cmd - pidFile string - - // processExit is a channel that's closed once it's detected that the - // process this instance of lightningNode is bound to has exited. - processExit chan struct{} - - extraArgs []string - - chanWatchRequests chan *chanWatchRequest - - quit chan struct{} - wg sync.WaitGroup - - lnrpc.LightningClient -} - -// newLightningNode creates a new test lightning node instance from the passed -// rpc config and slice of extra arguments. -func newLightningNode(btcrpcConfig *rpcclient.ConnConfig, lndArgs []string) (*lightningNode, error) { - var err error - - cfg := &config{ - Bitcoin: &chainConfig{ - RPCHost: btcrpcConfig.Host, - RPCUser: btcrpcConfig.User, - RPCPass: btcrpcConfig.Pass, - }, - } - - nodeNum := numActiveNodes - numActiveNodes++ - - cfg.DataDir, err = ioutil.TempDir("", "lndtest-data") - if err != nil { - return nil, err - } - cfg.LogDir, err = ioutil.TempDir("", "lndtest-log") - if err != nil { - return nil, err - } - cfg.TLSCertPath = filepath.Join(cfg.DataDir, "tls.cert") - cfg.TLSKeyPath = filepath.Join(cfg.DataDir, "tls.key") - cfg.AdminMacPath = filepath.Join(cfg.DataDir, "admin.macaroon") - cfg.ReadMacPath = filepath.Join(cfg.DataDir, "readonly.macaroon") - - cfg.PeerPort, cfg.RPCPort = generateListeningPorts() - - lndArgs = append(lndArgs, "--externalip=127.0.0.1:"+ - strconv.Itoa(cfg.PeerPort)) - lndArgs = append(lndArgs, "--noencryptwallet") - - return &lightningNode{ - cfg: cfg, - p2pAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.PeerPort)), - rpcAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.RPCPort)), - rpcCert: btcrpcConfig.Certificates, - nodeID: nodeNum, - chanWatchRequests: make(chan *chanWatchRequest), - processExit: make(chan struct{}), - quit: make(chan struct{}), - extraArgs: lndArgs, - }, nil -} - -// genArgs generates a slice of command line arguments from the lightningNode's -// current config struct. 
-func (l *lightningNode) genArgs() []string { - var args []string - - encodedCert := hex.EncodeToString(l.rpcCert) - args = append(args, "--bitcoin.active") - args = append(args, "--bitcoin.simnet") - args = append(args, "--nobootstrap") - args = append(args, "--debuglevel=debug") - args = append(args, "--defaultchanconfs=1") - args = append(args, fmt.Sprintf("--bitcoin.rpchost=%v", l.cfg.Bitcoin.RPCHost)) - args = append(args, fmt.Sprintf("--bitcoin.rpcuser=%v", l.cfg.Bitcoin.RPCUser)) - args = append(args, fmt.Sprintf("--bitcoin.rpcpass=%v", l.cfg.Bitcoin.RPCPass)) - args = append(args, fmt.Sprintf("--bitcoin.rawrpccert=%v", encodedCert)) - args = append(args, fmt.Sprintf("--rpcport=%v", l.cfg.RPCPort)) - args = append(args, fmt.Sprintf("--peerport=%v", l.cfg.PeerPort)) - args = append(args, fmt.Sprintf("--logdir=%v", l.cfg.LogDir)) - args = append(args, fmt.Sprintf("--datadir=%v", l.cfg.DataDir)) - args = append(args, fmt.Sprintf("--tlscertpath=%v", l.cfg.TLSCertPath)) - args = append(args, fmt.Sprintf("--tlskeypath=%v", l.cfg.TLSKeyPath)) - args = append(args, fmt.Sprintf("--configfile=%v", l.cfg.DataDir)) - args = append(args, fmt.Sprintf("--adminmacaroonpath=%v", l.cfg.AdminMacPath)) - args = append(args, fmt.Sprintf("--readonlymacaroonpath=%v", l.cfg.ReadMacPath)) - args = append(args, fmt.Sprintf("--trickledelay=%v", trickleDelay)) - - if l.extraArgs != nil { - args = append(args, l.extraArgs...) - } - - return args -} - -// Start launches a new process running lnd. Additionally, the PID of the -// launched process is saved in order to possibly kill the process forcibly -// later. -func (l *lightningNode) Start(lndError chan<- error) error { - args := l.genArgs() - - l.cmd = exec.Command("lnd", args...) - - // Redirect stderr output to buffer - var errb bytes.Buffer - l.cmd.Stderr = &errb - - // If the logoutput flag is passed, redirect output from the nodes to - // log files. - if *logOutput { - logFile := fmt.Sprintf("output%d.log", l.nodeID) - - // Create file if not exists, otherwise append. - file, err := os.OpenFile(logFile, - os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) - if err != nil { - return err - } - - // Pass node's stderr to both errb and the file. - w := io.MultiWriter(&errb, file) - l.cmd.Stderr = w - - // Pass the node's stdout only to the file. - l.cmd.Stdout = file - } - - if err := l.cmd.Start(); err != nil { - return err - } - - // Launch a new goroutine which that bubbles up any potential fatal - // process errors to the goroutine running the tests. - go func() { - err := l.cmd.Wait() - if err != nil { - lndError <- errors.Errorf("%v\n%v\n", err, errb.String()) - } - - // Signal any onlookers that this process has exited. - close(l.processExit) - }() - - // Write process ID to a file. - if err := l.writePidFile(); err != nil { - l.cmd.Process.Kill() - return err - } - - // Since Stop uses the LightningClient to stop the node, if we fail to get a - // connected client, we have to kill the process. - conn, err := l.connectRPC() - if err != nil { - l.cmd.Process.Kill() - return err - } - l.LightningClient = lnrpc.NewLightningClient(conn) - - // Obtain the lnid of this node for quick identification purposes. 
- ctxb := context.Background() - info, err := l.GetInfo(ctxb, &lnrpc.GetInfoRequest{}) - if err != nil { - return err - } - - l.PubKeyStr = info.IdentityPubkey - - pubkey, err := hex.DecodeString(info.IdentityPubkey) - if err != nil { - return err - } - copy(l.PubKey[:], pubkey) - - // Launch the watcher that'll hook into graph related topology change - // from the PoV of this node. - l.wg.Add(1) - go l.lightningNetworkWatcher() - - return nil -} - -// writePidFile writes the process ID of the running lnd process to a .pid file. -func (l *lightningNode) writePidFile() error { - filePath := filepath.Join(l.cfg.DataDir, fmt.Sprintf("%v.pid", l.nodeID)) - - pid, err := os.Create(filePath) - if err != nil { - return err - } - defer pid.Close() - - _, err = fmt.Fprintf(pid, "%v\n", l.cmd.Process.Pid) - if err != nil { - return err - } - - l.pidFile = filePath - return nil -} - -// connectRPC uses the TLS certificate and admin macaroon files written by the -// lnd node to create a gRPC client connection. -func (l *lightningNode) connectRPC() (*grpc.ClientConn, error) { - // Wait until TLS certificate and admin macaroon are created before - // using them, up to 20 sec. - tlsTimeout := time.After(30 * time.Second) - for !fileExists(l.cfg.TLSCertPath) || !fileExists(l.cfg.AdminMacPath) { - select { - case <-tlsTimeout: - return nil, fmt.Errorf("timeout waiting for TLS cert file " + - "and admin macaroon file to be created after " + - "20 seconds") - case <-time.After(100 * time.Millisecond): - } - } - - tlsCreds, err := credentials.NewClientTLSFromFile(l.cfg.TLSCertPath, "") - if err != nil { - return nil, err - } - macBytes, err := ioutil.ReadFile(l.cfg.AdminMacPath) - if err != nil { - return nil, err - } - mac := &macaroon.Macaroon{} - if err = mac.UnmarshalBinary(macBytes); err != nil { - return nil, err - } - opts := []grpc.DialOption{ - grpc.WithTransportCredentials(tlsCreds), - grpc.WithPerRPCCredentials(macaroons.NewMacaroonCredential(mac)), - grpc.WithBlock(), - grpc.WithTimeout(time.Second * 20), - } - return grpc.Dial(l.rpcAddr, opts...) -} - -// cleanup cleans up all the temporary files created by the node's process. -func (l *lightningNode) cleanup() error { - dirs := []string{ - l.cfg.LogDir, - l.cfg.DataDir, - } - - var err error - for _, dir := range dirs { - if removeErr := os.RemoveAll(dir); removeErr != nil { - log.Printf("Cannot remove dir %s: %v", dir, removeErr) - err = removeErr - } - } - return err -} - -// Stop attempts to stop the active lnd process. -func (l *lightningNode) Stop() error { - // Do nothing if the process never started successfully. - if l.LightningClient == nil { - return nil - } - - // Do nothing if the process already finished. - select { - case <-l.quit: - return nil - case <-l.processExit: - return nil - default: - } - - // Don't watch for error because sometimes the RPC connection gets - // closed before a response is returned. - req := lnrpc.StopRequest{} - ctx := context.Background() - l.LightningClient.StopDaemon(ctx, &req) - - close(l.quit) - l.wg.Wait() - return nil -} - -// Restart attempts to restart a lightning node by shutting it down cleanly, -// then restarting the process. This function is fully blocking. Upon restart, -// the RPC connection to the node will be re-attempted, continuing iff the -// connection attempt is successful. Additionally, if a callback is passed, the -// closure will be executed after the node has been shutdown, but before the -// process has been started up again. 
-func (l *lightningNode) Restart(errChan chan error, callback func() error) error { - if err := l.Stop(); err != nil { - return err - } - - <-l.processExit - - l.LightningClient = nil - l.processExit = make(chan struct{}) - l.quit = make(chan struct{}) - l.wg = sync.WaitGroup{} - - if callback != nil { - if err := callback(); err != nil { - return err - } - } - - return l.Start(errChan) -} - -// Shutdown stops the active lnd process and clean up any temporary directories -// created along the way. -func (l *lightningNode) Shutdown() error { - if err := l.Stop(); err != nil { - return err - } - if err := l.cleanup(); err != nil { - return err - } - return nil -} - -// closeChanWatchRequest is a request to the lightningNetworkWatcher to be -// notified once it's detected within the test Lightning Network, that a -// channel has either been added or closed. -type chanWatchRequest struct { - chanPoint wire.OutPoint - - chanOpen bool - - eventChan chan struct{} -} - -// lightningNetworkWatcher is a goroutine which is able to dispatch -// notifications once it has been observed that a target channel has been -// closed or opened within the network. In order to dispatch these -// notifications, the GraphTopologySubscription client exposed as part of the -// gRPC interface is used. -func (l *lightningNode) lightningNetworkWatcher() { - defer l.wg.Done() - - graphUpdates := make(chan *lnrpc.GraphTopologyUpdate) - l.wg.Add(1) - go func() { - defer l.wg.Done() - - ctxb := context.Background() - req := &lnrpc.GraphTopologySubscription{} - topologyClient, err := l.SubscribeChannelGraph(ctxb, req) - if err != nil { - // We panic here in case of an error as failure to - // create the topology client will cause all subsequent - // tests to fail. - panic(fmt.Errorf("unable to create topology "+ - "client: %v", err)) - } - - for { - update, err := topologyClient.Recv() - if err == io.EOF { - return - } else if err != nil { - return - } - - select { - case graphUpdates <- update: - case <-l.quit: - return - } - } - }() - - // For each outpoint, we'll track an integer which denotes the number - // of edges seen for that channel within the network. When this number - // reaches 2, then it means that both edge advertisements has - // propagated through the network. - openChans := make(map[wire.OutPoint]int) - openClients := make(map[wire.OutPoint][]chan struct{}) - - closedChans := make(map[wire.OutPoint]struct{}) - closeClients := make(map[wire.OutPoint][]chan struct{}) - - for { - select { - - // A new graph update has just been received, so we'll examine - // the current set of registered clients to see if we can - // dispatch any requests. - case graphUpdate := <-graphUpdates: - // For each new channel, we'll increment the number of - // edges seen by one. - for _, newChan := range graphUpdate.ChannelUpdates { - txid, _ := chainhash.NewHash(newChan.ChanPoint.FundingTxid) - op := wire.OutPoint{ - Hash: *txid, - Index: newChan.ChanPoint.OutputIndex, - } - openChans[op]++ - - // For this new channel, if the number of edges - // seen is less than two, then the channel - // hasn't been fully announced yet. - if numEdges := openChans[op]; numEdges < 2 { - continue - } - - // Otherwise, we'll notify all the registered - // clients and remove the dispatched clients. - for _, eventChan := range openClients[op] { - close(eventChan) - } - delete(openClients, op) - } - - // For each channel closed, we'll mark that we've - // detected a channel closure while lnd was pruning the - // channel graph. 
- for _, closedChan := range graphUpdate.ClosedChans { - txid, _ := chainhash.NewHash(closedChan.ChanPoint.FundingTxid) - op := wire.OutPoint{ - Hash: *txid, - Index: closedChan.ChanPoint.OutputIndex, - } - closedChans[op] = struct{}{} - - // As the channel has been closed, we'll notify - // all register clients. - for _, eventChan := range closeClients[op] { - close(eventChan) - } - delete(closeClients, op) - } - - // A new watch request, has just arrived. We'll either be able - // to dispatch immediately, or need to add the client for - // processing later. - case watchRequest := <-l.chanWatchRequests: - targetChan := watchRequest.chanPoint - - // TODO(roasbeef): add update type also, checks for - // multiple of 2 - if watchRequest.chanOpen { - // If this is a open request, then it can be - // dispatched if the number of edges seen for - // the channel is at least two. - if numEdges := openChans[targetChan]; numEdges >= 2 { - close(watchRequest.eventChan) - continue - } - - // Otherwise, we'll add this to the list of - // watch open clients for this out point. - openClients[targetChan] = append(openClients[targetChan], - watchRequest.eventChan) - continue - } - - // If this is a close request, then it can be - // immediately dispatched if we've already seen a - // channel closure for this channel. - if _, ok := closedChans[targetChan]; ok { - close(watchRequest.eventChan) - continue - } - - // Otherwise, we'll add this to the list of close watch - // clients for this out point. - closeClients[targetChan] = append(closeClients[targetChan], - watchRequest.eventChan) - - case <-l.quit: - return - } - } -} - -// WaitForNetworkChannelOpen will block until a channel with the target -// outpoint is seen as being fully advertised within the network. A channel is -// considered "fully advertised" once both of its directional edges has been -// advertised within the test Lightning Network. -func (l *lightningNode) WaitForNetworkChannelOpen(ctx context.Context, - op *lnrpc.ChannelPoint) error { - - eventChan := make(chan struct{}) - - txid, err := chainhash.NewHash(op.FundingTxid) - if err != nil { - return err - } - - l.chanWatchRequests <- &chanWatchRequest{ - chanPoint: wire.OutPoint{ - Hash: *txid, - Index: op.OutputIndex, - }, - eventChan: eventChan, - chanOpen: true, - } - - select { - case <-eventChan: - return nil - case <-ctx.Done(): - return fmt.Errorf("channel not opened before timeout") - } -} - -// WaitForNetworkChannelClose will block until a channel with the target -// outpoint is seen as closed within the network. A channel is considered -// closed once a transaction spending the funding outpoint is seen within a -// confirmed block. -func (l *lightningNode) WaitForNetworkChannelClose(ctx context.Context, - op *lnrpc.ChannelPoint) error { - - eventChan := make(chan struct{}) - - txid, err := chainhash.NewHash(op.FundingTxid) - if err != nil { - return err - } - - l.chanWatchRequests <- &chanWatchRequest{ - chanPoint: wire.OutPoint{ - Hash: *txid, - Index: op.OutputIndex, - }, - eventChan: eventChan, - chanOpen: false, - } - - select { - case <-eventChan: - return nil - case <-ctx.Done(): - return fmt.Errorf("channel not closed before timeout") - } -} - -// WaitForBlockchainSync will block until the target nodes has fully -// synchronized with the blockchain. If the passed context object has a set -// timeout, then the goroutine will continually poll until the timeout has -// elapsed. 
In the case that the chain isn't synced before the timeout is up, -// then this function will return an error. -func (l *lightningNode) WaitForBlockchainSync(ctx context.Context) error { - errChan := make(chan error, 1) - retryDelay := time.Millisecond * 100 - - go func() { - for { - select { - case <-ctx.Done(): - case <-l.quit: - return - default: - } - - getInfoReq := &lnrpc.GetInfoRequest{} - getInfoResp, err := l.GetInfo(ctx, getInfoReq) - if err != nil { - errChan <- err - return - } - if getInfoResp.SyncedToChain { - errChan <- nil - return - } - - select { - case <-ctx.Done(): - return - case <-time.After(retryDelay): - } - } - }() - - select { - case <-l.quit: - return nil - case err := <-errChan: - return err - case <-ctx.Done(): - return fmt.Errorf("Timeout while waiting for blockchain sync") - } -} +package lntest // networkHarness is an integration testing harness for the lightning network. // The harness by default is created with two active nodes on the network: diff --git a/lntest/node.go b/lntest/node.go new file mode 100644 index 00000000000..c753f66713a --- /dev/null +++ b/lntest/node.go @@ -0,0 +1,653 @@ +package lntest + +var ( + // numActiveNodes is the number of active nodes within the test network. + numActiveNodes = 0 + + // defaultNodePort is the initial p2p port which will be used by the + // first created lightning node to listen on for incoming p2p + // connections. Subsequent allocated ports for future lighting nodes + // instances will be monotonically increasing odd numbers calculated as + // such: defaultP2pPort + (2 * harness.nodeNum). + defaultNodePort = 19555 + + // defaultClientPort is the initial rpc port which will be used by the + // first created lightning node to listen on for incoming rpc + // connections. Subsequent allocated ports for future rpc harness + // instances will be monotonically increasing even numbers calculated + // as such: defaultP2pPort + (2 * harness.nodeNum). + defaultClientPort = 19556 + + harnessNetParams = &chaincfg.SimNetParams + + // logOutput is a flag that can be set to append the output from the + // seed nodes to log files. + logOutput = flag.Bool("logoutput", false, + "log output from node n to file outputn.log") + + // trickleDelay is the amount of time in milliseconds between each + // release of announcements by AuthenticatedGossiper to the network. + trickleDelay = 50 +) + +// generateListeningPorts returns two strings representing ports to listen on +// designated for the current lightning network test. If there haven't been any +// test instances created, the default ports are used. Otherwise, in order to +// support multiple test nodes running at once, the p2p and rpc port are +// incremented after each initialization. +func generateListeningPorts() (int, int) { + var p2p, rpc int + if numActiveNodes == 0 { + p2p = defaultNodePort + rpc = defaultClientPort + } else { + p2p = defaultNodePort + (2 * numActiveNodes) + rpc = defaultClientPort + (2 * numActiveNodes) + } + + return p2p, rpc +} + +// lightningNode represents an instance of lnd running within our test network +// harness. Each lightningNode instance also fully embedds an RPC client in +// order to pragmatically drive the node. +type lightningNode struct { + cfg *config + + rpcAddr string + p2pAddr string + rpcCert []byte + + nodeID int + + // PubKey is the serialized compressed identity public key of the node. + // This field will only be populated once the node itself has been + // started via the start() method. 
+ PubKey [33]byte + PubKeyStr string + + cmd *exec.Cmd + pidFile string + + // processExit is a channel that's closed once it's detected that the + // process this instance of lightningNode is bound to has exited. + processExit chan struct{} + + extraArgs []string + + chanWatchRequests chan *chanWatchRequest + + quit chan struct{} + wg sync.WaitGroup + + lnrpc.LightningClient +} + +// newLightningNode creates a new test lightning node instance from the passed +// rpc config and slice of extra arguments. +func newLightningNode(btcrpcConfig *rpcclient.ConnConfig, lndArgs []string) (*lightningNode, error) { + var err error + + cfg := &config{ + Bitcoin: &chainConfig{ + RPCHost: btcrpcConfig.Host, + RPCUser: btcrpcConfig.User, + RPCPass: btcrpcConfig.Pass, + }, + } + + nodeNum := numActiveNodes + numActiveNodes++ + + cfg.DataDir, err = ioutil.TempDir("", "lndtest-data") + if err != nil { + return nil, err + } + cfg.LogDir, err = ioutil.TempDir("", "lndtest-log") + if err != nil { + return nil, err + } + cfg.TLSCertPath = filepath.Join(cfg.DataDir, "tls.cert") + cfg.TLSKeyPath = filepath.Join(cfg.DataDir, "tls.key") + cfg.AdminMacPath = filepath.Join(cfg.DataDir, "admin.macaroon") + cfg.ReadMacPath = filepath.Join(cfg.DataDir, "readonly.macaroon") + + cfg.PeerPort, cfg.RPCPort = generateListeningPorts() + + lndArgs = append(lndArgs, "--externalip=127.0.0.1:"+ + strconv.Itoa(cfg.PeerPort)) + lndArgs = append(lndArgs, "--noencryptwallet") + + return &lightningNode{ + cfg: cfg, + p2pAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.PeerPort)), + rpcAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.RPCPort)), + rpcCert: btcrpcConfig.Certificates, + nodeID: nodeNum, + chanWatchRequests: make(chan *chanWatchRequest), + processExit: make(chan struct{}), + quit: make(chan struct{}), + extraArgs: lndArgs, + }, nil +} + +// genArgs generates a slice of command line arguments from the lightningNode's +// current config struct. +func (l *lightningNode) genArgs() []string { + var args []string + + encodedCert := hex.EncodeToString(l.rpcCert) + args = append(args, "--bitcoin.active") + args = append(args, "--bitcoin.simnet") + args = append(args, "--nobootstrap") + args = append(args, "--debuglevel=debug") + args = append(args, fmt.Sprintf("--bitcoin.rpchost=%v", l.cfg.Bitcoin.RPCHost)) + args = append(args, fmt.Sprintf("--bitcoin.rpcuser=%v", l.cfg.Bitcoin.RPCUser)) + args = append(args, fmt.Sprintf("--bitcoin.rpcpass=%v", l.cfg.Bitcoin.RPCPass)) + args = append(args, fmt.Sprintf("--bitcoin.rawrpccert=%v", encodedCert)) + args = append(args, fmt.Sprintf("--rpcport=%v", l.cfg.RPCPort)) + args = append(args, fmt.Sprintf("--peerport=%v", l.cfg.PeerPort)) + args = append(args, fmt.Sprintf("--logdir=%v", l.cfg.LogDir)) + args = append(args, fmt.Sprintf("--datadir=%v", l.cfg.DataDir)) + args = append(args, fmt.Sprintf("--tlscertpath=%v", l.cfg.TLSCertPath)) + args = append(args, fmt.Sprintf("--tlskeypath=%v", l.cfg.TLSKeyPath)) + args = append(args, fmt.Sprintf("--configfile=%v", l.cfg.DataDir)) + args = append(args, fmt.Sprintf("--adminmacaroonpath=%v", l.cfg.AdminMacPath)) + args = append(args, fmt.Sprintf("--readonlymacaroonpath=%v", l.cfg.ReadMacPath)) + args = append(args, fmt.Sprintf("--trickledelay=%v", trickleDelay)) + + if l.extraArgs != nil { + args = append(args, l.extraArgs...) + } + + return args +} + +// Start launches a new process running lnd. Additionally, the PID of the +// launched process is saved in order to possibly kill the process forcibly +// later. 
+func (l *lightningNode) Start(lndError chan<- error) error { + args := l.genArgs() + + l.cmd = exec.Command("lnd", args...) + + // Redirect stderr output to buffer + var errb bytes.Buffer + l.cmd.Stderr = &errb + + // If the logoutput flag is passed, redirect output from the nodes to + // log files. + if *logOutput { + logFile := fmt.Sprintf("output%d.log", l.nodeID) + + // Create file if not exists, otherwise append. + file, err := os.OpenFile(logFile, + os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) + if err != nil { + return err + } + + // Pass node's stderr to both errb and the file. + w := io.MultiWriter(&errb, file) + l.cmd.Stderr = w + + // Pass the node's stdout only to the file. + l.cmd.Stdout = file + } + + if err := l.cmd.Start(); err != nil { + return err + } + + // Launch a new goroutine which that bubbles up any potential fatal + // process errors to the goroutine running the tests. + go func() { + err := l.cmd.Wait() + if err != nil { + lndError <- errors.Errorf("%v\n%v\n", err, errb.String()) + } + + // Signal any onlookers that this process has exited. + close(l.processExit) + }() + + // Write process ID to a file. + if err := l.writePidFile(); err != nil { + l.cmd.Process.Kill() + return err + } + + // Since Stop uses the LightningClient to stop the node, if we fail to get a + // connected client, we have to kill the process. + conn, err := l.connectRPC() + if err != nil { + l.cmd.Process.Kill() + return err + } + l.LightningClient = lnrpc.NewLightningClient(conn) + + // Obtain the lnid of this node for quick identification purposes. + ctxb := context.Background() + info, err := l.GetInfo(ctxb, &lnrpc.GetInfoRequest{}) + if err != nil { + return err + } + + l.PubKeyStr = info.IdentityPubkey + + pubkey, err := hex.DecodeString(info.IdentityPubkey) + if err != nil { + return err + } + copy(l.PubKey[:], pubkey) + + // Launch the watcher that'll hook into graph related topology change + // from the PoV of this node. + l.wg.Add(1) + go l.lightningNetworkWatcher() + + return nil +} + +// writePidFile writes the process ID of the running lnd process to a .pid file. +func (l *lightningNode) writePidFile() error { + filePath := filepath.Join(l.cfg.DataDir, fmt.Sprintf("%v.pid", l.nodeID)) + + pid, err := os.Create(filePath) + if err != nil { + return err + } + defer pid.Close() + + _, err = fmt.Fprintf(pid, "%v\n", l.cmd.Process.Pid) + if err != nil { + return err + } + + l.pidFile = filePath + return nil +} + +// connectRPC uses the TLS certificate and admin macaroon files written by the +// lnd node to create a gRPC client connection. +func (l *lightningNode) connectRPC() (*grpc.ClientConn, error) { + // Wait until TLS certificate and admin macaroon are created before + // using them, up to 20 sec. 
+ tlsTimeout := time.After(30 * time.Second) + for !fileExists(l.cfg.TLSCertPath) || !fileExists(l.cfg.AdminMacPath) { + select { + case <-tlsTimeout: + return nil, fmt.Errorf("timeout waiting for TLS cert file " + + "and admin macaroon file to be created after " + + "20 seconds") + case <-time.After(100 * time.Millisecond): + } + } + + tlsCreds, err := credentials.NewClientTLSFromFile(l.cfg.TLSCertPath, "") + if err != nil { + return nil, err + } + macBytes, err := ioutil.ReadFile(l.cfg.AdminMacPath) + if err != nil { + return nil, err + } + mac := &macaroon.Macaroon{} + if err = mac.UnmarshalBinary(macBytes); err != nil { + return nil, err + } + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(tlsCreds), + grpc.WithPerRPCCredentials(macaroons.NewMacaroonCredential(mac)), + grpc.WithBlock(), + grpc.WithTimeout(time.Second * 20), + } + return grpc.Dial(l.rpcAddr, opts...) +} + +// cleanup cleans up all the temporary files created by the node's process. +func (l *lightningNode) cleanup() error { + dirs := []string{ + l.cfg.LogDir, + l.cfg.DataDir, + } + + var err error + for _, dir := range dirs { + if removeErr := os.RemoveAll(dir); removeErr != nil { + log.Printf("Cannot remove dir %s: %v", dir, removeErr) + err = removeErr + } + } + return err +} + +// Stop attempts to stop the active lnd process. +func (l *lightningNode) Stop() error { + // Do nothing if the process never started successfully. + if l.LightningClient == nil { + return nil + } + + // Do nothing if the process already finished. + select { + case <-l.quit: + return nil + case <-l.processExit: + return nil + default: + } + + // Don't watch for error because sometimes the RPC connection gets + // closed before a response is returned. + req := lnrpc.StopRequest{} + ctx := context.Background() + l.LightningClient.StopDaemon(ctx, &req) + + close(l.quit) + l.wg.Wait() + return nil +} + +// Restart attempts to restart a lightning node by shutting it down cleanly, +// then restarting the process. This function is fully blocking. Upon restart, +// the RPC connection to the node will be re-attempted, continuing iff the +// connection attempt is successful. Additionally, if a callback is passed, the +// closure will be executed after the node has been shutdown, but before the +// process has been started up again. +func (l *lightningNode) Restart(errChan chan error, callback func() error) error { + if err := l.Stop(); err != nil { + return err + } + + <-l.processExit + + l.LightningClient = nil + l.processExit = make(chan struct{}) + l.quit = make(chan struct{}) + l.wg = sync.WaitGroup{} + + if callback != nil { + if err := callback(); err != nil { + return err + } + } + + return l.Start(errChan) +} + +// Shutdown stops the active lnd process and clean up any temporary directories +// created along the way. +func (l *lightningNode) Shutdown() error { + if err := l.Stop(); err != nil { + return err + } + if err := l.cleanup(); err != nil { + return err + } + return nil +} + +// closeChanWatchRequest is a request to the lightningNetworkWatcher to be +// notified once it's detected within the test Lightning Network, that a +// channel has either been added or closed. +type chanWatchRequest struct { + chanPoint wire.OutPoint + + chanOpen bool + + eventChan chan struct{} +} + +// lightningNetworkWatcher is a goroutine which is able to dispatch +// notifications once it has been observed that a target channel has been +// closed or opened within the network. 
In order to dispatch these +// notifications, the GraphTopologySubscription client exposed as part of the +// gRPC interface is used. +func (l *lightningNode) lightningNetworkWatcher() { + defer l.wg.Done() + + graphUpdates := make(chan *lnrpc.GraphTopologyUpdate) + l.wg.Add(1) + go func() { + defer l.wg.Done() + + ctxb := context.Background() + req := &lnrpc.GraphTopologySubscription{} + topologyClient, err := l.SubscribeChannelGraph(ctxb, req) + if err != nil { + // We panic here in case of an error as failure to + // create the topology client will cause all subsequent + // tests to fail. + panic(fmt.Errorf("unable to create topology "+ + "client: %v", err)) + } + + for { + update, err := topologyClient.Recv() + if err == io.EOF { + return + } else if err != nil { + return + } + + select { + case graphUpdates <- update: + case <-l.quit: + return + } + } + }() + + // For each outpoint, we'll track an integer which denotes the number + // of edges seen for that channel within the network. When this number + // reaches 2, then it means that both edge advertisements has + // propagated through the network. + openChans := make(map[wire.OutPoint]int) + openClients := make(map[wire.OutPoint][]chan struct{}) + + closedChans := make(map[wire.OutPoint]struct{}) + closeClients := make(map[wire.OutPoint][]chan struct{}) + + for { + select { + + // A new graph update has just been received, so we'll examine + // the current set of registered clients to see if we can + // dispatch any requests. + case graphUpdate := <-graphUpdates: + // For each new channel, we'll increment the number of + // edges seen by one. + for _, newChan := range graphUpdate.ChannelUpdates { + txid, _ := chainhash.NewHash(newChan.ChanPoint.FundingTxid) + op := wire.OutPoint{ + Hash: *txid, + Index: newChan.ChanPoint.OutputIndex, + } + openChans[op]++ + + // For this new channel, if the number of edges + // seen is less than two, then the channel + // hasn't been fully announced yet. + if numEdges := openChans[op]; numEdges < 2 { + continue + } + + // Otherwise, we'll notify all the registered + // clients and remove the dispatched clients. + for _, eventChan := range openClients[op] { + close(eventChan) + } + delete(openClients, op) + } + + // For each channel closed, we'll mark that we've + // detected a channel closure while lnd was pruning the + // channel graph. + for _, closedChan := range graphUpdate.ClosedChans { + txid, _ := chainhash.NewHash(closedChan.ChanPoint.FundingTxid) + op := wire.OutPoint{ + Hash: *txid, + Index: closedChan.ChanPoint.OutputIndex, + } + closedChans[op] = struct{}{} + + // As the channel has been closed, we'll notify + // all register clients. + for _, eventChan := range closeClients[op] { + close(eventChan) + } + delete(closeClients, op) + } + + // A new watch request, has just arrived. We'll either be able + // to dispatch immediately, or need to add the client for + // processing later. + case watchRequest := <-l.chanWatchRequests: + targetChan := watchRequest.chanPoint + + // TODO(roasbeef): add update type also, checks for + // multiple of 2 + if watchRequest.chanOpen { + // If this is a open request, then it can be + // dispatched if the number of edges seen for + // the channel is at least two. + if numEdges := openChans[targetChan]; numEdges >= 2 { + close(watchRequest.eventChan) + continue + } + + // Otherwise, we'll add this to the list of + // watch open clients for this out point. 
+ openClients[targetChan] = append(openClients[targetChan], + watchRequest.eventChan) + continue + } + + // If this is a close request, then it can be + // immediately dispatched if we've already seen a + // channel closure for this channel. + if _, ok := closedChans[targetChan]; ok { + close(watchRequest.eventChan) + continue + } + + // Otherwise, we'll add this to the list of close watch + // clients for this out point. + closeClients[targetChan] = append(closeClients[targetChan], + watchRequest.eventChan) + + case <-l.quit: + return + } + } +} + +// WaitForNetworkChannelOpen will block until a channel with the target +// outpoint is seen as being fully advertised within the network. A channel is +// considered "fully advertised" once both of its directional edges has been +// advertised within the test Lightning Network. +func (l *lightningNode) WaitForNetworkChannelOpen(ctx context.Context, + op *lnrpc.ChannelPoint) error { + + eventChan := make(chan struct{}) + + txid, err := chainhash.NewHash(op.FundingTxid) + if err != nil { + return err + } + + l.chanWatchRequests <- &chanWatchRequest{ + chanPoint: wire.OutPoint{ + Hash: *txid, + Index: op.OutputIndex, + }, + eventChan: eventChan, + chanOpen: true, + } + + select { + case <-eventChan: + return nil + case <-ctx.Done(): + return fmt.Errorf("channel not opened before timeout") + } +} + +// WaitForNetworkChannelClose will block until a channel with the target +// outpoint is seen as closed within the network. A channel is considered +// closed once a transaction spending the funding outpoint is seen within a +// confirmed block. +func (l *lightningNode) WaitForNetworkChannelClose(ctx context.Context, + op *lnrpc.ChannelPoint) error { + + eventChan := make(chan struct{}) + + txid, err := chainhash.NewHash(op.FundingTxid) + if err != nil { + return err + } + + l.chanWatchRequests <- &chanWatchRequest{ + chanPoint: wire.OutPoint{ + Hash: *txid, + Index: op.OutputIndex, + }, + eventChan: eventChan, + chanOpen: false, + } + + select { + case <-eventChan: + return nil + case <-ctx.Done(): + return fmt.Errorf("channel not closed before timeout") + } +} + +// WaitForBlockchainSync will block until the target nodes has fully +// synchronized with the blockchain. If the passed context object has a set +// timeout, then the goroutine will continually poll until the timeout has +// elapsed. In the case that the chain isn't synced before the timeout is up, +// then this function will return an error. 
+func (l *lightningNode) WaitForBlockchainSync(ctx context.Context) error { + errChan := make(chan error, 1) + retryDelay := time.Millisecond * 100 + + go func() { + for { + select { + case <-ctx.Done(): + case <-l.quit: + return + default: + } + + getInfoReq := &lnrpc.GetInfoRequest{} + getInfoResp, err := l.GetInfo(ctx, getInfoReq) + if err != nil { + errChan <- err + return + } + if getInfoResp.SyncedToChain { + errChan <- nil + return + } + + select { + case <-ctx.Done(): + return + case <-time.After(retryDelay): + } + } + }() + + select { + case <-l.quit: + return nil + case err := <-errChan: + return err + case <-ctx.Done(): + return fmt.Errorf("Timeout while waiting for blockchain sync") + } +} diff --git a/networktest_test.go b/networktest_test.go deleted file mode 100644 index 06ab7d0f9a3..00000000000 --- a/networktest_test.go +++ /dev/null @@ -1 +0,0 @@ -package main From c5c884240e24a4b06a9375768ed5e6c8fdbc24f6 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Fri, 3 Nov 2017 14:06:07 -0700 Subject: [PATCH 2/7] lntest: Break lntest dependence on config in the main package. This creates a new nodeConfig struct for the node in the lntest package in order to decouple lntest from the main package. --- lntest/harness.go | 20 ++++-- lntest/node.go | 173 +++++++++++++++++++++++----------------------- 2 files changed, 102 insertions(+), 91 deletions(-) diff --git a/lntest/harness.go b/lntest/harness.go index 079b9cb6a96..6222e04c80f 100644 --- a/lntest/harness.go +++ b/lntest/harness.go @@ -45,18 +45,22 @@ func newNetworkHarness() (*networkHarness, error) { // running instance of btcd's rpctest harness and extra command line flags, // which should be formatted properly - "--arg=value". func (n *networkHarness) InitializeSeedNodes(r *rpctest.Harness, lndArgs []string) error { - nodeConfig := r.RPCConfig() - n.netParams = r.ActiveNet n.Miner = r n.rpcConfig = nodeConfig + config := nodeConfig{ + RPCConfig: &n.rpcConfig, + NetParams: n.netParams, + ExtraArgs: lndArgs, + } + var err error - n.Alice, err = newLightningNode(&nodeConfig, lndArgs) + n.Alice, err = newLightningNode(config) if err != nil { return err } - n.Bob, err = newLightningNode(&nodeConfig, lndArgs) + n.Bob, err = newLightningNode(config) if err != nil { return err } @@ -216,7 +220,11 @@ func (n *networkHarness) NewNode(extraArgs []string) (*lightningNode, error) { n.Lock() defer n.Unlock() - node, err := newLightningNode(&n.rpcConfig, extraArgs) + node, err := newLightningNode(nodeConfig{ + RPCConfig: &n.rpcConfig, + NetParams: n.netParams, + ExtraArgs: extraArgs, + }) if err != nil { return nil, err } @@ -247,7 +255,7 @@ func (n *networkHarness) ConnectNodes(ctx context.Context, a, b *lightningNode) req := &lnrpc.ConnectPeerRequest{ Addr: &lnrpc.LightningAddress{ Pubkey: bobInfo.IdentityPubkey, - Host: b.p2pAddr, + Host: b.cfg.P2PAddr(), }, } if _, err := a.ConnectPeer(ctx, req); err != nil { diff --git a/lntest/node.go b/lntest/node.go index c753f66713a..738801761e5 100644 --- a/lntest/node.go +++ b/lntest/node.go @@ -48,15 +48,82 @@ func generateListeningPorts() (int, int) { return p2p, rpc } +type nodeConfig struct { + RPCConfig *rpcclient.ConnConfig + NetParams *chaincfg.Params + BaseDir string + ExtraArgs []string + + DataDir string + LogDir string + TLSCertPath string + TLSKeyPath string + AdminMacPath string + ReadMacPath string + P2PPort int + RPCPort int +} + +func (cfg nodeConfig) P2PAddr() string { + return net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.P2PPort)) +} + +func (cfg nodeConfig) RPCAddr() string { + 
return net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.RPCPort)) +} + +func (cfg nodeConfig) DBPath() string { + return filepath.Join(cfg.DataDir, cfg.NetParams.Name, "bitcoin/channel.db") +} + +// genArgs generates a slice of command line arguments from the lightning node +// config struct. +func (cfg nodeConfig) genArgs() []string { + var args []string + + switch cfg.NetParams { + case &chaincfg.TestNet3Params: + args = append(args, "--bitcoin.testnet") + case &chaincfg.SimNetParams: + args = append(args, "--bitcoin.simnet") + case &chaincfg.RegressionNetParams: + args = append(args, "--bitcoin.regtest") + } + + encodedCert := hex.EncodeToString(cfg.RPCConfig.Certificates) + args = append(args, "--bitcoin.active") + args = append(args, "--nobootstrap") + args = append(args, "--noencryptwallet") + args = append(args, "--debuglevel=debug") + args = append(args, "--defaultchanconfs=1") + args = append(args, fmt.Sprintf("--bitcoin.rpchost=%v", cfg.RPCConfig.Host)) + args = append(args, fmt.Sprintf("--bitcoin.rpcuser=%v", cfg.RPCConfig.User)) + args = append(args, fmt.Sprintf("--bitcoin.rpcpass=%v", cfg.RPCConfig.Pass)) + args = append(args, fmt.Sprintf("--bitcoin.rawrpccert=%v", encodedCert)) + args = append(args, fmt.Sprintf("--rpcport=%v", cfg.RPCPort)) + args = append(args, fmt.Sprintf("--peerport=%v", cfg.P2PPort)) + args = append(args, fmt.Sprintf("--logdir=%v", cfg.LogDir)) + args = append(args, fmt.Sprintf("--datadir=%v", cfg.DataDir)) + args = append(args, fmt.Sprintf("--tlscertpath=%v", cfg.TLSCertPath)) + args = append(args, fmt.Sprintf("--tlskeypath=%v", cfg.TLSKeyPath)) + args = append(args, fmt.Sprintf("--configfile=%v", cfg.DataDir)) + args = append(args, fmt.Sprintf("--adminmacaroonpath=%v", cfg.AdminMacPath)) + args = append(args, fmt.Sprintf("--readonlymacaroonpath=%v", cfg.ReadMacPath)) + args = append(args, fmt.Sprintf("--externalip=%s", cfg.P2PAddr())) + args = append(args, fmt.Sprintf("--trickledelay=%v", trickleDelay)) + + if cfg.ExtraArgs != nil { + args = append(args, cfg.ExtraArgs...) + } + + return args +} + // lightningNode represents an instance of lnd running within our test network // harness. Each lightningNode instance also fully embedds an RPC client in // order to pragmatically drive the node. type lightningNode struct { - cfg *config - - rpcAddr string - p2pAddr string - rpcCert []byte + cfg *nodeConfig nodeID int @@ -73,8 +140,6 @@ type lightningNode struct { // process this instance of lightningNode is bound to has exited. processExit chan struct{} - extraArgs []string - chanWatchRequests chan *chanWatchRequest quit chan struct{} @@ -85,90 +150,40 @@ type lightningNode struct { // newLightningNode creates a new test lightning node instance from the passed // rpc config and slice of extra arguments. 
-func newLightningNode(btcrpcConfig *rpcclient.ConnConfig, lndArgs []string) (*lightningNode, error) { - var err error - - cfg := &config{ - Bitcoin: &chainConfig{ - RPCHost: btcrpcConfig.Host, - RPCUser: btcrpcConfig.User, - RPCPass: btcrpcConfig.Pass, - }, - } - - nodeNum := numActiveNodes - numActiveNodes++ - - cfg.DataDir, err = ioutil.TempDir("", "lndtest-data") - if err != nil { - return nil, err - } - cfg.LogDir, err = ioutil.TempDir("", "lndtest-log") - if err != nil { - return nil, err +func newLightningNode(cfg nodeConfig) (*lightningNode, error) { + if cfg.BaseDir == "" { + var err error + cfg.BaseDir, err = ioutil.TempDir("", "lndtest-node") + if err != nil { + return nil, err + } } + cfg.DataDir = filepath.Join(cfg.BaseDir, "data") + cfg.LogDir = filepath.Join(cfg.BaseDir, "log") cfg.TLSCertPath = filepath.Join(cfg.DataDir, "tls.cert") cfg.TLSKeyPath = filepath.Join(cfg.DataDir, "tls.key") cfg.AdminMacPath = filepath.Join(cfg.DataDir, "admin.macaroon") cfg.ReadMacPath = filepath.Join(cfg.DataDir, "readonly.macaroon") - cfg.PeerPort, cfg.RPCPort = generateListeningPorts() + cfg.P2PPort, cfg.RPCPort = generateListeningPorts() - lndArgs = append(lndArgs, "--externalip=127.0.0.1:"+ - strconv.Itoa(cfg.PeerPort)) - lndArgs = append(lndArgs, "--noencryptwallet") + nodeNum := numActiveNodes + numActiveNodes++ return &lightningNode{ - cfg: cfg, - p2pAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.PeerPort)), - rpcAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.RPCPort)), - rpcCert: btcrpcConfig.Certificates, + cfg: &cfg, nodeID: nodeNum, chanWatchRequests: make(chan *chanWatchRequest), processExit: make(chan struct{}), quit: make(chan struct{}), - extraArgs: lndArgs, }, nil } -// genArgs generates a slice of command line arguments from the lightningNode's -// current config struct. -func (l *lightningNode) genArgs() []string { - var args []string - - encodedCert := hex.EncodeToString(l.rpcCert) - args = append(args, "--bitcoin.active") - args = append(args, "--bitcoin.simnet") - args = append(args, "--nobootstrap") - args = append(args, "--debuglevel=debug") - args = append(args, fmt.Sprintf("--bitcoin.rpchost=%v", l.cfg.Bitcoin.RPCHost)) - args = append(args, fmt.Sprintf("--bitcoin.rpcuser=%v", l.cfg.Bitcoin.RPCUser)) - args = append(args, fmt.Sprintf("--bitcoin.rpcpass=%v", l.cfg.Bitcoin.RPCPass)) - args = append(args, fmt.Sprintf("--bitcoin.rawrpccert=%v", encodedCert)) - args = append(args, fmt.Sprintf("--rpcport=%v", l.cfg.RPCPort)) - args = append(args, fmt.Sprintf("--peerport=%v", l.cfg.PeerPort)) - args = append(args, fmt.Sprintf("--logdir=%v", l.cfg.LogDir)) - args = append(args, fmt.Sprintf("--datadir=%v", l.cfg.DataDir)) - args = append(args, fmt.Sprintf("--tlscertpath=%v", l.cfg.TLSCertPath)) - args = append(args, fmt.Sprintf("--tlskeypath=%v", l.cfg.TLSKeyPath)) - args = append(args, fmt.Sprintf("--configfile=%v", l.cfg.DataDir)) - args = append(args, fmt.Sprintf("--adminmacaroonpath=%v", l.cfg.AdminMacPath)) - args = append(args, fmt.Sprintf("--readonlymacaroonpath=%v", l.cfg.ReadMacPath)) - args = append(args, fmt.Sprintf("--trickledelay=%v", trickleDelay)) - - if l.extraArgs != nil { - args = append(args, l.extraArgs...) - } - - return args -} - // Start launches a new process running lnd. Additionally, the PID of the // launched process is saved in order to possibly kill the process forcibly // later. func (l *lightningNode) Start(lndError chan<- error) error { - args := l.genArgs() - + args := l.cfg.genArgs() l.cmd = exec.Command("lnd", args...) 
// Redirect stderr output to buffer @@ -251,7 +266,7 @@ func (l *lightningNode) Start(lndError chan<- error) error { // writePidFile writes the process ID of the running lnd process to a .pid file. func (l *lightningNode) writePidFile() error { - filePath := filepath.Join(l.cfg.DataDir, fmt.Sprintf("%v.pid", l.nodeID)) + filePath := filepath.Join(l.cfg.BaseDir, fmt.Sprintf("%v.pid", l.nodeID)) pid, err := os.Create(filePath) if err != nil { @@ -307,19 +322,7 @@ func (l *lightningNode) connectRPC() (*grpc.ClientConn, error) { // cleanup cleans up all the temporary files created by the node's process. func (l *lightningNode) cleanup() error { - dirs := []string{ - l.cfg.LogDir, - l.cfg.DataDir, - } - - var err error - for _, dir := range dirs { - if removeErr := os.RemoveAll(dir); removeErr != nil { - log.Printf("Cannot remove dir %s: %v", dir, removeErr) - err = removeErr - } - } - return err + return os.RemoveAll(l.cfg.BaseDir) } // Stop attempts to stop the active lnd process. From 2febadd3e25949c056b329269c659835ec62b1aa Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Fri, 3 Nov 2017 11:52:02 -0700 Subject: [PATCH 3/7] lntest: Rename structs with proper visibility so lnd_test runs. --- lnd_test.go | 108 +++++++++++++------------ lntest/harness.go | 118 ++++++++++++++++----------- lntest/node.go | 201 ++++++++++++++++++++++++++++------------------ 3 files changed, 250 insertions(+), 177 deletions(-) diff --git a/lnd_test.go b/lnd_test.go index 72787a934ed..9a51779afeb 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -25,6 +25,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lnwire" "github.com/roasbeef/btcd/chaincfg" "github.com/roasbeef/btcd/chaincfg/chainhash" @@ -36,6 +37,10 @@ import ( "google.golang.org/grpc" ) +var ( + harnessNetParams = &chaincfg.SimNetParams +) + // harnessTest wraps a regular testing.T providing enhanced error detection // and propagation. All error will be augmented with a full stack-trace in // order to aid in debugging. Additionally, any panics caused by active @@ -70,7 +75,9 @@ func (h *harnessTest) Fatalf(format string, a ...interface{}) { // RunTestCase executes a harness test case. Any errors or panics will be // represented as fatal. -func (h *harnessTest) RunTestCase(testCase *testCase, net *networkHarness) { +func (h *harnessTest) RunTestCase(testCase *testCase, + net *lntest.NetworkHarness) { + h.testCase = testCase defer func() { h.testCase = nil @@ -110,7 +117,9 @@ func assertTxInBlock(t *harnessTest, block *wire.MsgBlock, txid *chainhash.Hash) // mineBlocks mine 'num' of blocks and check that blocks are present in // node blockchain. -func mineBlocks(t *harnessTest, net *networkHarness, num uint32) []*wire.MsgBlock { +func mineBlocks(t *harnessTest, net *lntest.NetworkHarness, num uint32, +) []*wire.MsgBlock { + blocks := make([]*wire.MsgBlock, num) blockHashes, err := net.Miner.Node.Generate(num) @@ -135,9 +144,9 @@ func mineBlocks(t *harnessTest, net *networkHarness, num uint32) []*wire.MsgBloc // after the channel is considered open: the funding transaction should be // found within a block, and that Alice can report the status of the new // channel. 
-func openChannelAndAssert(ctx context.Context, t *harnessTest, net *networkHarness, - alice, bob *lightningNode, fundingAmt btcutil.Amount, - pushAmt btcutil.Amount) *lnrpc.ChannelPoint { +func openChannelAndAssert(ctx context.Context, t *harnessTest, + net *lntest.NetworkHarness, alice, bob *lntest.HarnessNode, + fundingAmt btcutil.Amount, pushAmt btcutil.Amount) *lnrpc.ChannelPoint { chanOpenUpdate, err := net.OpenChannel(ctx, alice, bob, fundingAmt, pushAmt) @@ -182,8 +191,9 @@ func openChannelAndAssert(ctx context.Context, t *harnessTest, net *networkHarne // via timeout from a base parent. Additionally, once the channel has been // detected as closed, an assertion checks that the transaction is found within // a block. -func closeChannelAndAssert(ctx context.Context, t *harnessTest, net *networkHarness, - node *lightningNode, fundingChanPoint *lnrpc.ChannelPoint, force bool) *chainhash.Hash { +func closeChannelAndAssert(ctx context.Context, t *harnessTest, + net *lntest.NetworkHarness, node *lntest.HarnessNode, + fundingChanPoint *lnrpc.ChannelPoint, force bool) *chainhash.Hash { closeUpdates, _, err := net.CloseChannel(ctx, node, fundingChanPoint, force) if err != nil { @@ -234,7 +244,7 @@ func closeChannelAndAssert(ctx context.Context, t *harnessTest, net *networkHarn // numOpenChannelsPending sends an RPC request to a node to get a count of the // node's channels that are currently in a pending state (with a broadcast, but // not confirmed funding transaction). -func numOpenChannelsPending(ctxt context.Context, node *lightningNode) (int, error) { +func numOpenChannelsPending(ctxt context.Context, node *lntest.HarnessNode) (int, error) { pendingChansRequest := &lnrpc.PendingChannelRequest{} resp, err := node.PendingChannels(ctxt, pendingChansRequest) if err != nil { @@ -246,7 +256,7 @@ func numOpenChannelsPending(ctxt context.Context, node *lightningNode) (int, err // assertNumOpenChannelsPending asserts that a pair of nodes have the expected // number of pending channels between them. func assertNumOpenChannelsPending(ctxt context.Context, t *harnessTest, - alice, bob *lightningNode, expected int) { + alice, bob *lntest.HarnessNode, expected int) { const nPolls = 10 @@ -257,12 +267,12 @@ func assertNumOpenChannelsPending(ctxt context.Context, t *harnessTest, aliceNumChans, err := numOpenChannelsPending(ctxt, alice) if err != nil { t.Fatalf("error fetching alice's node (%v) pending channels %v", - alice.nodeID, err) + alice.NodeID, err) } bobNumChans, err := numOpenChannelsPending(ctxt, bob) if err != nil { t.Fatalf("error fetching bob's node (%v) pending channels %v", - bob.nodeID, err) + bob.NodeID, err) } isLastIteration := i == nPolls-1 @@ -290,7 +300,7 @@ func assertNumOpenChannelsPending(ctxt context.Context, t *harnessTest, // assertNumConnections asserts number current connections between two peers. 
func assertNumConnections(ctxt context.Context, t *harnessTest, - alice, bob *lightningNode, expected int) { + alice, bob *lntest.HarnessNode, expected int) { const nPolls = 10 @@ -303,12 +313,12 @@ func assertNumConnections(ctxt context.Context, t *harnessTest, aNumPeers, err := alice.ListPeers(ctxt, &lnrpc.ListPeersRequest{}) if err != nil { t.Fatalf("unable to fetch alice's node (%v) list peers %v", - alice.nodeID, err) + alice.NodeID, err) } bNumPeers, err := bob.ListPeers(ctxt, &lnrpc.ListPeersRequest{}) if err != nil { t.Fatalf("unable to fetch bob's node (%v) list peers %v", - bob.nodeID, err) + bob.NodeID, err) } if len(aNumPeers.Peers) != expected { // Continue polling if this is not the final @@ -401,7 +411,7 @@ func completePaymentRequests(ctx context.Context, client lnrpc.LightningClient, // Bob, then immediately closes the channel after asserting some expected post // conditions. Finally, the chain itself is checked to ensure the closing // transaction was mined. -func testBasicChannelFunding(net *networkHarness, t *harnessTest) { +func testBasicChannelFunding(net *lntest.NetworkHarness, t *harnessTest) { timeout := time.Duration(time.Second * 5) ctxb := context.Background() @@ -458,7 +468,7 @@ func testBasicChannelFunding(net *networkHarness, t *harnessTest) { // testOpenChannelAfterReorg tests that in the case where we have an open // channel where the funding tx gets reorged out, the channel will no // longer be present in the node's routing table. -func testOpenChannelAfterReorg(net *networkHarness, t *harnessTest) { +func testOpenChannelAfterReorg(net *lntest.NetworkHarness, t *harnessTest) { timeout := time.Duration(time.Second * 5) ctxb := context.Background() @@ -639,7 +649,7 @@ func testOpenChannelAfterReorg(net *networkHarness, t *harnessTest) { // testDisconnectingTargetPeer performs a test which // disconnects Alice-peer from Bob-peer and then re-connects them again -func testDisconnectingTargetPeer(net *networkHarness, t *harnessTest) { +func testDisconnectingTargetPeer(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() @@ -768,7 +778,7 @@ func testDisconnectingTargetPeer(net *networkHarness, t *harnessTest) { // representation of channels if the system is restarted or disconnected. // testFundingPersistence mirrors testBasicChannelFunding, but adds restarts // and checks for the state of channels with unconfirmed funding transactions. -func testChannelFundingPersistence(net *networkHarness, t *harnessTest) { +func testChannelFundingPersistence(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() chanAmt := maxFundingAmount @@ -914,7 +924,7 @@ peersPoll: // testChannelBalance creates a new channel between Alice and Bob, then // checks channel balance to be equal amount specified while creation of channel. -func testChannelBalance(net *networkHarness, t *harnessTest) { +func testChannelBalance(net *lntest.NetworkHarness, t *harnessTest) { timeout := time.Duration(time.Second * 5) // Open a channel with 0.16 BTC between Alice and Bob, ensuring the @@ -1069,8 +1079,7 @@ func assertPendingHtlcStageAndMaturity(t *harnessTest, // process. // // TODO(roasbeef): also add an unsettled HTLC before force closing. 
-func testChannelForceClosure(net *networkHarness, t *harnessTest) { - +func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() const ( timeout = time.Duration(time.Second * 10) @@ -1714,7 +1723,7 @@ func testChannelForceClosure(net *networkHarness, t *harnessTest) { } } -func testSingleHopInvoice(net *networkHarness, t *harnessTest) { +func testSingleHopInvoice(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() timeout := time.Duration(time.Second * 5) @@ -1851,7 +1860,7 @@ func testSingleHopInvoice(net *networkHarness, t *harnessTest) { closeChannelAndAssert(ctxt, t, net, net.Alice, chanPoint, false) } -func testListPayments(net *networkHarness, t *harnessTest) { +func testListPayments(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() timeout := time.Duration(time.Second * 5) @@ -1986,7 +1995,7 @@ func testListPayments(net *networkHarness, t *harnessTest) { closeChannelAndAssert(ctxt, t, net, net.Alice, chanPoint, false) } -func testMultiHopPayments(net *networkHarness, t *harnessTest) { +func testMultiHopPayments(net *lntest.NetworkHarness, t *harnessTest) { const chanAmt = btcutil.Amount(100000) ctxb := context.Background() timeout := time.Duration(time.Second * 5) @@ -2115,7 +2124,7 @@ func testMultiHopPayments(net *networkHarness, t *harnessTest) { // creating the seed nodes in the network. const baseFee = 1 - assertAmountPaid := func(node *lightningNode, chanPoint wire.OutPoint, + assertAmountPaid := func(node *lntest.HarnessNode, chanPoint wire.OutPoint, amountSent, amountReceived int64) { channelName := "" @@ -2220,7 +2229,7 @@ func testMultiHopPayments(net *networkHarness, t *harnessTest) { } } -func testInvoiceSubscriptions(net *networkHarness, t *harnessTest) { +func testInvoiceSubscriptions(net *lntest.NetworkHarness, t *harnessTest) { const chanAmt = btcutil.Amount(500000) ctxb := context.Background() timeout := time.Duration(time.Second * 5) @@ -2313,7 +2322,7 @@ func testInvoiceSubscriptions(net *networkHarness, t *harnessTest) { } // testBasicChannelCreation test multiple channel opening and closing. -func testBasicChannelCreation(net *networkHarness, t *harnessTest) { +func testBasicChannelCreation(net *lntest.NetworkHarness, t *harnessTest) { const ( numChannels = 2 timeout = time.Duration(time.Second * 5) @@ -2340,7 +2349,7 @@ func testBasicChannelCreation(net *networkHarness, t *harnessTest) { // testMaxPendingChannels checks that error is returned from remote peer if // max pending channel number was exceeded and that '--maxpendingchannels' flag // exists and works properly. -func testMaxPendingChannels(net *networkHarness, t *harnessTest) { +func testMaxPendingChannels(net *lntest.NetworkHarness, t *harnessTest) { maxPendingChannels := defaultMaxPendingChannels + 1 amount := maxFundingAmount @@ -2534,7 +2543,7 @@ func waitForNTxsInMempool(miner *rpcclient.Client, n int, // testRevokedCloseRetributinPostBreachConf tests that Alice is able carry out // retribution in the event that she fails immediately after detecting Bob's // breach txn in the mempool. 
-func testRevokedCloseRetribution(net *networkHarness, t *harnessTest) { +func testRevokedCloseRetribution(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() const ( timeout = time.Duration(time.Second * 10) @@ -2630,8 +2639,7 @@ func testRevokedCloseRetribution(net *networkHarness, t *harnessTest) { // With the temporary file created, copy Bob's current state into the // temporary file we created above. Later after more updates, we'll // restore this state. - bobDbPath := filepath.Join(net.Bob.cfg.DataDir, "simnet/bitcoin/channel.db") - if err := copyFile(bobTempDbFile, bobDbPath); err != nil { + if err := copyFile(bobTempDbFile, net.Bob.DBPath()); err != nil { t.Fatalf("unable to copy database files: %v", err) } @@ -2654,7 +2662,7 @@ func testRevokedCloseRetribution(net *networkHarness, t *harnessTest) { // state. With this, we essentially force Bob to travel back in time // within the channel's history. if err = net.RestartNode(net.Bob, func() error { - return os.Rename(bobTempDbFile, bobDbPath) + return os.Rename(bobTempDbFile, net.Bob.DBPath()) }); err != nil { t.Fatalf("unable to restart node: %v", err) } @@ -2769,8 +2777,7 @@ func testRevokedCloseRetribution(net *networkHarness, t *harnessTest) { // testRevokedCloseRetributionZeroValueRemoteOutput tests that Alice is able // carry out retribution in the event that she fails in state where the remote // commitment output has zero-value. -func testRevokedCloseRetributionZeroValueRemoteOutput( - net *networkHarness, +func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() @@ -2871,8 +2878,7 @@ func testRevokedCloseRetributionZeroValueRemoteOutput( // With the temporary file created, copy Carol's current state into the // temporary file we created above. Later after more updates, we'll // restore this state. - carolDbPath := filepath.Join(carol.cfg.DataDir, "simnet/bitcoin/channel.db") - if err := copyFile(carolTempDbFile, carolDbPath); err != nil { + if err := copyFile(carolTempDbFile, carol.DBPath()); err != nil { t.Fatalf("unable to copy database files: %v", err) } @@ -2893,7 +2899,7 @@ func testRevokedCloseRetributionZeroValueRemoteOutput( // state. With this, we essentially force Carol to travel back in time // within the channel's history. if err = net.RestartNode(carol, func() error { - return os.Rename(carolTempDbFile, carolDbPath) + return os.Rename(carolTempDbFile, carol.DBPath()) }); err != nil { t.Fatalf("unable to restart node: %v", err) } @@ -3000,8 +3006,7 @@ func testRevokedCloseRetributionZeroValueRemoteOutput( // testRevokedCloseRetributionRemoteHodl tests that Alice properly responds to a // channel breach made by the remote party, specifically in the case that the // remote party breaches before settling extended HTLCs. -func testRevokedCloseRetributionRemoteHodl( - net *networkHarness, +func testRevokedCloseRetributionRemoteHodl(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() @@ -3144,8 +3149,7 @@ func testRevokedCloseRetributionRemoteHodl( // With the temporary file created, copy Carol's current state into the // temporary file we created above. Later after more updates, we'll // restore this state. 
- carolDbPath := filepath.Join(carol.cfg.DataDir, "simnet/bitcoin/channel.db") - if err := copyFile(carolTempDbFile, carolDbPath); err != nil { + if err := copyFile(carolTempDbFile, carol.DBPath()); err != nil { t.Fatalf("unable to copy database files: %v", err) } @@ -3167,7 +3171,7 @@ func testRevokedCloseRetributionRemoteHodl( // state. With this, we essentially force Carol to travel back in time // within the channel's history. if err = net.RestartNode(carol, func() error { - return os.Rename(carolTempDbFile, carolDbPath) + return os.Rename(carolTempDbFile, carol.DBPath()) }); err != nil { t.Fatalf("unable to restart node: %v", err) } @@ -3296,7 +3300,7 @@ func testRevokedCloseRetributionRemoteHodl( } } -func testHtlcErrorPropagation(net *networkHarness, t *harnessTest) { +func testHtlcErrorPropagation(net *lntest.NetworkHarness, t *harnessTest) { // In this test we wish to exercise the daemon's correct parsing, // handling, and propagation of errors that occur while processing a // multi-hop payment. @@ -3582,7 +3586,7 @@ out: } } -func testGraphTopologyNotifications(net *networkHarness, t *harnessTest) { +func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) { const chanAmt = maxFundingAmount timeout := time.Duration(time.Second * 5) ctxb := context.Background() @@ -3808,7 +3812,7 @@ func testGraphTopologyNotifications(net *networkHarness, t *harnessTest) { // testNodeAnnouncement ensures that when a node is started with one or more // external IP addresses specified on the command line, that those addresses // announced to the network and reported in the network graph. -func testNodeAnnouncement(net *networkHarness, t *harnessTest) { +func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() ipAddresses := map[string]bool{ @@ -3883,7 +3887,7 @@ func testNodeAnnouncement(net *networkHarness, t *harnessTest) { } } -func testNodeSignVerify(net *networkHarness, t *harnessTest) { +func testNodeSignVerify(net *lntest.NetworkHarness, t *harnessTest) { timeout := time.Duration(time.Second * 5) ctxb := context.Background() @@ -3961,12 +3965,12 @@ func testNodeSignVerify(net *networkHarness, t *harnessTest) { // testAsyncPayments tests the performance of the async payments, and also // checks that balances of both sides can't be become negative under stress // payment strikes. -func testAsyncPayments(net *networkHarness, t *harnessTest) { +func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() // As we'll be querying the channels state frequently we'll // create a closure helper function for the purpose. - getChanInfo := func(node *lightningNode) (*lnrpc.ActiveChannel, error) { + getChanInfo := func(node *lntest.HarnessNode) (*lnrpc.ActiveChannel, error) { req := &lnrpc.ListChannelsRequest{} channelInfo, err := node.ListChannels(ctxb, req) if err != nil { @@ -4138,12 +4142,12 @@ func testAsyncPayments(net *networkHarness, t *harnessTest) { // testBidirectionalAsyncPayments tests that nodes are able to send the // payments to each other in async manner without blocking. -func testBidirectionalAsyncPayments(net *networkHarness, t *harnessTest) { +func testBidirectionalAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() // As we'll be querying the channels state frequently we'll // create a closure helper function for the purpose. 
- getChanInfo := func(node *lightningNode) (*lnrpc.ActiveChannel, error) { + getChanInfo := func(node *lntest.HarnessNode) (*lnrpc.ActiveChannel, error) { req := &lnrpc.ListChannelsRequest{} channelInfo, err := node.ListChannels(ctxb, req) if err != nil { @@ -4375,7 +4379,7 @@ func testBidirectionalAsyncPayments(net *networkHarness, t *harnessTest) { type testCase struct { name string - test func(net *networkHarness, t *harnessTest) + test func(net *lntest.NetworkHarness, t *harnessTest) } var testsCases = []*testCase{ @@ -4475,7 +4479,7 @@ func TestLightningNetworkDaemon(t *testing.T) { // First create the network harness to gain access to its // 'OnTxAccepted' call back. - lndHarness, err := newNetworkHarness() + lndHarness, err := lntest.NewNetworkHarness() if err != nil { ht.Fatalf("unable to create lightning network harness: %v", err) } diff --git a/lntest/harness.go b/lntest/harness.go index 6222e04c80f..93eb38ed22d 100644 --- a/lntest/harness.go +++ b/lntest/harness.go @@ -1,19 +1,41 @@ package lntest -// networkHarness is an integration testing harness for the lightning network. +import ( + "fmt" + "io/ioutil" + "sync" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc/grpclog" + + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/roasbeef/btcd/chaincfg" + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcd/integration/rpctest" + "github.com/roasbeef/btcd/rpcclient" + "github.com/roasbeef/btcd/txscript" + "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" +) + +// NetworkHarness is an integration testing harness for the lightning network. // The harness by default is created with two active nodes on the network: // Alice and Bob. -type networkHarness struct { +type NetworkHarness struct { rpcConfig rpcclient.ConnConfig netParams *chaincfg.Params - Miner *rpctest.Harness - activeNodes map[int]*lightningNode + // Miner is a reference to a running full node that can be used to create + // new blocks on the network. + Miner *rpctest.Harness + + activeNodes map[int]*HarnessNode // Alice and Bob are the initial seeder nodes that are automatically // created to be the initial participants of the test network. - Alice *lightningNode - Bob *lightningNode + Alice *HarnessNode + Bob *HarnessNode seenTxns chan chainhash.Hash bitcoinWatchRequests chan *txWatchRequest @@ -27,13 +49,13 @@ type networkHarness struct { sync.Mutex } -// newNetworkHarness creates a new network test harness. +// NewNetworkHarness creates a new network test harness. // TODO(roasbeef): add option to use golang's build library to a binary of the // current repo. This'll save developers from having to manually `go install` // within the repo each time before changes -func newNetworkHarness() (*networkHarness, error) { - return &networkHarness{ - activeNodes: make(map[int]*lightningNode), +func NewNetworkHarness() (*NetworkHarness, error) { + return &NetworkHarness{ + activeNodes: make(map[int]*HarnessNode), seenTxns: make(chan chainhash.Hash), bitcoinWatchRequests: make(chan *txWatchRequest), lndErrorChan: make(chan error), @@ -44,10 +66,10 @@ func newNetworkHarness() (*networkHarness, error) { // InitializeSeedNodes initializes alice and bob nodes given an already // running instance of btcd's rpctest harness and extra command line flags, // which should be formatted properly - "--arg=value". 
-func (n *networkHarness) InitializeSeedNodes(r *rpctest.Harness, lndArgs []string) error { +func (n *NetworkHarness) InitializeSeedNodes(r *rpctest.Harness, lndArgs []string) error { n.netParams = r.ActiveNet n.Miner = r - n.rpcConfig = nodeConfig + n.rpcConfig = r.RPCConfig() config := nodeConfig{ RPCConfig: &n.rpcConfig, @@ -56,17 +78,17 @@ func (n *networkHarness) InitializeSeedNodes(r *rpctest.Harness, lndArgs []strin } var err error - n.Alice, err = newLightningNode(config) + n.Alice, err = newNode(config) if err != nil { return err } - n.Bob, err = newLightningNode(config) + n.Bob, err = newNode(config) if err != nil { return err } - n.activeNodes[n.Alice.nodeID] = n.Alice - n.activeNodes[n.Bob.nodeID] = n.Bob + n.activeNodes[n.Alice.NodeID] = n.Alice + n.activeNodes[n.Bob.NodeID] = n.Bob return err } @@ -74,7 +96,7 @@ func (n *networkHarness) InitializeSeedNodes(r *rpctest.Harness, lndArgs []strin // ProcessErrors returns a channel used for reporting any fatal process errors. // If any of the active nodes within the harness' test network incur a fatal // error, that error is sent over this channel. -func (n *networkHarness) ProcessErrors() <-chan error { +func (n *NetworkHarness) ProcessErrors() <-chan error { return n.lndErrorChan } @@ -93,7 +115,7 @@ func (f *fakeLogger) Println(args ...interface{}) {} // node's wallets will be funded wallets with ten 1 BTC outputs each. Finally // rpc clients capable of communicating with the initial seeder nodes are // created. -func (n *networkHarness) SetUp() error { +func (n *NetworkHarness) SetUp() error { // Swap out grpc's default logger with out fake logger which drops the // statements on the floor. grpclog.SetLogger(&fakeLogger{}) @@ -105,13 +127,13 @@ func (n *networkHarness) SetUp() error { wg.Add(2) go func() { defer wg.Done() - if err := n.Alice.Start(n.lndErrorChan); err != nil { + if err := n.Alice.start(n.lndErrorChan); err != nil { errChan <- err } }() go func() { defer wg.Done() - if err := n.Bob.Start(n.lndErrorChan); err != nil { + if err := n.Bob.start(n.lndErrorChan); err != nil { errChan <- err } }() @@ -200,7 +222,7 @@ out: } // TearDownAll tears down all active nodes within the test lightning network. -func (n *networkHarness) TearDownAll() error { +func (n *NetworkHarness) TearDownAll() error { for _, node := range n.activeNodes { if err := node.Shutdown(); err != nil { return err @@ -213,14 +235,14 @@ func (n *networkHarness) TearDownAll() error { return nil } -// NewNode fully initializes a returns a new lightningNode binded to the +// NewNode fully initializes a returns a new HarnessNode binded to the // current instance of the network harness. The created node is running, but // not yet connected to other nodes within the network. -func (n *networkHarness) NewNode(extraArgs []string) (*lightningNode, error) { +func (n *NetworkHarness) NewNode(extraArgs []string) (*HarnessNode, error) { n.Lock() defer n.Unlock() - node, err := newLightningNode(nodeConfig{ + node, err := newNode(nodeConfig{ RPCConfig: &n.rpcConfig, NetParams: n.netParams, ExtraArgs: extraArgs, @@ -231,9 +253,9 @@ func (n *networkHarness) NewNode(extraArgs []string) (*lightningNode, error) { // Put node in activeNodes to ensure Shutdown is called even if Start // returns an error. 
- n.activeNodes[node.nodeID] = node + n.activeNodes[node.NodeID] = node - if err := node.Start(n.lndErrorChan); err != nil { + if err := node.start(n.lndErrorChan); err != nil { return nil, err } @@ -246,7 +268,7 @@ func (n *networkHarness) NewNode(extraArgs []string) (*lightningNode, error) { // // NOTE: This function may block for up to 15-seconds as it will not return // until the new connection is detected as being known to both nodes. -func (n *networkHarness) ConnectNodes(ctx context.Context, a, b *lightningNode) error { +func (n *NetworkHarness) ConnectNodes(ctx context.Context, a, b *HarnessNode) error { bobInfo, err := b.GetInfo(ctx, &lnrpc.GetInfoRequest{}) if err != nil { return err @@ -288,7 +310,7 @@ func (n *networkHarness) ConnectNodes(ctx context.Context, a, b *lightningNode) // DisconnectNodes disconnects node a from node b by sending RPC message // from a node to b node -func (n *networkHarness) DisconnectNodes(ctx context.Context, a, b *lightningNode) error { +func (n *NetworkHarness) DisconnectNodes(ctx context.Context, a, b *HarnessNode) error { bobInfo, err := b.GetInfo(ctx, &lnrpc.GetInfoRequest{}) if err != nil { return err @@ -315,8 +337,8 @@ func (n *networkHarness) DisconnectNodes(ctx context.Context, a, b *lightningNod // This method can be useful when testing edge cases such as a node broadcast // and invalidated prior state, or persistent state recovery, simulating node // crashes, etc. -func (n *networkHarness) RestartNode(node *lightningNode, callback func() error) error { - return node.Restart(n.lndErrorChan, callback) +func (n *NetworkHarness) RestartNode(node *HarnessNode, callback func() error) error { + return node.restart(n.lndErrorChan, callback) } // TODO(roasbeef): add a WithChannel higher-order function? @@ -335,7 +357,7 @@ type txWatchRequest struct { // bitcoinNetworkWatcher is a goroutine which accepts async notification // requests for the broadcast of a target transaction, and then dispatches the // transaction once its seen on the Bitcoin network. -func (n *networkHarness) networkWatcher() { +func (n *NetworkHarness) networkWatcher() { seenTxns := make(map[chainhash.Hash]struct{}) clients := make(map[chainhash.Hash][]chan struct{}) @@ -381,7 +403,7 @@ func (n *networkHarness) networkWatcher() { // OnTxAccepted is a callback to be called each time a new transaction has been // broadcast on the network. -func (n *networkHarness) OnTxAccepted(hash *chainhash.Hash, amt btcutil.Amount) { +func (n *NetworkHarness) OnTxAccepted(hash *chainhash.Hash, amt btcutil.Amount) { // Return immediately if harness has been torn down. select { case <-n.quit: @@ -398,11 +420,11 @@ func (n *networkHarness) OnTxAccepted(hash *chainhash.Hash, amt btcutil.Amount) // the transaction isn't seen within the network before the passed timeout, // then an error is returned. // TODO(roasbeef): add another method which creates queue of all seen transactions -func (n *networkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash.Hash) error { +func (n *NetworkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash.Hash) error { // Return immediately if harness has been torn down. select { case <-n.quit: - return fmt.Errorf("networkHarness has been torn down") + return fmt.Errorf("NetworkHarness has been torn down") default: } @@ -417,7 +439,7 @@ func (n *networkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash. 
case <-eventChan: return nil case <-n.quit: - return fmt.Errorf("networkHarness has been torn down") + return fmt.Errorf("NetworkHarness has been torn down") case <-ctx.Done(): return fmt.Errorf("tx not seen before context timeout") } @@ -427,8 +449,8 @@ func (n *networkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash. // passed channel funding parameters. If the passed context has a timeout, then // if the timeout is reached before the channel pending notification is // received, an error is returned. -func (n *networkHarness) OpenChannel(ctx context.Context, - srcNode, destNode *lightningNode, amt btcutil.Amount, +func (n *NetworkHarness) OpenChannel(ctx context.Context, + srcNode, destNode *HarnessNode, amt btcutil.Amount, pushAmt btcutil.Amount) (lnrpc.Lightning_OpenChannelClient, error) { // Wait until srcNode and destNode have the latest chain synced. @@ -489,8 +511,8 @@ func (n *networkHarness) OpenChannel(ctx context.Context, // passed channel funding parameters. If the passed context has a timeout, then // if the timeout is reached before the channel pending notification is // received, an error is returned. -func (n *networkHarness) OpenPendingChannel(ctx context.Context, - srcNode, destNode *lightningNode, amt btcutil.Amount, +func (n *NetworkHarness) OpenPendingChannel(ctx context.Context, + srcNode, destNode *HarnessNode, amt btcutil.Amount, pushAmt btcutil.Amount) (*lnrpc.PendingUpdate, error) { // Wait until srcNode and destNode have blockchain synced @@ -549,7 +571,7 @@ func (n *networkHarness) OpenPendingChannel(ctx context.Context, // consuming a message from the past open channel stream. If the passed context // has a timeout, then if the timeout is reached before the channel has been // opened, then an error is returned. -func (n *networkHarness) WaitForChannelOpen(ctx context.Context, +func (n *NetworkHarness) WaitForChannelOpen(ctx context.Context, openChanStream lnrpc.Lightning_OpenChannelClient) (*lnrpc.ChannelPoint, error) { errChan := make(chan error) @@ -585,8 +607,8 @@ func (n *networkHarness) WaitForChannelOpen(ctx context.Context, // passed channel point, initiated by the passed lnNode. If the passed context // has a timeout, then if the timeout is reached before the channel close is // pending, then an error is returned. -func (n *networkHarness) CloseChannel(ctx context.Context, - lnNode *lightningNode, cp *lnrpc.ChannelPoint, +func (n *NetworkHarness) CloseChannel(ctx context.Context, + lnNode *HarnessNode, cp *lnrpc.ChannelPoint, force bool) (lnrpc.Lightning_CloseChannelClient, *chainhash.Hash, error) { // Create a channel outpoint that we can use to compare to channels @@ -684,7 +706,7 @@ CheckActive: // stream that the node has deemed the channel has been fully closed. If the // passed context has a timeout, then if the timeout is reached before the // notification is received then an error is returned. -func (n *networkHarness) WaitForChannelClose(ctx context.Context, +func (n *NetworkHarness) WaitForChannelClose(ctx context.Context, closeChanStream lnrpc.Lightning_CloseChannelClient) (*chainhash.Hash, error) { errChan := make(chan error) @@ -720,8 +742,8 @@ func (n *networkHarness) WaitForChannelClose(ctx context.Context, // AssertChannelExists asserts that an active channel identified by // channelPoint is known to exist from the point-of-view of node.. 
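The channel methods above are meant to be used in pairs: the stream returned by OpenChannel or CloseChannel is handed back to the corresponding WaitFor* call. A condensed sketch of that flow follows; chanAmt and pushAmt are placeholder values, error handling is shortened, and the block generation needed to confirm the funding and closing transactions is left out.

	chanAmt := btcutil.Amount(100000)
	pushAmt := btcutil.Amount(0)

	ctxt, _ := context.WithTimeout(context.Background(), time.Second*10)
	openStream, err := net.OpenChannel(ctxt, net.Alice, net.Bob, chanAmt, pushAmt)
	if err != nil {
		t.Fatalf("unable to open channel: %v", err)
	}
	// ... mine blocks so the funding transaction confirms ...
	chanPoint, err := net.WaitForChannelOpen(ctxt, openStream)
	if err != nil {
		t.Fatalf("channel never became active: %v", err)
	}

	closeStream, _, err := net.CloseChannel(ctxt, net.Alice, chanPoint, false)
	if err != nil {
		t.Fatalf("unable to close channel: %v", err)
	}
	// ... mine a block containing the closing transaction ...
	if _, err := net.WaitForChannelClose(ctxt, closeStream); err != nil {
		t.Fatalf("channel never closed: %v", err)
	}
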
-func (n *networkHarness) AssertChannelExists(ctx context.Context, - node *lightningNode, chanPoint *wire.OutPoint) error { +func (n *NetworkHarness) AssertChannelExists(ctx context.Context, + node *HarnessNode, chanPoint *wire.OutPoint) error { req := &lnrpc.ListChannelsRequest{} resp, err := node.ListChannels(ctx, req) @@ -743,7 +765,7 @@ func (n *networkHarness) AssertChannelExists(ctx context.Context, // of a particular node in the case of a test failure. // Logs from lightning node being generated with delay - you should // add time.Sleep() in order to get all logs. -func (n *networkHarness) DumpLogs(node *lightningNode) (string, error) { +func (n *NetworkHarness) DumpLogs(node *HarnessNode) (string, error) { logFile := fmt.Sprintf("%v/simnet/lnd.log", node.cfg.LogDir) buf, err := ioutil.ReadFile(logFile) @@ -756,8 +778,8 @@ func (n *networkHarness) DumpLogs(node *lightningNode) (string, error) { // SendCoins attempts to send amt satoshis from the internal mining node to the // targeted lightning node. -func (n *networkHarness) SendCoins(ctx context.Context, amt btcutil.Amount, - target *lightningNode) error { +func (n *NetworkHarness) SendCoins(ctx context.Context, amt btcutil.Amount, + target *HarnessNode) error { balReq := &lnrpc.WalletBalanceRequest{} initialBalance, err := target.WalletBalance(ctx, balReq) diff --git a/lntest/node.go b/lntest/node.go index 738801761e5..ff4e2e283d1 100644 --- a/lntest/node.go +++ b/lntest/node.go @@ -1,5 +1,34 @@ package lntest +import ( + "bytes" + "encoding/hex" + "flag" + "fmt" + "io" + "io/ioutil" + "net" + "os" + "os/exec" + "path/filepath" + "strconv" + "sync" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + macaroon "gopkg.in/macaroon.v1" + + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/macaroons" + "github.com/roasbeef/btcd/chaincfg" + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcd/rpcclient" + "github.com/roasbeef/btcd/wire" +) + var ( // numActiveNodes is the number of active nodes within the test network. numActiveNodes = 0 @@ -18,8 +47,6 @@ var ( // as such: defaultP2pPort + (2 * harness.nodeNum). defaultClientPort = 19556 - harnessNetParams = &chaincfg.SimNetParams - // logOutput is a flag that can be set to append the output from the // seed nodes to log files. logOutput = flag.Bool("logoutput", false, @@ -119,13 +146,14 @@ func (cfg nodeConfig) genArgs() []string { return args } -// lightningNode represents an instance of lnd running within our test network -// harness. Each lightningNode instance also fully embedds an RPC client in +// HarnessNode represents an instance of lnd running within our test network +// harness. Each HarnessNode instance also fully embedds an RPC client in // order to pragmatically drive the node. -type lightningNode struct { +type HarnessNode struct { cfg *nodeConfig - nodeID int + // NodeID is a unique identifier for the node within a NetworkHarness. + NodeID int // PubKey is the serialized compressed identity public key of the node. // This field will only be populated once the node itself has been @@ -137,7 +165,7 @@ type lightningNode struct { pidFile string // processExit is a channel that's closed once it's detected that the - // process this instance of lightningNode is bound to has exited. + // process this instance of HarnessNode is bound to has exited. 
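Because HarnessNode embeds lnrpc.LightningClient, tests issue RPCs directly on the node handle rather than through a separate client object. A small sketch using the GetInfo call that appears throughout the integration tests:

	ctxb := context.Background()
	info, err := net.Alice.GetInfo(ctxb, &lnrpc.GetInfoRequest{})
	if err != nil {
		t.Fatalf("unable to query alice's info: %v", err)
	}
	if info.IdentityPubkey == "" {
		t.Fatalf("alice has no identity pubkey")
	}
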
processExit chan struct{} chanWatchRequests chan *chanWatchRequest @@ -148,9 +176,11 @@ type lightningNode struct { lnrpc.LightningClient } -// newLightningNode creates a new test lightning node instance from the passed -// rpc config and slice of extra arguments. -func newLightningNode(cfg nodeConfig) (*lightningNode, error) { +// Assert *HarnessNode implements the lnrpc.LightningClient interface. +var _ lnrpc.LightningClient = (*HarnessNode)(nil) + +// newNode creates a new test lightning node instance from the passed config. +func newNode(cfg nodeConfig) (*HarnessNode, error) { if cfg.BaseDir == "" { var err error cfg.BaseDir, err = ioutil.TempDir("", "lndtest-node") @@ -170,30 +200,35 @@ func newLightningNode(cfg nodeConfig) (*lightningNode, error) { nodeNum := numActiveNodes numActiveNodes++ - return &lightningNode{ + return &HarnessNode{ cfg: &cfg, - nodeID: nodeNum, + NodeID: nodeNum, chanWatchRequests: make(chan *chanWatchRequest), processExit: make(chan struct{}), quit: make(chan struct{}), }, nil } +// DBPath returns the filepath to the channeldb database file for this node. +func (hn *HarnessNode) DBPath() string { + return hn.cfg.DBPath() +} + // Start launches a new process running lnd. Additionally, the PID of the // launched process is saved in order to possibly kill the process forcibly // later. -func (l *lightningNode) Start(lndError chan<- error) error { - args := l.cfg.genArgs() - l.cmd = exec.Command("lnd", args...) +func (hn *HarnessNode) start(lndError chan<- error) error { + args := hn.cfg.genArgs() + hn.cmd = exec.Command("lnd", args...) // Redirect stderr output to buffer var errb bytes.Buffer - l.cmd.Stderr = &errb + hn.cmd.Stderr = &errb // If the logoutput flag is passed, redirect output from the nodes to // log files. if *logOutput { - logFile := fmt.Sprintf("output%d.log", l.nodeID) + logFile := fmt.Sprintf("output%d.log", hn.NodeID) // Create file if not exists, otherwise append. file, err := os.OpenFile(logFile, @@ -204,69 +239,70 @@ func (l *lightningNode) Start(lndError chan<- error) error { // Pass node's stderr to both errb and the file. w := io.MultiWriter(&errb, file) - l.cmd.Stderr = w + hn.cmd.Stderr = w // Pass the node's stdout only to the file. - l.cmd.Stdout = file + hn.cmd.Stdout = file } - if err := l.cmd.Start(); err != nil { + if err := hn.cmd.Start(); err != nil { return err } // Launch a new goroutine which that bubbles up any potential fatal // process errors to the goroutine running the tests. go func() { - err := l.cmd.Wait() + err := hn.cmd.Wait() + if err != nil { lndError <- errors.Errorf("%v\n%v\n", err, errb.String()) } // Signal any onlookers that this process has exited. - close(l.processExit) + close(hn.processExit) }() // Write process ID to a file. - if err := l.writePidFile(); err != nil { - l.cmd.Process.Kill() + if err := hn.writePidFile(); err != nil { + hn.cmd.Process.Kill() return err } // Since Stop uses the LightningClient to stop the node, if we fail to get a // connected client, we have to kill the process. - conn, err := l.connectRPC() + conn, err := hn.connectRPC() if err != nil { - l.cmd.Process.Kill() + hn.cmd.Process.Kill() return err } - l.LightningClient = lnrpc.NewLightningClient(conn) + hn.LightningClient = lnrpc.NewLightningClient(conn) // Obtain the lnid of this node for quick identification purposes. 
ctxb := context.Background() - info, err := l.GetInfo(ctxb, &lnrpc.GetInfoRequest{}) + info, err := hn.GetInfo(ctxb, &lnrpc.GetInfoRequest{}) if err != nil { return err } - l.PubKeyStr = info.IdentityPubkey + hn.PubKeyStr = info.IdentityPubkey pubkey, err := hex.DecodeString(info.IdentityPubkey) if err != nil { return err } - copy(l.PubKey[:], pubkey) + copy(hn.PubKey[:], pubkey) // Launch the watcher that'll hook into graph related topology change // from the PoV of this node. - l.wg.Add(1) - go l.lightningNetworkWatcher() + hn.wg.Add(1) + go hn.lightningNetworkWatcher() return nil } // writePidFile writes the process ID of the running lnd process to a .pid file. -func (l *lightningNode) writePidFile() error { - filePath := filepath.Join(l.cfg.BaseDir, fmt.Sprintf("%v.pid", l.nodeID)) +func (hn *HarnessNode) writePidFile() error { + filePath := filepath.Join(hn.cfg.BaseDir, fmt.Sprintf("%v.pid", hn.NodeID)) pid, err := os.Create(filePath) if err != nil { @@ -274,22 +310,22 @@ func (l *lightningNode) writePidFile() error { } defer pid.Close() - _, err = fmt.Fprintf(pid, "%v\n", l.cmd.Process.Pid) + _, err = fmt.Fprintf(pid, "%v\n", hn.cmd.Process.Pid) if err != nil { return err } - l.pidFile = filePath + hn.pidFile = filePath return nil } // connectRPC uses the TLS certificate and admin macaroon files written by the // lnd node to create a gRPC client connection. -func (l *lightningNode) connectRPC() (*grpc.ClientConn, error) { +func (hn *HarnessNode) connectRPC() (*grpc.ClientConn, error) { // Wait until TLS certificate and admin macaroon are created before // using them, up to 20 sec. tlsTimeout := time.After(30 * time.Second) - for !fileExists(l.cfg.TLSCertPath) || !fileExists(l.cfg.AdminMacPath) { + for !fileExists(hn.cfg.TLSCertPath) || !fileExists(hn.cfg.AdminMacPath) { select { case <-tlsTimeout: return nil, fmt.Errorf("timeout waiting for TLS cert file " + @@ -299,11 +335,11 @@ func (l *lightningNode) connectRPC() (*grpc.ClientConn, error) { } } - tlsCreds, err := credentials.NewClientTLSFromFile(l.cfg.TLSCertPath, "") + tlsCreds, err := credentials.NewClientTLSFromFile(hn.cfg.TLSCertPath, "") if err != nil { return nil, err } - macBytes, err := ioutil.ReadFile(l.cfg.AdminMacPath) + macBytes, err := ioutil.ReadFile(hn.cfg.AdminMacPath) if err != nil { return nil, err } @@ -317,26 +353,26 @@ func (l *lightningNode) connectRPC() (*grpc.ClientConn, error) { grpc.WithBlock(), grpc.WithTimeout(time.Second * 20), } - return grpc.Dial(l.rpcAddr, opts...) + return grpc.Dial(hn.cfg.RPCAddr(), opts...) } // cleanup cleans up all the temporary files created by the node's process. -func (l *lightningNode) cleanup() error { - return os.RemoveAll(l.cfg.BaseDir) +func (hn *HarnessNode) cleanup() error { + return os.RemoveAll(hn.cfg.BaseDir) } // Stop attempts to stop the active lnd process. -func (l *lightningNode) Stop() error { +func (hn *HarnessNode) stop() error { // Do nothing if the process never started successfully. - if l.LightningClient == nil { + if hn.LightningClient == nil { return nil } // Do nothing if the process already finished. select { - case <-l.quit: + case <-hn.quit: return nil - case <-l.processExit: + case <-hn.processExit: return nil default: } @@ -345,10 +381,10 @@ func (l *lightningNode) Stop() error { // closed before a response is returned. 
req := lnrpc.StopRequest{} ctx := context.Background() - l.LightningClient.StopDaemon(ctx, &req) + hn.LightningClient.StopDaemon(ctx, &req) - close(l.quit) - l.wg.Wait() + close(hn.quit) + hn.wg.Wait() return nil } @@ -358,17 +394,17 @@ func (l *lightningNode) Stop() error { // connection attempt is successful. Additionally, if a callback is passed, the // closure will be executed after the node has been shutdown, but before the // process has been started up again. -func (l *lightningNode) Restart(errChan chan error, callback func() error) error { - if err := l.Stop(); err != nil { +func (hn *HarnessNode) restart(errChan chan error, callback func() error) error { + if err := hn.stop(); err != nil { return err } - <-l.processExit + <-hn.processExit - l.LightningClient = nil - l.processExit = make(chan struct{}) - l.quit = make(chan struct{}) - l.wg = sync.WaitGroup{} + hn.LightningClient = nil + hn.processExit = make(chan struct{}) + hn.quit = make(chan struct{}) + hn.wg = sync.WaitGroup{} if callback != nil { if err := callback(); err != nil { @@ -376,16 +412,16 @@ func (l *lightningNode) Restart(errChan chan error, callback func() error) error } } - return l.Start(errChan) + return hn.start(errChan) } // Shutdown stops the active lnd process and clean up any temporary directories // created along the way. -func (l *lightningNode) Shutdown() error { - if err := l.Stop(); err != nil { +func (hn *HarnessNode) Shutdown() error { + if err := hn.stop(); err != nil { return err } - if err := l.cleanup(); err != nil { + if err := hn.cleanup(); err != nil { return err } return nil @@ -407,17 +443,17 @@ type chanWatchRequest struct { // closed or opened within the network. In order to dispatch these // notifications, the GraphTopologySubscription client exposed as part of the // gRPC interface is used. -func (l *lightningNode) lightningNetworkWatcher() { - defer l.wg.Done() +func (hn *HarnessNode) lightningNetworkWatcher() { + defer hn.wg.Done() graphUpdates := make(chan *lnrpc.GraphTopologyUpdate) - l.wg.Add(1) + hn.wg.Add(1) go func() { - defer l.wg.Done() + defer hn.wg.Done() ctxb := context.Background() req := &lnrpc.GraphTopologySubscription{} - topologyClient, err := l.SubscribeChannelGraph(ctxb, req) + topologyClient, err := hn.SubscribeChannelGraph(ctxb, req) if err != nil { // We panic here in case of an error as failure to // create the topology client will cause all subsequent @@ -436,7 +472,7 @@ func (l *lightningNode) lightningNetworkWatcher() { select { case graphUpdates <- update: - case <-l.quit: + case <-hn.quit: return } } @@ -506,7 +542,7 @@ func (l *lightningNode) lightningNetworkWatcher() { // A new watch request, has just arrived. We'll either be able // to dispatch immediately, or need to add the client for // processing later. - case watchRequest := <-l.chanWatchRequests: + case watchRequest := <-hn.chanWatchRequests: targetChan := watchRequest.chanPoint // TODO(roasbeef): add update type also, checks for @@ -540,7 +576,7 @@ func (l *lightningNode) lightningNetworkWatcher() { closeClients[targetChan] = append(closeClients[targetChan], watchRequest.eventChan) - case <-l.quit: + case <-hn.quit: return } } @@ -550,7 +586,7 @@ func (l *lightningNode) lightningNetworkWatcher() { // outpoint is seen as being fully advertised within the network. A channel is // considered "fully advertised" once both of its directional edges has been // advertised within the test Lightning Network. 
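The restart hook is what the breach-retribution tests above rely on: the callback runs after the node has stopped but before it is started again, which makes it possible to swap the channel database out from under lnd. A condensed sketch of that pattern, where copyFile is the existing test helper used in lnd_test.go and tempDbFile stands in for the temporary copy created by the test:

	// Snapshot the node's channel.db so it can be restored later.
	if err := copyFile(tempDbFile, node.DBPath()); err != nil {
		t.Fatalf("unable to copy database files: %v", err)
	}

	// ... advance the channel state past the snapshot ...

	// Restart the node, restoring the stale database before lnd is
	// relaunched, forcing it back to the revoked state.
	if err := net.RestartNode(node, func() error {
		return os.Rename(tempDbFile, node.DBPath())
	}); err != nil {
		t.Fatalf("unable to restart node: %v", err)
	}
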
-func (l *lightningNode) WaitForNetworkChannelOpen(ctx context.Context, +func (hn *HarnessNode) WaitForNetworkChannelOpen(ctx context.Context, op *lnrpc.ChannelPoint) error { eventChan := make(chan struct{}) @@ -560,7 +596,7 @@ func (l *lightningNode) WaitForNetworkChannelOpen(ctx context.Context, return err } - l.chanWatchRequests <- &chanWatchRequest{ + hn.chanWatchRequests <- &chanWatchRequest{ chanPoint: wire.OutPoint{ Hash: *txid, Index: op.OutputIndex, @@ -581,7 +617,7 @@ func (l *lightningNode) WaitForNetworkChannelOpen(ctx context.Context, // outpoint is seen as closed within the network. A channel is considered // closed once a transaction spending the funding outpoint is seen within a // confirmed block. -func (l *lightningNode) WaitForNetworkChannelClose(ctx context.Context, +func (hn *HarnessNode) WaitForNetworkChannelClose(ctx context.Context, op *lnrpc.ChannelPoint) error { eventChan := make(chan struct{}) @@ -591,7 +627,7 @@ func (l *lightningNode) WaitForNetworkChannelClose(ctx context.Context, return err } - l.chanWatchRequests <- &chanWatchRequest{ + hn.chanWatchRequests <- &chanWatchRequest{ chanPoint: wire.OutPoint{ Hash: *txid, Index: op.OutputIndex, @@ -613,7 +649,7 @@ func (l *lightningNode) WaitForNetworkChannelClose(ctx context.Context, // timeout, then the goroutine will continually poll until the timeout has // elapsed. In the case that the chain isn't synced before the timeout is up, // then this function will return an error. -func (l *lightningNode) WaitForBlockchainSync(ctx context.Context) error { +func (hn *HarnessNode) WaitForBlockchainSync(ctx context.Context) error { errChan := make(chan error, 1) retryDelay := time.Millisecond * 100 @@ -621,13 +657,13 @@ func (l *lightningNode) WaitForBlockchainSync(ctx context.Context) error { for { select { case <-ctx.Done(): - case <-l.quit: + case <-hn.quit: return default: } getInfoReq := &lnrpc.GetInfoRequest{} - getInfoResp, err := l.GetInfo(ctx, getInfoReq) + getInfoResp, err := hn.GetInfo(ctx, getInfoReq) if err != nil { errChan <- err return @@ -646,7 +682,7 @@ func (l *lightningNode) WaitForBlockchainSync(ctx context.Context) error { }() select { - case <-l.quit: + case <-hn.quit: return nil case err := <-errChan: return err @@ -654,3 +690,14 @@ func (l *lightningNode) WaitForBlockchainSync(ctx context.Context) error { return fmt.Errorf("Timeout while waiting for blockchain sync") } } + +// fileExists reports whether the named file or directory exists. +// This function is taken from https://github.com/btcsuite/btcd +func fileExists(name string) bool { + if _, err := os.Stat(name); err != nil { + if os.IsNotExist(err) { + return false + } + } + return true +} From b995c5f95fe5b9a844b758ab45116bd56d0d41e0 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Fri, 3 Nov 2017 14:23:07 -0700 Subject: [PATCH 4/7] lntest: Add doc.go with package documentation. --- lntest/doc.go | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 lntest/doc.go diff --git a/lntest/doc.go b/lntest/doc.go new file mode 100644 index 00000000000..38ef4089346 --- /dev/null +++ b/lntest/doc.go @@ -0,0 +1,10 @@ +/* +Package lntest provides testing utilities for the lnd repository. + +This package contains infrastructure for integration tests that launch full lnd +nodes in a controlled environment and interact with them via RPC. Using a +NetworkHarness, a test can launch multiple lnd nodes, open channels between +them, create defined network topologies, and anything else that is possible with +RPC commands. 
+*/ +package lntest From d6c2a33d0fd772ee30753d4cfcf40483a3b2756c Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Fri, 3 Nov 2017 14:40:57 -0700 Subject: [PATCH 5/7] lntest: Add ShutdownNode method to harness. This is preferable to calling Shutdown on the node directly so that the harness manages the entire lifecycle of an lnd process. --- lnd_test.go | 22 +++++++++++----------- lntest/harness.go | 13 ++++++++++++- lntest/node.go | 4 ++-- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/lnd_test.go b/lnd_test.go index 9a51779afeb..760c38a40d1 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -917,7 +917,7 @@ peersPoll: closeChannelAndAssert(ctxt, t, net, net.Alice, chanPoint, false) // Clean up carol's node. - if err := carol.Shutdown(); err != nil { + if err := net.ShutdownNode(carol); err != nil { t.Fatalf("unable to shutdown carol: %v", err) } } @@ -2218,13 +2218,13 @@ func testMultiHopPayments(net *lntest.NetworkHarness, t *harnessTest) { ctxt, _ = context.WithTimeout(ctxb, timeout) closeChannelAndAssert(ctxt, t, net, carol, chanPointCarol, false) - // Finally, shutdown the nodes we created for the duration of the - // tests, only leaving the two seed nodes (Alice and Bob) within our - // test network. - if err := carol.Shutdown(); err != nil { + // Finally, shutdown the nodes we created for the duration of the tests, + // only leaving the two seed nodes (Alice and Bob) within our test + // network. + if err := net.ShutdownNode(carol); err != nil { t.Fatalf("unable to shutdown carol: %v", err) } - if err := dave.Shutdown(); err != nil { + if err := net.ShutdownNode(dave); err != nil { t.Fatalf("unable to shutdown dave: %v", err) } } @@ -2456,7 +2456,7 @@ func testMaxPendingChannels(net *lntest.NetworkHarness, t *harnessTest) { // Finally, shutdown the node we created for the duration of the tests, // only leaving the two seed nodes (Alice and Bob) within our test // network. - if err := carol.Shutdown(); err != nil { + if err := net.ShutdownNode(carol); err != nil { t.Fatalf("unable to shutdown carol: %v", err) } } @@ -3546,7 +3546,7 @@ out: // We'll attempt to complete the original invoice we created with Carol // above, but before we do so, Carol will go offline, resulting in a // failed payment. - if err := carol.Shutdown(); err != nil { + if err := net.ShutdownNode(carol); err != nil { t.Fatalf("unable to shutdown carol: %v", err) } // TODO(roasbeef): mission control @@ -3804,7 +3804,7 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) close(quit) // Finally, shutdown carol as our test has concluded successfully. - if err := carol.Shutdown(); err != nil { + if err := net.ShutdownNode(carol); err != nil { t.Fatalf("unable to shutdown carol: %v", err) } } @@ -3882,7 +3882,7 @@ func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) { ctxt, _ = context.WithTimeout(ctxb, timeout) closeChannelAndAssert(ctxt, t, net, net.Bob, chanPoint, false) - if err := dave.Shutdown(); err != nil { + if err := net.ShutdownNode(dave); err != nil { t.Fatalf("unable to shutdown dave: %v", err) } } @@ -3953,7 +3953,7 @@ func testNodeSignVerify(net *lntest.NetworkHarness, t *harnessTest) { } // Clean up carol's node. 
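With ShutdownNode in place, temporary nodes created for a single test are both started and torn down through the harness. A short sketch of the resulting lifecycle, with carol standing in for any node created via NewNode:

	carol, err := net.NewNode(nil)
	if err != nil {
		t.Fatalf("unable to create new node: %v", err)
	}

	// ... connect carol, open channels, run the test ...

	if err := net.ShutdownNode(carol); err != nil {
		t.Fatalf("unable to shutdown carol: %v", err)
	}
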
- if err := carol.Shutdown(); err != nil { + if err := net.ShutdownNode(carol); err != nil { t.Fatalf("unable to shutdown carol: %v", err) } diff --git a/lntest/harness.go b/lntest/harness.go index 93eb38ed22d..f681b70ebfa 100644 --- a/lntest/harness.go +++ b/lntest/harness.go @@ -224,7 +224,7 @@ out: // TearDownAll tears down all active nodes within the test lightning network. func (n *NetworkHarness) TearDownAll() error { for _, node := range n.activeNodes { - if err := node.Shutdown(); err != nil { + if err := n.ShutdownNode(node); err != nil { return err } } @@ -341,6 +341,17 @@ func (n *NetworkHarness) RestartNode(node *HarnessNode, callback func() error) e return node.restart(n.lndErrorChan, callback) } +// ShutdownNode stops an active lnd process and returns when the process has +// exited and any temporary directories have been cleaned up. +func (n *NetworkHarness) ShutdownNode(node *HarnessNode) error { + if err := node.shutdown(); err != nil { + return err + } + + delete(n.activeNodes, node.NodeID) + return nil +} + // TODO(roasbeef): add a WithChannel higher-order function? // * python-like context manager w.r.t using a channel within a test // * possibly adds more funds to the target wallet if the funds are not diff --git a/lntest/node.go b/lntest/node.go index ff4e2e283d1..85422315e5a 100644 --- a/lntest/node.go +++ b/lntest/node.go @@ -415,9 +415,9 @@ func (hn *HarnessNode) restart(errChan chan error, callback func() error) error return hn.start(errChan) } -// Shutdown stops the active lnd process and clean up any temporary directories +// shutdown stops the active lnd process and cleans up any temporary directories // created along the way. -func (hn *HarnessNode) Shutdown() error { +func (hn *HarnessNode) shutdown() error { if err := hn.stop(); err != nil { return err } From 96bc887ad2442f76b7c4dc62bbe7d9684718bca7 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Fri, 3 Nov 2017 15:21:35 -0700 Subject: [PATCH 6/7] lntest: Improve HarnessNode stop logic and remove restart(). --- lntest/harness.go | 12 ++++++++- lntest/node.go | 64 ++++++++++++++++------------------------------- 2 files changed, 32 insertions(+), 44 deletions(-) diff --git a/lntest/harness.go b/lntest/harness.go index f681b70ebfa..bbbc951c34f 100644 --- a/lntest/harness.go +++ b/lntest/harness.go @@ -338,7 +338,17 @@ func (n *NetworkHarness) DisconnectNodes(ctx context.Context, a, b *HarnessNode) // and invalidated prior state, or persistent state recovery, simulating node // crashes, etc. func (n *NetworkHarness) RestartNode(node *HarnessNode, callback func() error) error { - return node.restart(n.lndErrorChan, callback) + if err := node.stop(); err != nil { + return err + } + + if callback != nil { + if err := callback(); err != nil { + return err + } + } + + return node.start(n.lndErrorChan) } // ShutdownNode stops an active lnd process and returns when the process has diff --git a/lntest/node.go b/lntest/node.go index 85422315e5a..9242835cda3 100644 --- a/lntest/node.go +++ b/lntest/node.go @@ -204,8 +204,6 @@ func newNode(cfg nodeConfig) (*HarnessNode, error) { cfg: &cfg, NodeID: nodeNum, chanWatchRequests: make(chan *chanWatchRequest), - processExit: make(chan struct{}), - quit: make(chan struct{}), }, nil } @@ -217,7 +215,12 @@ func (hn *HarnessNode) DBPath() string { // Start launches a new process running lnd. Additionally, the PID of the // launched process is saved in order to possibly kill the process forcibly // later. 
+// +// This may not clean up properly if an error is returned, so the caller should +// call shutdown() regardless of the return value. func (hn *HarnessNode) start(lndError chan<- error) error { + hn.quit = make(chan struct{}) + args := hn.cfg.genArgs() hn.cmd = exec.Command("lnd", args...) @@ -251,6 +254,7 @@ func (hn *HarnessNode) start(lndError chan<- error) error { // Launch a new goroutine which that bubbles up any potential fatal // process errors to the goroutine running the tests. + hn.processExit = make(chan struct{}) go func() { err := hn.cmd.Wait() @@ -363,56 +367,30 @@ func (hn *HarnessNode) cleanup() error { // Stop attempts to stop the active lnd process. func (hn *HarnessNode) stop() error { - // Do nothing if the process never started successfully. - if hn.LightningClient == nil { + // Do nothing if the process is not running. + if hn.processExit == nil { return nil } - // Do nothing if the process already finished. - select { - case <-hn.quit: - return nil - case <-hn.processExit: - return nil - default: + // If start() failed before creating a client, we will just wait for the + // child process to die. + if hn.LightningClient != nil { + // Don't watch for error because sometimes the RPC connection gets + // closed before a response is returned. + req := lnrpc.StopRequest{} + ctx := context.Background() + hn.LightningClient.StopDaemon(ctx, &req) } - // Don't watch for error because sometimes the RPC connection gets - // closed before a response is returned. - req := lnrpc.StopRequest{} - ctx := context.Background() - hn.LightningClient.StopDaemon(ctx, &req) - + // Wait for lnd process and other goroutines to exit. + <-hn.processExit close(hn.quit) hn.wg.Wait() - return nil -} - -// Restart attempts to restart a lightning node by shutting it down cleanly, -// then restarting the process. This function is fully blocking. Upon restart, -// the RPC connection to the node will be re-attempted, continuing iff the -// connection attempt is successful. Additionally, if a callback is passed, the -// closure will be executed after the node has been shutdown, but before the -// process has been started up again. -func (hn *HarnessNode) restart(errChan chan error, callback func() error) error { - if err := hn.stop(); err != nil { - return err - } - - <-hn.processExit + hn.quit = nil + hn.processExit = nil hn.LightningClient = nil - hn.processExit = make(chan struct{}) - hn.quit = make(chan struct{}) - hn.wg = sync.WaitGroup{} - - if callback != nil { - if err := callback(); err != nil { - return err - } - } - - return hn.start(errChan) + return nil } // shutdown stops the active lnd process and cleans up any temporary directories From 20ee9d896a307bfdac5490e5e49f2319e741f942 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Thu, 16 Nov 2017 17:31:41 -0800 Subject: [PATCH 7/7] lntest: Refactor set up calls to NetworkHarness. --- lnd_test.go | 57 +++++++++++++------------------- lntest/harness.go | 84 ++++++++++++++++------------------------------- 2 files changed, 51 insertions(+), 90 deletions(-) diff --git a/lnd_test.go b/lnd_test.go index 760c38a40d1..e618cf75082 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -4477,29 +4477,33 @@ var testsCases = []*testCase{ func TestLightningNetworkDaemon(t *testing.T) { ht := newHarnessTest(t) + var lndHarness *lntest.NetworkHarness + + // First create an instance of the btcd's rpctest.Harness. This will be + // used to fund the wallets of the nodes within the test network and to + // drive blockchain related events within the network. 
Revert the default + // setting of accepting non-standard transactions on simnet to reject them. + // Transactions on the lightning network should always be standard to get + // better guarantees of getting included in to blocks. + args := []string{"--rejectnonstd"} + handlers := &rpcclient.NotificationHandlers{ + OnTxAccepted: func(hash *chainhash.Hash, amt btcutil.Amount) { + lndHarness.OnTxAccepted(hash) + }, + } + btcdHarness, err := rpctest.New(harnessNetParams, handlers, args) + if err != nil { + ht.Fatalf("unable to create mining node: %v", err) + } + defer btcdHarness.TearDown() + // First create the network harness to gain access to its // 'OnTxAccepted' call back. - lndHarness, err := lntest.NewNetworkHarness() + lndHarness, err = lntest.NewNetworkHarness(btcdHarness) if err != nil { ht.Fatalf("unable to create lightning network harness: %v", err) } - - // Set up teardowns. While it's easier to set up the lnd harness before - // the btcd harness, they should be torn down in reverse order to - // prevent certain types of hangs. - var btcdHarness *rpctest.Harness - defer func() { - if lndHarness != nil { - lndHarness.TearDownAll() - } - if btcdHarness != nil { - btcdHarness.TearDown() - } - }() - - handlers := &rpcclient.NotificationHandlers{ - OnTxAccepted: lndHarness.OnTxAccepted, - } + defer lndHarness.TearDownAll() // Spawn a new goroutine to watch for any fatal errors that any of the // running lnd processes encounter. If an error occurs, then the test @@ -4517,18 +4521,6 @@ func TestLightningNetworkDaemon(t *testing.T) { } }() - // First create an instance of the btcd's rpctest.Harness. This will be - // used to fund the wallets of the nodes within the test network and to - // drive blockchain related events within the network. Revert the default - // setting of accepting non-standard transactions on simnet to reject them. - // Transactions on the lightning network should always be standard to get - // better guarantees of getting included in to blocks. - args := []string{"--rejectnonstd"} - btcdHarness, err = rpctest.New(harnessNetParams, handlers, args) - if err != nil { - ht.Fatalf("unable to create mining node: %v", err) - } - // Turn off the btcd rpc logging, otherwise it will lead to panic. // TODO(andrew.shvv|roasbeef) Remove the hack after re-work the way the log // rotator os work. @@ -4552,10 +4544,7 @@ func TestLightningNetworkDaemon(t *testing.T) { // initialization of the network. args - list of lnd arguments, // example: "--debuglevel=debug" // TODO(roasbeef): create master balanced channel with all the monies? - if err := lndHarness.InitializeSeedNodes(btcdHarness, nil); err != nil { - ht.Fatalf("unable to initialize seed nodes: %v", err) - } - if err = lndHarness.SetUp(); err != nil { + if err = lndHarness.SetUp(nil); err != nil { ht.Fatalf("unable to set up test lightning network: %v", err) } diff --git a/lntest/harness.go b/lntest/harness.go index bbbc951c34f..2631e9fbde5 100644 --- a/lntest/harness.go +++ b/lntest/harness.go @@ -37,7 +37,7 @@ type NetworkHarness struct { Alice *HarnessNode Bob *HarnessNode - seenTxns chan chainhash.Hash + seenTxns chan *chainhash.Hash bitcoinWatchRequests chan *txWatchRequest // Channel for transmitting stderr output from failed lightning node @@ -46,51 +46,26 @@ type NetworkHarness struct { quit chan struct{} - sync.Mutex + mtx sync.Mutex } // NewNetworkHarness creates a new network test harness. // TODO(roasbeef): add option to use golang's build library to a binary of the // current repo. 
This'll save developers from having to manually `go install` // within the repo each time before changes -func NewNetworkHarness() (*NetworkHarness, error) { - return &NetworkHarness{ +func NewNetworkHarness(r *rpctest.Harness) (*NetworkHarness, error) { + n := NetworkHarness{ activeNodes: make(map[int]*HarnessNode), - seenTxns: make(chan chainhash.Hash), + seenTxns: make(chan *chainhash.Hash), bitcoinWatchRequests: make(chan *txWatchRequest), lndErrorChan: make(chan error), + netParams: r.ActiveNet, + Miner: r, + rpcConfig: r.RPCConfig(), quit: make(chan struct{}), - }, nil -} - -// InitializeSeedNodes initializes alice and bob nodes given an already -// running instance of btcd's rpctest harness and extra command line flags, -// which should be formatted properly - "--arg=value". -func (n *NetworkHarness) InitializeSeedNodes(r *rpctest.Harness, lndArgs []string) error { - n.netParams = r.ActiveNet - n.Miner = r - n.rpcConfig = r.RPCConfig() - - config := nodeConfig{ - RPCConfig: &n.rpcConfig, - NetParams: n.netParams, - ExtraArgs: lndArgs, - } - - var err error - n.Alice, err = newNode(config) - if err != nil { - return err } - n.Bob, err = newNode(config) - if err != nil { - return err - } - - n.activeNodes[n.Alice.NodeID] = n.Alice - n.activeNodes[n.Bob.NodeID] = n.Bob - - return err + go n.networkWatcher() + return &n, nil } // ProcessErrors returns a channel used for reporting any fatal process errors. @@ -114,8 +89,9 @@ func (f *fakeLogger) Println(args ...interface{}) {} // SetUp starts the initial seeder nodes within the test harness. The initial // node's wallets will be funded wallets with ten 1 BTC outputs each. Finally // rpc clients capable of communicating with the initial seeder nodes are -// created. -func (n *NetworkHarness) SetUp() error { +// created. Nodes are initialized with the given extra command line flags, which +// should be formatted properly - "--arg=value". +func (n *NetworkHarness) SetUp(lndArgs []string) error { // Swap out grpc's default logger with out fake logger which drops the // statements on the floor. grpclog.SetLogger(&fakeLogger{}) @@ -127,15 +103,21 @@ func (n *NetworkHarness) SetUp() error { wg.Add(2) go func() { defer wg.Done() - if err := n.Alice.start(n.lndErrorChan); err != nil { + node, err := n.NewNode(lndArgs) + if err != nil { errChan <- err + return } + n.Alice = node }() go func() { defer wg.Done() - if err := n.Bob.start(n.lndErrorChan); err != nil { + node, err := n.NewNode(lndArgs) + if err != nil { errChan <- err + return } + n.Bob = node }() wg.Wait() select { @@ -214,10 +196,6 @@ out: } } - // Now that the initial test network has been initialized, launch the - // network watcher. - go n.networkWatcher() - return nil } @@ -239,9 +217,6 @@ func (n *NetworkHarness) TearDownAll() error { // current instance of the network harness. The created node is running, but // not yet connected to other nodes within the network. func (n *NetworkHarness) NewNode(extraArgs []string) (*HarnessNode, error) { - n.Lock() - defer n.Unlock() - node, err := newNode(nodeConfig{ RPCConfig: &n.rpcConfig, NetParams: n.netParams, @@ -253,7 +228,9 @@ func (n *NetworkHarness) NewNode(extraArgs []string) (*HarnessNode, error) { // Put node in activeNodes to ensure Shutdown is called even if Start // returns an error. 
+ n.mtx.Lock() n.activeNodes[node.NodeID] = node + n.mtx.Unlock() if err := node.start(n.lndErrorChan); err != nil { return nil, err @@ -403,11 +380,11 @@ func (n *NetworkHarness) networkWatcher() { // we're able to dispatch any notifications for this // txid which arrive *after* it's seen within the // network. - seenTxns[txid] = struct{}{} + seenTxns[*txid] = struct{}{} // If there isn't a registered notification for this // transaction then ignore it. - txClients, ok := clients[txid] + txClients, ok := clients[*txid] if !ok { continue } @@ -417,24 +394,19 @@ func (n *NetworkHarness) networkWatcher() { for _, client := range txClients { close(client) } - delete(clients, txid) + delete(clients, *txid) } } } // OnTxAccepted is a callback to be called each time a new transaction has been // broadcast on the network. -func (n *NetworkHarness) OnTxAccepted(hash *chainhash.Hash, amt btcutil.Amount) { - // Return immediately if harness has been torn down. +func (n *NetworkHarness) OnTxAccepted(hash *chainhash.Hash) { select { + case n.seenTxns <- hash: case <-n.quit: return - default: } - - go func() { - n.seenTxns <- *hash - }() } // WaitForTxBroadcast blocks until the target txid is seen on the network. If