Skip to content

Commit

Permalink
routing: process successes in mission control
Browse files Browse the repository at this point in the history
This commit modifies paymentLifecycle so that it not only feeds
failures into mission control, but successes as well.
This allows for more accurate probability estimates. Previously,
the success probability for a successful pair and a pair with
no history was equal. There was no force that pushed towards
previously successful routes.
  • Loading branch information
joostjager committed Aug 21, 2019
1 parent 153f1fe commit 88debb9
Show file tree
Hide file tree
Showing 12 changed files with 413 additions and 187 deletions.
20 changes: 11 additions & 9 deletions cmd/lncli/cmd_query_mission_control.go
Expand Up @@ -38,10 +38,11 @@ func queryMissionControl(ctx *cli.Context) error {
}

type displayPairHistory struct {
NodeFrom, NodeTo string
LastFailTime int64
SuccessProb float32
MinPenalizeAmtSat int64
NodeFrom, NodeTo string
LastAttemptSuccessful bool
Timestamp int64
SuccessProb float32
MinPenalizeAmtSat int64
}

displayResp := struct {
Expand All @@ -64,11 +65,12 @@ func queryMissionControl(ctx *cli.Context) error {
displayResp.Pairs = append(
displayResp.Pairs,
displayPairHistory{
NodeFrom: hex.EncodeToString(n.NodeFrom),
NodeTo: hex.EncodeToString(n.NodeTo),
LastFailTime: n.LastFailTime,
SuccessProb: n.SuccessProb,
MinPenalizeAmtSat: n.MinPenalizeAmtSat,
NodeFrom: hex.EncodeToString(n.NodeFrom),
NodeTo: hex.EncodeToString(n.NodeTo),
LastAttemptSuccessful: n.LastAttemptSuccessful,
Timestamp: n.Timestamp,
SuccessProb: n.SuccessProb,
MinPenalizeAmtSat: n.MinPenalizeAmtSat,
},
)
}
Expand Down
241 changes: 126 additions & 115 deletions lnrpc/routerrpc/router.pb.go

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions lnrpc/routerrpc/router.proto
Expand Up @@ -357,14 +357,17 @@ message PairHistory {
/// The destination node pubkey of the pair.
bytes node_to = 2 [json_name="node_to"];

/// Time stamp of last failure.
int64 last_fail_time = 3 [json_name = "last_fail_time"];
/// Time stamp of last result.
int64 timestamp = 3 [json_name = "timestamp"];

/// Minimum penalization amount.
/// Minimum penalization amount (only applies to failed attempts).
int64 min_penalize_amt_sat = 4 [json_name = "min_penalize_amt_sat"];

/// Estimation of success probability for this pair.
float success_prob = 5 [json_name = "success_prob"];

/// Whether the last payment attempt through this pair was successful.
bool last_attempt_successful = 6 [json_name = "last_attempt_successful"];
}

service Router {
Expand Down
9 changes: 5 additions & 4 deletions lnrpc/routerrpc/router_server.go
Expand Up @@ -473,13 +473,14 @@ func (s *Server) QueryMissionControl(ctx context.Context,
pair := p

rpcPair := PairHistory{
NodeFrom: pair.Pair.From[:],
NodeTo: pair.Pair.To[:],
LastFailTime: pair.LastFail.Unix(),
NodeFrom: pair.Pair.From[:],
NodeTo: pair.Pair.To[:],
Timestamp: pair.Timestamp.Unix(),
MinPenalizeAmtSat: int64(
pair.MinPenalizeAmt.ToSatoshis(),
),
SuccessProb: float32(pair.SuccessProb),
SuccessProb: float32(pair.SuccessProb),
LastAttemptSuccessful: pair.LastAttemptSuccessful,
}

rpcPairs = append(rpcPairs, &rpcPair)
Expand Down
100 changes: 72 additions & 28 deletions routing/missioncontrol.go
Expand Up @@ -43,6 +43,10 @@ const (

// DefaultMaxMcHistory is the default maximum history size.
DefaultMaxMcHistory = 1000

// prevSuccessProbability is the assumed probability for node pairs that
// successfully relayed the previous attempt.
prevSuccessProbability = 0.95
)

// MissionControl contains state which summarizes the past attempts of HTLC
Expand All @@ -55,8 +59,8 @@ const (
// since the last failure is used to estimate a success probability that is fed
// into the path finding process for subsequent payment attempts.
type MissionControl struct {
// lastPairFailure tracks the last payment failure per node pair.
lastPairFailure map[DirectedNodePair]pairFailure
// lastPairResult tracks the last payment result per node pair.
lastPairResult map[DirectedNodePair]timedPairResult

// lastNodeFailure tracks the last node level failure per node.
lastNodeFailure map[route.Vertex]time.Time
Expand Down Expand Up @@ -97,14 +101,12 @@ type MissionControlConfig struct {
MaxMcHistory int
}

// pairFailure describes a payment failure for a node pair.
type pairFailure struct {
// timestamp is the time when this failure result was obtained.
// timedPairResult describes a timestamped pair result.
type timedPairResult struct {
// timestamp is the time when this result was obtained.
timestamp time.Time

// minPenalizeAmt is the minimum amount for which to take this failure
// into account.
minPenalizeAmt lnwire.MilliSatoshi
pairResult
}

// MissionControlSnapshot contains a snapshot of the current state of mission
Expand Down Expand Up @@ -138,15 +140,19 @@ type MissionControlPairSnapshot struct {
// Pair is the node pair of which the state is described.
Pair DirectedNodePair

// LastFail is the time of last failure.
LastFail time.Time
// Timestamp is the time of last result.
Timestamp time.Time

// MinPenalizeAmt is the minimum amount for which the channel will be
// penalized.
MinPenalizeAmt lnwire.MilliSatoshi

// SuccessProb is the success probability estimation for this channel.
SuccessProb float64

// LastAttemptSuccessful indicates whether the last payment attempt
// through this pair was successful.
LastAttemptSuccessful bool
}

// paymentResult is the information that becomes available when a payment
Expand Down Expand Up @@ -174,7 +180,7 @@ func NewMissionControl(db *bbolt.DB, cfg *MissionControlConfig) (
}

mc := &MissionControl{
lastPairFailure: make(map[DirectedNodePair]pairFailure),
lastPairResult: make(map[DirectedNodePair]timedPairResult),
lastNodeFailure: make(map[route.Vertex]time.Time),
lastSecondChance: make(map[DirectedNodePair]time.Time),
now: time.Now,
Expand Down Expand Up @@ -220,7 +226,7 @@ func (m *MissionControl) ResetHistory() error {
return err
}

m.lastPairFailure = make(map[DirectedNodePair]pairFailure)
m.lastPairResult = make(map[DirectedNodePair]timedPairResult)
m.lastNodeFailure = make(map[route.Vertex]time.Time)
m.lastSecondChance = make(map[DirectedNodePair]time.Time)

Expand Down Expand Up @@ -271,12 +277,16 @@ func (m *MissionControl) getPairProbability(fromNode,

// Retrieve the last pair outcome.
pair := NewDirectedNodePair(fromNode, toNode)
lastPairResult, ok := m.lastPairFailure[pair]
lastPairResult, ok := m.lastPairResult[pair]

// Only look at the last pair outcome if it happened after the last node
// level failure. Otherwise the node level failure is the most recent
// and used as the basis for calculation of the probability.
if ok && lastPairResult.timestamp.After(lastFail) {
if lastPairResult.success {
return prevSuccessProbability
}

// Take into account a minimum penalize amount. For balance
// errors, a failure may be reported with such a minimum to
// prevent too aggresive penalization. We only take into account
Expand Down Expand Up @@ -330,7 +340,7 @@ func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot {

log.Debugf("Requesting history snapshot from mission control: "+
"node_failure_count=%v, pair_result_count=%v",
len(m.lastNodeFailure), len(m.lastPairFailure))
len(m.lastNodeFailure), len(m.lastPairResult))

nodes := make([]MissionControlNodeSnapshot, 0, len(m.lastNodeFailure))
for v, h := range m.lastNodeFailure {
Expand All @@ -343,18 +353,19 @@ func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot {
})
}

pairs := make([]MissionControlPairSnapshot, 0, len(m.lastPairFailure))
pairs := make([]MissionControlPairSnapshot, 0, len(m.lastPairResult))

for v, h := range m.lastPairFailure {
for v, h := range m.lastPairResult {
// Show probability assuming amount meets min
// penalization amount.
prob := m.getPairProbability(v.From, v.To, h.minPenalizeAmt)

pair := MissionControlPairSnapshot{
Pair: v,
MinPenalizeAmt: h.minPenalizeAmt,
LastFail: h.timestamp,
SuccessProb: prob,
Pair: v,
MinPenalizeAmt: h.minPenalizeAmt,
Timestamp: h.timestamp,
SuccessProb: prob,
LastAttemptSuccessful: h.success,
}

pairs = append(pairs, pair)
Expand All @@ -379,7 +390,6 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route,

timestamp := m.now()

// TODO(joostjager): Use actual payment initiation time for timeFwd.
result := &paymentResult{
success: false,
timeFwd: timestamp,
Expand All @@ -390,6 +400,33 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route,
route: rt,
}

return m.processPaymentResult(result)
}

// ReportPaymentSuccess reports a successful payment to mission control as input
// for future probability estimates.
func (m *MissionControl) ReportPaymentSuccess(paymentID uint64,
rt *route.Route) error {

timestamp := m.now()

result := &paymentResult{
timeFwd: timestamp,
timeReply: timestamp,
id: paymentID,
success: true,
route: rt,
}

_, err := m.processPaymentResult(result)
return err
}

// processPaymentResult stores a payment result in the mission control store and
// updates mission control's in-memory state.
func (m *MissionControl) processPaymentResult(result *paymentResult) (
*channeldb.FailureReason, error) {

// Store complete result in database.
if err := m.store.AddResult(result); err != nil {
return nil, err
Expand All @@ -409,7 +446,8 @@ func (m *MissionControl) applyPaymentResult(

// Interpret result.
i := interpretResult(
result.route, result.failureSourceIdx, result.failure,
result.route, result.success, result.failureSourceIdx,
result.failure,
)

// Update mission control state using the interpretation.
Expand All @@ -432,13 +470,19 @@ func (m *MissionControl) applyPaymentResult(
m.lastNodeFailure[*i.nodeFailure] = result.timeReply
}

for pair, minPenalizeAmt := range i.pairResults {
log.Debugf("Reporting pair failure to Mission Control: "+
"pair=%v, minPenalizeAmt=%v", pair, minPenalizeAmt)
for pair, pairResult := range i.pairResults {
if pairResult.success {
log.Debugf("Reporting pair success to Mission "+
"Control: pair=%v", pair)
} else {
log.Debugf("Reporting pair failure to Mission "+
"Control: pair=%v, minPenalizeAmt=%v",
pair, pairResult.minPenalizeAmt)
}

m.lastPairFailure[pair] = pairFailure{
minPenalizeAmt: minPenalizeAmt,
timestamp: result.timeReply,
m.lastPairResult[pair] = timedPairResult{
timestamp: result.timeReply,
pairResult: pairResult,
}
}

Expand Down
19 changes: 16 additions & 3 deletions routing/missioncontrol_test.go
Expand Up @@ -112,6 +112,16 @@ func (ctx *mcTestContext) reportFailure(amt lnwire.MilliSatoshi,
)
}

// reportSuccess reports a success by using a test route.
func (ctx *mcTestContext) reportSuccess() {
err := ctx.mc.ReportPaymentSuccess(ctx.pid, mcTestRoute)
if err != nil {
ctx.t.Fatal(err)
}

ctx.pid++
}

// TestMissionControl tests mission control probability estimation.
func TestMissionControl(t *testing.T) {
ctx := createMcTestContext(t)
Expand Down Expand Up @@ -158,12 +168,15 @@ func TestMissionControl(t *testing.T) {
// Check whether history snapshot looks sane.
history := ctx.mc.GetHistorySnapshot()
if len(history.Nodes) != 1 {
t.Fatal("unexpected number of nodes")
t.Fatalf("expected 1 node, but got %v", len(history.Nodes))
}

if len(history.Pairs) != 1 {
t.Fatal("unexpected number of channels")
if len(history.Pairs) != 2 {
t.Fatalf("expected 2 pairs, but got %v", len(history.Pairs))
}

// Test reporting a success.
ctx.reportSuccess()
}

// TestMissionControlChannelUpdate tests that the first channel update is not
Expand Down
6 changes: 6 additions & 0 deletions routing/mock_test.go
Expand Up @@ -105,6 +105,12 @@ func (m *mockMissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route
return nil, nil
}

func (m *mockMissionControl) ReportPaymentSuccess(paymentID uint64,
rt *route.Route) error {

return nil
}

func (m *mockMissionControl) GetProbability(fromNode, toNode route.Vertex,
amt lnwire.MilliSatoshi) float64 {

Expand Down
2 changes: 1 addition & 1 deletion routing/pathfind.go
Expand Up @@ -53,7 +53,7 @@ var (

// DefaultAprioriHopProbability is the default a priori probability for
// a hop.
DefaultAprioriHopProbability = float64(0.95)
DefaultAprioriHopProbability = float64(0.6)
)

// edgePolicyWithSource is a helper struct to keep track of the source node
Expand Down
9 changes: 9 additions & 0 deletions routing/payment_lifecycle.go
Expand Up @@ -161,6 +161,15 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
log.Debugf("Payment %x succeeded with pid=%v",
p.payment.PaymentHash, p.attempt.PaymentID)

// Report success to mission control.
err = p.router.cfg.MissionControl.ReportPaymentSuccess(
p.attempt.PaymentID, &p.attempt.Route,
)
if err != nil {
log.Errorf("Error reporting payment success to mc: %v",
err)
}

// In case of success we atomically store the db payment and
// move the payment to the success state.
err = p.router.cfg.Control.Success(p.payment.PaymentHash, result.Preimage)
Expand Down

0 comments on commit 88debb9

Please sign in to comment.