Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 41 additions & 8 deletions autopilot/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,17 @@ type chanOpenUpdate struct {
newChan Channel
}

// chanOpenFailureUpdate is a type of external state update that indicates
// a previous channel open failed, and that it might be possible to try again.
// The struct is empty: OnChannelOpenFailure sends a pointer to one on the
// Agent's stateUpdates channel purely as a signal, with no payload attached.
type chanOpenFailureUpdate struct{}

// chanCloseUpdate is a type of external state update that indicates that the
// backing Lightning Node has closed a previously open channel.
type chanCloseUpdate struct {
// closedChans lists the short channel IDs of the channels that were closed.
closedChans []lnwire.ShortChannelID
}

// OnBalanceChange is a callback that should be executed each time the balance of
// the backing wallet changes.
func (a *Agent) OnBalanceChange(delta btcutil.Amount) {
go func() {
Expand All @@ -203,6 +207,15 @@ func (a *Agent) OnChannelOpen(c Channel) {
}()
}

// OnChannelOpenFailure is the callback to invoke once the autopilot has tried
// to open a channel and the attempt failed, so that channel creation can be
// retried with a different node. The chanOpenFailureUpdate is delivered on the
// Agent's stateUpdates channel from a separate goroutine, which means the
// call returns immediately instead of waiting for the update to be received.
func (a *Agent) OnChannelOpenFailure() {
	notify := func() {
		a.stateUpdates <- &chanOpenFailureUpdate{}
	}

	go notify()
}

// OnChannelClose is a callback that should be executed each time a prior
// channel has been closed for any reason. This includes regular
// closes, force closes, and channel breaches.
Expand All @@ -218,18 +231,21 @@ func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) {
// channels open to, with the set of nodes that are pending new channels. This
// ensures that the Agent doesn't attempt to open any "duplicate" channels to
// the same node.
func mergeNodeMaps(a map[NodeID]struct{},
b map[NodeID]Channel) map[NodeID]struct{} {
func mergeNodeMaps(a map[NodeID]struct{}, b map[NodeID]struct{},
c map[NodeID]Channel) map[NodeID]struct{} {

c := make(map[NodeID]struct{}, len(a)+len(b))
res := make(map[NodeID]struct{}, len(a)+len(b)+len(c))
for nodeID := range a {
c[nodeID] = struct{}{}
res[nodeID] = struct{}{}
}
for nodeID := range b {
c[nodeID] = struct{}{}
res[nodeID] = struct{}{}
}
for nodeID := range c {
res[nodeID] = struct{}{}
}

return c
return res
}

// mergeChanState merges the Agent's set of active channels, with the set of
Expand Down Expand Up @@ -267,6 +283,10 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
// TODO(roasbeef): do we in fact need to maintain order?
// * use sync.Cond if so

// failedNodes lists nodes that we've previously attempted to initiate
// channels with, but didn't succeed.
failedNodes := make(map[NodeID]struct{})

// pendingOpens tracks the channels that we've requested to be
// initiated, but haven't yet been confirmed as being fully opened.
// This state is required as otherwise, we may go over our allotted
Expand Down Expand Up @@ -294,6 +314,11 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {

a.totalBalance += update.balanceDelta

// The channel we tried to open previously failed for
// whatever reason.
case *chanOpenFailureUpdate:
log.Debug("Retrying after previous channel open failure.")

// A new channel has been opened successfully. This was
// either opened by the Agent, or an external system
// that is able to drive the Lightning Node.
Expand Down Expand Up @@ -351,7 +376,7 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
// we avoid duplicate edges.
connectedNodes := a.chanState.ConnectedNodes()
pendingMtx.Lock()
nodesToSkip := mergeNodeMaps(connectedNodes, pendingOpens)
nodesToSkip := mergeNodeMaps(connectedNodes, failedNodes, pendingOpens)
pendingMtx.Unlock()

// If we reach this point, then according to our
Expand Down Expand Up @@ -410,8 +435,16 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
pendingMtx.Lock()
nID := NewNodeID(directive.PeerKey)
delete(pendingOpens, nID)

// Mark this node as failed so we don't
// attempt it again.
failedNodes[nID] = struct{}{}
pendingMtx.Unlock()

// Trigger the autopilot controller to
// re-evaluate everything and possibly
// retry with a different node.
a.OnChannelOpenFailure()
}

}(chanCandidate)
Expand Down
115 changes: 115 additions & 0 deletions autopilot/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"errors"
"fmt"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
Expand Down Expand Up @@ -218,6 +220,119 @@ func TestAgentChannelOpenSignal(t *testing.T) {
}
}

