Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

routing: cancelable payment loop #8734

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions cmd/lncli/cmd_payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,17 @@ var (
Usage: "(blinded paths) the total cltv delay for the " +
"blinded portion of the route",
}

cancelableFlag = cli.BoolFlag{
Name: "cancelable",
Usage: "if set to true, the payment loop can be interrupted " +
"by manually canceling the payment context, even " +
"before the payment timeout is reached. Note that " +
"the payment may still succeed after cancellation, " +
"as in-flight attempts can still settle afterwards. " +
"Canceling will only prevent further attempts from " +
"being sent",
}
)

// paymentFlags returns common flags for sendpayment and payinvoice.
Expand Down Expand Up @@ -166,6 +177,7 @@ func paymentFlags() []cli.Flag {
"after the timeout has elapsed",
Value: paymentTimeout,
},
cancelableFlag,
cltvLimitFlag,
lastHopFlag,
cli.Int64SliceFlag{
Expand Down Expand Up @@ -328,6 +340,7 @@ func sendPayment(ctx *cli.Context) error {
PaymentRequest: stripPrefix(ctx.String("pay_req")),
Amt: ctx.Int64("amt"),
DestCustomRecords: make(map[uint64][]byte),
Cancelable: ctx.Bool(cancelableFlag.Name),
}

// We'll attempt to parse a payment address as well, given that
Expand Down Expand Up @@ -386,6 +399,7 @@ func sendPayment(ctx *cli.Context) error {
Amt: amount,
DestCustomRecords: make(map[uint64][]byte),
Amp: ctx.Bool(ampFlag.Name),
Cancelable: ctx.Bool(cancelableFlag.Name),
}

var rHash []byte
Expand Down Expand Up @@ -887,6 +901,7 @@ func payInvoice(ctx *cli.Context) error {
Amt: ctx.Int64("amt"),
DestCustomRecords: make(map[uint64][]byte),
Amp: ctx.Bool(ampFlag.Name),
Cancelable: ctx.Bool(cancelableFlag.Name),
}

return sendPaymentRequest(ctx, req)
Expand Down
19 changes: 17 additions & 2 deletions lnrpc/routerrpc/router.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions lnrpc/routerrpc/router.proto
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,15 @@ message SendPaymentRequest {
only, to 1 to optimize for reliability only or a value inbetween for a mix.
*/
double time_pref = 23;

/*
If set, the payment loop can be interrupted by manually canceling the
payment context, even before the payment timeout is reached. Note that the
payment may still succeed after cancellation, as in-flight attempts can
still settle afterwards. Canceling will only prevent further attempts from
being sent.
*/
bool cancelable = 24;
}

message TrackPaymentRequest {
Expand Down
4 changes: 4 additions & 0 deletions lnrpc/routerrpc/router.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1804,6 +1804,10 @@
"type": "number",
"format": "double",
"description": "The time preference for this payment. Set to -1 to optimize for fees\nonly, to 1 to optimize for reliability only or a value inbetween for a mix."
},
"cancelable": {
"type": "boolean",
"description": "If set, the payment loop can be interrupted by manually canceling the\npayment context, even before the payment timeout is reached. Note that the\npayment may still succeed after cancellation, as in-flight attempts can\nstill settle afterwards. Canceling will only prevent further attempts from\nbeing sent."
}
}
},
Expand Down
60 changes: 42 additions & 18 deletions lnrpc/routerrpc/router_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,14 @@ var (
DefaultRouterMacFilename = "router.macaroon"
)

// ServerShell a is shell struct holding a reference to the actual sub-server.
// ServerShell is a shell struct holding a reference to the actual sub-server.
// It is used to register the gRPC sub-server with the root server before we
// have the necessary dependencies to populate the actual sub-server.
type ServerShell struct {
RouterServer
}

