Skip to content

Commit

Permalink
htlcswitch/switch: permit link shutdown mid-forwarding
Browse files Browse the repository at this point in the history
In this commit, we thread through a link's quit channel into
routeAsync, the primary helper method allowing links to send
htlcPackets through the switch. This is intended to remove
deadlocks from happening, where the link is synchronously
blocking on forwarding packets to the switch, but also
needs to shutdown.
  • Loading branch information
cfromknecht committed Aug 10, 2018
1 parent 0fef1c7 commit f84cd14
Showing 1 changed file with 29 additions and 8 deletions.
37 changes: 29 additions & 8 deletions htlcswitch/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,12 +527,15 @@ func (s *Switch) forward(packet *htlcPacket) error {
// ForwardPackets adds a list of packets to the switch for processing. Fails
// and settles are added on a first past, simultaneously constructing circuits
// for any adds. After persisting the circuits, another pass of the adds is
// given to forward them through the router.
// given to forward them through the router. The sending link's quit channel is
// used to prevent deadlocks when the switch stops a link in the midst of
// forwarding.
//
// NOTE: This method guarantees that the returned err chan will eventually be
// closed. The receiver should read on the channel until receiving such a
// signal.
func (s *Switch) ForwardPackets(packets ...*htlcPacket) chan error {
func (s *Switch) ForwardPackets(linkQuit chan struct{},
packets ...*htlcPacket) chan error {

var (
// fwdChan is a buffered channel used to receive err msgs from
Expand Down Expand Up @@ -568,6 +571,9 @@ func (s *Switch) ForwardPackets(packets ...*htlcPacket) chan error {
// so, we exit early to avoid incrementing the switch's waitgroup while
// it is already in the process of shutting down.
select {
case <-linkQuit:
close(errChan)
return errChan
case <-s.quit:
close(errChan)
return errChan
Expand All @@ -593,7 +599,10 @@ func (s *Switch) ForwardPackets(packets ...*htlcPacket) chan error {
circuits = append(circuits, circuit)
addBatch = append(addBatch, packet)
default:
s.routeAsync(packet, fwdChan)
err := s.routeAsync(packet, fwdChan, linkQuit)
if err != nil {
return errChan
}
numSent++
}
}
Expand Down Expand Up @@ -635,7 +644,10 @@ func (s *Switch) ForwardPackets(packets ...*htlcPacket) chan error {
// Now, forward any packets for circuits that were successfully added to
// the switch's circuit map.
for _, packet := range addedPackets {
s.routeAsync(packet, fwdChan)
err := s.routeAsync(packet, fwdChan, linkQuit)
if err != nil {
return errChan
}
numSent++
}

Expand Down Expand Up @@ -722,9 +734,13 @@ func (s *Switch) route(packet *htlcPacket) error {
}

// routeAsync sends a packet through the htlc switch, using the provided err
// chan to propagate errors back to the caller. This method does not wait for
// a response before returning.
func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error) error {
// chan to propagate errors back to the caller. The link's quit channel is
// provided so that the send can be canceled if either the link or the switch
// receive a shutdown requuest. This method does not wait for a response from
// the htlcForwarder before returning.
func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error,
linkQuit chan struct{}) error {

command := &plexPacket{
pkt: packet,
err: errChan,
Expand All @@ -733,6 +749,8 @@ func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error) error {
select {
case s.htlcPlex <- command:
return nil
case <-linkQuit:
return ErrLinkShuttingDown
case <-s.quit:
return errors.New("Htlc Switch was stopped")
}
Expand Down Expand Up @@ -1734,7 +1752,10 @@ func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) {
}
}

errChan := s.ForwardPackets(switchPackets...)
// Since this send isn't tied to a specific link, we pass a nil
// link quit channel, meaning the send will fail only if the
// switch receives a shutdown request.
errChan := s.ForwardPackets(nil, switchPackets...)
go handleBatchFwdErrs(errChan)
}
}
Expand Down

0 comments on commit f84cd14

Please sign in to comment.