Skip to content

Commit

Permalink
connectd: fix forwarding after tx_abort.
Browse files Browse the repository at this point in the history
If we get a WIRE_TX_ABORT then another message, we send the other message to the same
subd (even though the tx abort causes it to shutdown).  This means we effectively
lose the next message, and timeout (see below from CI, reproduced locally).

So, have connectd ignore the subd after it forwards the WIRE_TX_ABORT.  The next
message will, correctly, cause a fresh subdaemon to be spawned.

```
    @unittest.skipIf(TEST_NETWORK != 'regtest', 'elementsd doesnt yet support PSBT features we need')
    @pytest.mark.openchannel('v2')
    def test_v2_rbf_multi(node_factory, bitcoind, chainparams):
        l1, l2 = node_factory.get_nodes(2,
                                        opts={'may_reconnect': True,
                                              'dev-no-reconnect': None,
                                              'allow_warning': True})
    
        l1.rpc.connect(l2.info['id'], 'localhost', l2.port)
        amount = 2**24
        chan_amount = 100000
        bitcoind.rpc.sendtoaddress(l1.rpc.newaddr()['bech32'], amount / 10**8 + 0.01)
        bitcoind.generate_block(1)
        # Wait for it to arrive.
        wait_for(lambda: len(l1.rpc.listfunds()['outputs']) > 0)
    
        res = l1.rpc.fundchannel(l2.info['id'], chan_amount)
        chan_id = res['channel_id']
        vins = bitcoind.rpc.decoderawtransaction(res['tx'])['vin']
        assert(only_one(vins))
        prev_utxos = ["{}:{}".format(vins[0]['txid'], vins[0]['vout'])]
    
        # Check that we're waiting for lockin
        l1.daemon.wait_for_log(' to DUALOPEND_AWAITING_LOCKIN')
    
        # Attempt to do abort, should fail since we've
        # already gotten an inflight
        with pytest.raises(RpcError):
            l1.rpc.openchannel_abort(chan_id)
    
        rate = int(find_next_feerate(l1, l2)[:-5])
        # We 4x the feerate to beat the min-relay fee
        next_feerate = '{}perkw'.format(rate * 4)
    
        # Initiate an RBF
        startweight = 42 + 172  # base weight, funding output
        initpsbt = l1.rpc.utxopsbt(chan_amount, next_feerate, startweight,
                                   prev_utxos, reservedok=True,
                                   min_witness_weight=110,
                                   excess_as_change=True)
    
        # Do the bump
        bump = l1.rpc.openchannel_bump(chan_id, chan_amount,
                                       initpsbt['psbt'],
                                       funding_feerate=next_feerate)
    
        # Abort this open attempt! We will re-try
        aborted = l1.rpc.openchannel_abort(chan_id)
        assert not aborted['channel_canceled']
        # We no longer disconnect on aborts, because magic!
        assert only_one(l1.rpc.listpeers()['peers'])['connected']
    
        # Do the bump, again, same feerate
>       bump = l1.rpc.openchannel_bump(chan_id, chan_amount,
                                       initpsbt['psbt'],
                                       funding_feerate=next_feerate)

tests/test_opening.py:668: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
contrib/pyln-client/pyln/client/lightning.py:1206: in openchannel_bump
    return self.call("openchannel_bump", payload)
contrib/pyln-testing/pyln/testing/utils.py:718: in call
    res = LightningRpc.call(self, method, payload, cmdprefix, filter)
contrib/pyln-client/pyln/client/lightning.py:398: in call
    resp, buf = self._readobj(sock, buf)
contrib/pyln-client/pyln/client/lightning.py:315: in _readobj
    b = sock.recv(max(1024, len(buff)))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyln.client.lightning.UnixSocket object at 0x7f34675aae80>
length = 1024

    def recv(self, length: int) -> bytes:
        if self.sock is None:
            raise socket.error("not connected")
    
>       return self.sock.recv(length)
E       Failed: Timeout >1200.0s
```
  • Loading branch information
rustyrussell committed Oct 23, 2023
1 parent 920e50d commit 4216aff
Showing 1 changed file with 21 additions and 2 deletions.
23 changes: 21 additions & 2 deletions connectd/multiplex.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ struct subd {

/* Output buffer */
struct msg_queue *outq;

/* After we've told it to tx_abort, we don't send anything else. */
bool rcvd_tx_abort;
};

static struct subd *find_subd(struct peer *peer,
Expand All @@ -66,6 +69,10 @@ static struct subd *find_subd(struct peer *peer,
for (size_t i = 0; i < tal_count(peer->subds); i++) {
struct subd *subd = peer->subds[i];

/* Once we sent it tx_abort, we pretend it doesn't exist */
if (subd->rcvd_tx_abort)
continue;

/* Once we see a message using the real channel_id, we
* clear the temporary_channel_id */
if (channel_id_eq(&subd->channel_id, channel_id)) {
Expand Down Expand Up @@ -1040,6 +1047,7 @@ static struct subd *new_subd(struct peer *peer,
subd->temporary_channel_id = NULL;
subd->opener_revocation_basepoint = NULL;
subd->conn = NULL;
subd->rcvd_tx_abort = false;

/* Connect it to the peer */
tal_arr_expand(&peer->subds, subd);
Expand All @@ -1056,6 +1064,8 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
u8 *decrypted;
struct channel_id channel_id;
struct subd *subd;
enum peer_wire type;


decrypted = cryptomsg_decrypt_body(tmpctx, &peer->cs,
peer->peer_in);
Expand All @@ -1066,6 +1076,8 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
}
tal_free(peer->peer_in);

type = fromwire_peektype(decrypted);

/* dev_disconnect can disable read */
if (!peer->dev_read_enabled)
return read_hdr_from_peer(peer_conn, peer);
Expand All @@ -1083,8 +1095,6 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,

/* After this we should be able to match to subd by channel_id */
if (!extract_channel_id(decrypted, &channel_id)) {
enum peer_wire type = fromwire_peektype(decrypted);

/* We won't log this anywhere else, so do it here. */
status_peer_io(LOG_IO_IN, &peer->id, decrypted);

Expand Down Expand Up @@ -1137,6 +1147,15 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
/* Tell them to write. */
msg_enqueue(subd->outq, take(decrypted));

/* Is this a tx_abort? Ignore from now on, and close after sending! */
if (type == WIRE_TX_ABORT) {
subd->rcvd_tx_abort = true;
/* In case it doesn't close by itself */
notleak(new_reltimer(&peer->daemon->timers, subd,
time_from_sec(5),
close_subd_timeout, subd));
}

/* Wait for them to wake us */
return io_wait(peer_conn, &peer->peer_in, read_hdr_from_peer, peer);
}
Expand Down

0 comments on commit 4216aff

Please sign in to comment.