// Server is a stand alone sub RPC server which exposes functionality that
// Server is a stand-alone sub RPC server which exposes functionality that
// allows clients to route arbitrary payment through the Lightning Network.
type Server struct {
started int32 // To be used atomically.
Expand All @@ -181,7 +181,7 @@ var _ RouterServer = (*Server)(nil)
// that contains all external dependencies. If the target macaroon exists, and
// we're unable to create it, then an error will be returned. We also return
// the set of permissions that we require as a server. At the time of writing
// of this documentation, this is the same macaroon as as the admin macaroon.
// of this documentation, this is the same macaroon as the admin macaroon.
func New(cfg *Config) (*Server, lnrpc.MacaroonPerms, error) {
// If the path of the router macaroon wasn't generated, then we'll
// assume that it's found at the default network directory.
Expand Down Expand Up @@ -360,13 +360,37 @@ func (s *Server) SendPaymentV2(req *SendPaymentRequest,
return err
}

// The payment context is influenced by two user-provided parameters,
// the cancelable flag and the payment attempt timeout.
// If the payment is cancelable, we will use the stream context as the
// payment context. That way, if the user ends the stream, the payment
// loop will be canceled.
ctx := context.Background()
if req.Cancelable {
ctx = stream.Context()
}

// The second context parameter is the timeout. If the user provides a
// timeout, we will additionally wrap the context in a deadline. If the
// user provided 'cancelable' and ends the stream before the timeout is
// reached the payment will be canceled.
cancel := func() {}
if payment.PayAttemptTimeout > 0 {
timeout := time.Now().Add(payment.PayAttemptTimeout)
ctx, cancel = context.WithDeadline(ctx, timeout)
}

// Send the payment asynchronously.
s.cfg.Router.SendPaymentAsync(payment, paySession, shardTracker)
s.cfg.Router.SendPaymentAsync(ctx, payment, paySession, shardTracker)

// Track the payment and return.
return s.trackPayment(
sub, payHash, stream, req.NoInflightUpdates,
)
err = s.trackPayment(sub, payHash, stream, req.NoInflightUpdates)

// Cancel the timeout context. If the context already timed out or if
// there was no timeout provided, this will be a no-op.
cancel()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't cancel the payment directly because trackPayment is blocking?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call is to avoid a linter issue that would arise if I wrote above ctx, _ = context.WithDeadline(ctx, timeout). And if I assign the cancel I have to call it. The right place seems to be after trackPayment because then the context can be canceled.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an alternative could be to place it after sendPayment in SendPaymentAsync, but not sure that's a good pattern

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could also silence the linter here with a comment as to why we don't need the context in this case.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another opinion here would be appreciated (if the context cancelation may be done within the SendPaymentAsync goroutine as a defer statement).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe @yyforyongyu could opine here as this concerns his suggestion to remove the timeout channel in favor of this context?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we call cancel here I think it will immediately invoke <-ctx.Done in checkContext since it's async sending. I think we can instead decide the right context to use here (stream.Context or background ctx), pass it to SendPaymentAsync -> sendPayment, and inside sendPayment we create the timeout ctx, and call the defer cancel() there.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This timeout thing may also be moved to the payment session, but that's a bigger refactor so future PRs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we call cancel here I think it will immediately invoke <-ctx.Done in checkContext since it's async sending.

Before cancel() is called there is the s.trackPayment call which is blocking. So I think cancel() is only called once the track payment call ends which implies that also the async payment loop ended. I think the cancel would then be correct here. We could add clarifying docs here.

Could also create the timeoutCtx in sendPayment. instead.


return err
}

// EstimateRouteFee allows callers to obtain an expected value w.r.t how much it
Expand Down Expand Up @@ -986,9 +1010,8 @@ func (s *Server) SetMissionControlConfig(ctx context.Context,
AprioriHopProbability: float64(
req.Config.HopProbability,
),
AprioriWeight: float64(req.Config.Weight),
CapacityFraction: float64(
routing.DefaultCapacityFraction),
AprioriWeight: float64(req.Config.Weight),
CapacityFraction: routing.DefaultCapacityFraction, //nolint:lll
hieblmi marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -1032,8 +1055,8 @@ func (s *Server) SetMissionControlConfig(ctx context.Context,

// QueryMissionControl exposes the internal mission control state to callers. It
// is a development feature.
func (s *Server) QueryMissionControl(ctx context.Context,
req *QueryMissionControlRequest) (*QueryMissionControlResponse, error) {
func (s *Server) QueryMissionControl(_ context.Context,
_ *QueryMissionControlRequest) (*QueryMissionControlResponse, error) {

snapshot := s.cfg.RouterBackend.MissionControl.GetHistorySnapshot()

Expand Down Expand Up @@ -1080,7 +1103,7 @@ func toRPCPairData(data *routing.TimedPairResult) *PairData {

// XImportMissionControl imports the state provided to our internal mission
// control. Only entries that are fresher than our existing state will be used.
func (s *Server) XImportMissionControl(ctx context.Context,
func (s *Server) XImportMissionControl(_ context.Context,
req *XImportMissionControlRequest) (*XImportMissionControlResponse,
error) {

Expand Down Expand Up @@ -1264,8 +1287,9 @@ func (s *Server) subscribePayment(identifier lntypes.Hash) (
sub, err := router.Tower.SubscribePayment(identifier)

switch {
case err == channeldb.ErrPaymentNotInitiated:
case errors.Is(err, channeldb.ErrPaymentNotInitiated):
return nil, status.Error(codes.NotFound, err.Error())

case err != nil:
return nil, err
}
Expand Down Expand Up @@ -1376,7 +1400,7 @@ func (s *Server) trackPaymentStream(context context.Context,
}

// BuildRoute builds a route from a list of hop addresses.
func (s *Server) BuildRoute(ctx context.Context,
func (s *Server) BuildRoute(_ context.Context,
req *BuildRouteRequest) (*BuildRouteResponse, error) {

// Unmarshall hop list.
Expand Down Expand Up @@ -1437,7 +1461,7 @@ func (s *Server) BuildRoute(ctx context.Context,

// SubscribeHtlcEvents creates a uni-directional stream from the server to
// the client which delivers a stream of htlc events.
func (s *Server) SubscribeHtlcEvents(req *SubscribeHtlcEventsRequest,
func (s *Server) SubscribeHtlcEvents(_ *SubscribeHtlcEventsRequest,
stream Router_SubscribeHtlcEventsServer) error {

htlcClient, err := s.cfg.RouterBackend.SubscribeHtlcEvents()
Expand Down Expand Up @@ -1486,7 +1510,7 @@ func (s *Server) SubscribeHtlcEvents(req *SubscribeHtlcEventsRequest,

// HtlcInterceptor is a bidirectional stream for streaming interception
// requests to the caller.
// Upon connection it does the following:
// Upon connection, it does the following:
// 1. Check if there is already a live stream, if yes it rejects the request.
// 2. Registered a ForwardInterceptor
// 3. Delivers to the caller every √√ and detect his answer.
Expand Down Expand Up @@ -1516,7 +1540,7 @@ func extractOutPoint(req *UpdateChanStatusRequest) (*wire.OutPoint, error) {
}

// UpdateChanStatus allows channel state to be set manually.
func (s *Server) UpdateChanStatus(ctx context.Context,
func (s *Server) UpdateChanStatus(_ context.Context,
req *UpdateChanStatusRequest) (*UpdateChanStatusResponse, error) {

outPoint, err := extractOutPoint(req)
Expand Down