// A mockFailingChanController always fails to open a channel.
type mockFailingChanController struct {
}

func (m *mockFailingChanController) OpenChannel(target *btcec.PublicKey, amt btcutil.Amount,
addrs []net.Addr) error {
return errors.New("failure")
}

func (m *mockFailingChanController) CloseChannel(chanPoint *wire.OutPoint) error {
return nil
}
func (m *mockFailingChanController) SpliceIn(chanPoint *wire.OutPoint,
amt btcutil.Amount) (*Channel, error) {
return nil, nil
}
func (m *mockFailingChanController) SpliceOut(chanPoint *wire.OutPoint,
amt btcutil.Amount) (*Channel, error) {
return nil, nil
}

var _ ChannelController = (*mockFailingChanController)(nil)

// TestAgentChannelFailureSignal checks that when a channel open attempted by
// the autopilot fails, the agent is signalled and consults the heuristic
// again to make a new decision.
func TestAgentChannelFailureSignal(t *testing.T) {
	t.Parallel()

	// Upper bound on how long we wait for the agent to query the
	// heuristic at each step.
	const queryTimeout = time.Second * 10

	// Build the agent's dependencies. The channel controller used here
	// fails every OpenChannel call.
	self, err := randKey()
	if err != nil {
		t.Fatalf("unable to generate key: %v", err)
	}
	heuristic := &mockHeuristic{
		moreChansResps: make(chan moreChansResp),
		directiveResps: make(chan []AttachmentDirective),
	}
	chanController := &mockFailingChanController{}
	memGraph, _, _ := newMemChanGraph()

	zeroBalance := func() (btcutil.Amount, error) {
		return 0, nil
	}
	testCfg := Config{
		Self:           self,
		Heuristic:      heuristic,
		ChanController: chanController,
		WalletBalance:  zeroBalance,
		Graph:          memGraph,
	}

	// Create the agent with no initial channels and start its primary
	// controller goroutine.
	agent, err := New(testCfg, []Channel{})
	if err != nil {
		t.Fatalf("unable to create agent: %v", err)
	}
	if startErr := agent.Start(); startErr != nil {
		t.Fatalf("unable to start agent: %v", startErr)
	}
	defer agent.Stop()

	// answerNeedMoreChans tells the agent that more channels are needed
	// and that 5BTC is available, failing the test if the agent does not
	// ask within the timeout.
	answerNeedMoreChans := func(note string) {
		resp := moreChansResp{true, 5 * btcutil.SatoshiPerBitcoin}
		select {
		case heuristic.moreChansResps <- resp:
			fmt.Println(note)
		case <-time.After(queryTimeout):
			t.Fatal("heuristic wasn't queried in time")
		}
	}

	// answerDirectives hands the given attachment directives to the
	// agent, failing the test if the agent does not ask within the
	// timeout.
	answerDirectives := func(directives []AttachmentDirective, note string) {
		select {
		case heuristic.directiveResps <- directives:
			fmt.Println(note)
		case <-time.After(queryTimeout):
			t.Fatal("heuristic wasn't queried in time")
		}
	}

	// First round: make the agent want a new channel, then give it a fake
	// directive so that it actually attempts to open one.
	answerNeedMoreChans("Returning 5BTC from heuristic")

	fakeDirective := AttachmentDirective{
		PeerKey: self,
		ChanAmt: btcutil.SatoshiPerBitcoin,
		Addrs: []net.Addr{
			&net.TCPAddr{
				IP: bytes.Repeat([]byte("a"), 16),
			},
		},
	}
	answerDirectives(
		[]AttachmentDirective{fakeDirective},
		"Returning a node to connect to from heuristic",
	)

	// The agent now attempts to create the channel and fails. Second
	// round: the controller loop must run again and query the heuristic
	// once more, which we answer with an empty set of directives.
	answerNeedMoreChans("Returning need more channels from heuristic")
	answerDirectives(
		[]AttachmentDirective{},
		"Returning an empty directives list",
	)
}

// TestAgentChannelCloseSignal ensures that once the agent receives an outside
// signal of a channel belonging to the backing LN node being closed, then it
// will query the heuristic to make its next decision.
Expand Down