abci: localClient improvements & bugfixes & pubsub Unsubscribe issues #2748

Merged
merged 15 commits into from Nov 13, 2018

Conversation

melekes
Contributor

@melekes melekes commented Nov 2, 2018

Closes #2721

  • Updated all relevant documentation in docs
  • Updated all code comments where relevant
  • Wrote tests
  • Updated CHANGELOG_PENDING.md

@melekes melekes changed the base branch from master to develop November 2, 2018 10:51
@codecov-io

codecov-io commented Nov 2, 2018

Codecov Report

Merging #2748 into develop will decrease coverage by 0.1%.
The diff coverage is 26.19%.

@@             Coverage Diff             @@
##           develop    #2748      +/-   ##
===========================================
- Coverage    62.29%   62.19%   -0.11%     
===========================================
  Files          212      212              
  Lines        17219    17253      +34     
===========================================
+ Hits         10727    10730       +3     
- Misses        5591     5619      +28     
- Partials       901      904       +3
Impacted Files Coverage Δ
libs/pubsub/pubsub.go 83.89% <ø> (ø) ⬆️
mempool/mempool.go 77.81% <ø> (ø) ⬆️
rpc/core/consensus.go 0% <0%> (ø) ⬆️
rpc/core/mempool.go 0% <0%> (ø) ⬆️
evidence/reactor.go 62.62% <0%> (-2.38%) ⬇️
consensus/replay_file.go 0% <0%> (ø) ⬆️
p2p/pex/addrbook.go 68.91% <100%> (ø) ⬆️
mempool/reactor.go 64.19% <14.28%> (ø) ⬆️
consensus/reactor.go 65.93% <36.36%> (-1.05%) ⬇️
consensus/state.go 79.78% <60%> (ø) ⬆️
... and 3 more

@melekes melekes changed the title [WIP] Unresponsive Tendermint abci: localClient improvements & bugfixes Nov 8, 2018
```
[54310]: E[11-02|11:59:39.851] Connection failed @ sendRoutine              module=p2p peer=0xb78f00 conn=MConn{74.207.236.148:26656} err="pong timeout"
```

#2721 (comment)
#2721 (comment)

It's confusing that sometimes we check if peer has a state, but most of
the time we expect it to be there

1. https://github.com/tendermint/tendermint/blob/add79700b5fe84417538202b6c927c8cc5383672/mempool/reactor.go#L138
2. https://github.com/tendermint/tendermint/blob/add79700b5fe84417538202b6c927c8cc5383672/rpc/core/consensus.go#L196 (edited)

I will change everything to always assume peer has a state and panic
otherwise

that should help identify issues earlier
App callback should be protected by lock as well (note this was already
done for InitChainAsync, why not for others???). Otherwise, when we
execute the block, tx might come in and call the callback at the same
time we're updating it in execBlockOnProxyApp => DATA RACE

Fixes #2721
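
To illustrate the race described in the commit message above, here is a minimal sketch. It assumes a simplified client: the `client` type, the `Callback` signature, and the `run` helper are hypothetical stand-ins, not the real `abci/client` types. The idea is only that swapping the callback and invoking it go through the same mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// Callback is a hypothetical stand-in for the ABCI response callback type.
type Callback func(res string)

// client is a simplified, hypothetical local client. It is not the real
// abci/client.localClient.
type client struct {
	mtx sync.Mutex
	cb  Callback
}

// SetResponseCallback swaps the callback while holding the lock, so it can
// never interleave with an in-flight callback invocation.
func (c *client) SetResponseCallback(cb Callback) {
	c.mtx.Lock()
	c.cb = cb
	c.mtx.Unlock()
}

// CheckTxAsync invokes the current callback under the same lock.
func (c *client) CheckTxAsync(tx string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	if c.cb != nil {
		c.cb("checked " + tx)
	}
}

// run sets and invokes the callback from concurrent goroutines and returns
// how many invocations happened. count is only touched inside the callback,
// which always runs under c.mtx, so the increments are serialized.
func run(n int) int {
	c := &client{}
	var count int
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(2)
		go func() {
			defer wg.Done()
			c.SetResponseCallback(func(string) { count++ })
		}()
		go func(i int) {
			defer wg.Done()
			c.CheckTxAsync(fmt.Sprint(i))
		}(i)
	}
	wg.Wait()
	return count
}

func main() {
	fmt.Println("callbacks observed:", run(50))
}
```

Removing the lock from `SetResponseCallback` in this sketch should make `go run -race` report a data race on `cb`, which is the situation the commit message describes.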

Consensus state is locked

```
goroutine 113333 [semacquire, 309 minutes]:
sync.runtime_SemacquireMutex(0xc00180009c, 0xc0000c7e00)
        /usr/local/go/src/runtime/sema.go:71 +0x3d
sync.(*RWMutex).RLock(0xc001800090)
        /usr/local/go/src/sync/rwmutex.go:50 +0x4e
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).GetRoundState(0xc001800000, 0x0)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:218 +0x46
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusReactor).queryMaj23Routine(0xc0017def80, 0x11104a0, 0xc0072488f0, 0xc0072489c0)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/reactor.go:735 +0x16d
created by github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusReactor).AddPeer
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/reactor.go:172 +0x236
```

because localClient is locked

```
goroutine 1899 [semacquire, 309 minutes]:
sync.runtime_SemacquireMutex(0xc00003363c, 0xc0000cb500)
        /usr/local/go/src/runtime/sema.go:71 +0x3d
sync.(*Mutex).Lock(0xc000033638)
        /usr/local/go/src/sync/mutex.go:134 +0xff
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client.(*localClient).SetResponseCallback(0xc0001fb560, 0xc007868540)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client/local_client.go:32 +0x33
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy.(*appConnConsensus).SetResponseCallback(0xc00002f750, 0xc007868540)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy/app_conn.go:57 +0x40
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state.execBlockOnProxyApp(0x1104e20, 0xc002ca0ba0, 0x11092a0, 0xc00002f750, 0xc0001fe960, 0xc000bfc660, 0x110cfe0, 0xc000090330, 0xc9d12, 0xc000d9d5a0, ...)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state/execution.go:230 +0x1fd
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state.(*BlockExecutor).ApplyBlock(0xc002c2a230, 0x7, 0x0, 0xc000eae880, 0x6, 0xc002e52c60, 0x16, 0x1f927, 0xc9d12, 0xc000d9d5a0, ...)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state/execution.go:96 +0x142
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).finalizeCommit(0xc001800000, 0x1f928)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1339 +0xa3e
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).tryFinalizeCommit(0xc001800000, 0x1f928)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1270 +0x451
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).enterCommit.func1(0xc001800000, 0x0, 0x1f928)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1218 +0x90
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).enterCommit(0xc001800000, 0x1f928, 0x0)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1247 +0x6b8
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote(0xc001800000, 0xc003d8dea0, 0xc000cf4cc0, 0x28, 0xf1, 0xc003bc7ad0, 0xc003bc7b10)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1659 +0xbad
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote(0xc001800000, 0xc003d8dea0, 0xc000cf4cc0, 0x28, 0xf1, 0xf1, 0xf1)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1517 +0x59
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg(0xc001800000, 0xd98200, 0xc0070dbed0, 0xc000cf4cc0, 0x28)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:660 +0x64b
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine(0xc001800000, 0x0)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:617 +0x670
created by github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).OnStart
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:311 +0x132
```

tx comes in and CheckTx is executed right when we execute the block

```
goroutine 111044 [semacquire, 309 minutes]:
sync.runtime_SemacquireMutex(0xc00003363c, 0x0)
        /usr/local/go/src/runtime/sema.go:71 +0x3d
sync.(*Mutex).Lock(0xc000033638)
        /usr/local/go/src/sync/mutex.go:134 +0xff
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client.(*localClient).CheckTxAsync(0xc0001fb0e0, 0xc002d94500, 0x13f, 0x280, 0x0)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client/local_client.go:85 +0x47
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy.(*appConnMempool).CheckTxAsync(0xc00002f720, 0xc002d94500, 0x13f, 0x280, 0x1)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy/app_conn.go:114 +0x51
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/mempool.(*Mempool).CheckTx(0xc002d3a320, 0xc002d94500, 0x13f, 0x280, 0xc0072355f0, 0x0, 0x0)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/mempool/mempool.go:316 +0x17b
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/core.BroadcastTxSync(0xc002d94500, 0x13f, 0x280, 0x0, 0x0, 0x0)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/core/mempool.go:93 +0xb8
reflect.Value.call(0xd85560, 0x10326c0, 0x13, 0xec7b8b, 0x4, 0xc00663f180, 0x1, 0x1, 0xc00663f180, 0xc00663f188, ...)
        /usr/local/go/src/reflect/value.go:447 +0x449
reflect.Value.Call(0xd85560, 0x10326c0, 0x13, 0xc00663f180, 0x1, 0x1, 0x0, 0x0, 0xc005cc9344)
        /usr/local/go/src/reflect/value.go:308 +0xa4
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.makeHTTPHandler.func2(0x1102060, 0xc00663f100, 0xc0082d7900)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/handlers.go:269 +0x188
net/http.HandlerFunc.ServeHTTP(0xc002c81f20, 0x1102060, 0xc00663f100, 0xc0082d7900)
        /usr/local/go/src/net/http/server.go:1964 +0x44
net/http.(*ServeMux).ServeHTTP(0xc002c81b60, 0x1102060, 0xc00663f100, 0xc0082d7900)
        /usr/local/go/src/net/http/server.go:2361 +0x127
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.maxBytesHandler.ServeHTTP(0x10f8a40, 0xc002c81b60, 0xf4240, 0x1102060, 0xc00663f100, 0xc0082d7900)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/http_server.go:219 +0xcf
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.RecoverAndLogHandler.func1(0x1103220, 0xc00121e620, 0xc0082d7900)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/http_server.go:192 +0x394
net/http.HandlerFunc.ServeHTTP(0xc002c06ea0, 0x1103220, 0xc00121e620, 0xc0082d7900)
        /usr/local/go/src/net/http/server.go:1964 +0x44
net/http.serverHandler.ServeHTTP(0xc001a1aa90, 0x1103220, 0xc00121e620, 0xc0082d7900)
        /usr/local/go/src/net/http/server.go:2741 +0xab
net/http.(*conn).serve(0xc00785a3c0, 0x11041a0, 0xc000f844c0)
        /usr/local/go/src/net/http/server.go:1847 +0x646
created by net/http.(*Server).Serve
        /usr/local/go/src/net/http/server.go:2851 +0x2f5
```
@melekes melekes force-pushed the 2721-unresponsive-tm branch 2 times, most recently from ee43715 to 611268a Compare November 8, 2018 14:42
@melekes melekes changed the title abci: localClient improvements & bugfixes abci: localClient improvements & bugfixes & pubsub Unsubscribe issues Nov 8, 2018
…consensus

in /dump_consensus_state RPC endpoint, skip a peer with no state
also, do not log DeliverTx result (to be consistent with other methods)
Contributor

@ebuchman ebuchman left a comment

If I understand correctly this PR is addressing multiple distinct issues:

  1. If the app panics, we deadlock. The app panicking is undefined behaviour, but as discussed previously it would probably be ideal for rpc endpoints to still be useful in this scenario - they shouldn't block, and it should be easy to figure out there's a problem like the app panicking

  2. PeerState being accessed in the mempool reactor before it's set in the consensus reactor. This can happen if we have txs to send to a peer right away and mempool routine is ready to do that before ConsensusReactor.AddPeer has finished running (non-deterministic due to the order of reactor.AddPeer being iteration over a map)

  3. Under heavy load, the BroadcastTxCommit endpoint saturates the pubsub and eventually deadlocks as subscription channels are sometimes not fully drained. Is it true that this only happens when CheckTx fails, and so the method returns before we get to select over deliverTxResCh?

Really great work getting to the bottom of all of this - thanks a lot!

abci/client/client.go (outdated)
types.ToRequestInitChain(req),
types.ToResponseInitChain(res),
)
app.mtx.Unlock()
Contributor

yikes, an outsider!

@@ -154,64 +170,73 @@ func (app *localClient) EchoSync(msg string) (*types.ResponseEcho, error) {

func (app *localClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
Contributor

I feel like we can keep this without the defer in such small functions, no ?

Contributor Author

Not if we want to be more or less resilient to app failures. If there's no app call, then sure.
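
A minimal sketch of the point made in this reply, under stated assumptions: the `app` type and its `call` method are hypothetical, and the `recover` is placed inside `call` only to keep the example self-contained (in the stack traces above, recovery appears to live in the RPC server's `RecoverAndLogHandler`). It shows that a deferred `Unlock` still runs when the wrapped call panics, so the next caller does not block.

```go
package main

import (
	"fmt"
	"sync"
)

// app is a hypothetical wrapper around an application guarded by a mutex.
type app struct {
	mtx sync.Mutex
}

// call runs fn under the lock. The deferred Unlock runs during panic
// unwinding, before the outer recover turns the panic into an error.
func (a *app) call(fn func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("app panicked: %v", r)
		}
	}()
	a.mtx.Lock()
	defer a.mtx.Unlock()
	fn()
	return nil
}

func main() {
	a := &app{}
	fmt.Println(a.call(func() { panic("boom") }))
	// The lock was released despite the panic, so this call proceeds.
	fmt.Println(a.call(func() {}))
}
```

With a plain `a.mtx.Unlock()` after `fn()` instead of the `defer`, the second call in `main` would block forever.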

- ps := src.Get(types.PeerStateKey).(*PeerState)
+ ps, ok := src.Get(types.PeerStateKey).(*PeerState)
+ if !ok {
+ panic(fmt.Sprintf("Peer %v has no state", src))
Contributor

What is the benefit of checking ok if we're just going to panic anyways?

Contributor Author

?? we only panic if src has no state (!ok). I don't understand what you mean by "anyways"

Contributor

@ebuchman ebuchman Nov 12, 2018

My understanding is we only get !ok if ps := src.Get(types.PeerStateKey).(*PeerState) (ie. without the ok) would have panicked. The point of the ok is to prevent the native panic and do something else. But here we're just explicitly panicking - so why even bother with the ok?

Contributor Author

ah, right. probably not much sense except better error message
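
The difference discussed in this thread can be sketched as follows. The `peer` map type, `stateKey`, and the two helper functions are hypothetical and exist only to capture the panic messages. Both forms panic when the state is missing; the comma-ok form just lets the code choose the message.

```go
package main

import "fmt"

// PeerState is an empty stand-in for the consensus PeerState type.
type PeerState struct{}

// peer is a hypothetical key-value holder standing in for p2p.Peer.
type peer map[string]interface{}

func (p peer) Get(key string) interface{} { return p[key] }

const stateKey = "PeerState" // hypothetical key name

// bare uses a single-value type assertion and returns the runtime's own
// panic message if the assertion fails.
func bare(p peer) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	_ = p.Get(stateKey).(*PeerState)
	return ""
}

// checked uses the comma-ok form and panics with a custom message.
func checked(p peer, id string) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	if _, ok := p.Get(stateKey).(*PeerState); !ok {
		panic(fmt.Sprintf("Peer %v has no state", id))
	}
	return ""
}

func main() {
	p := peer{} // no state set
	fmt.Println("bare:   ", bare(p))
	fmt.Println("checked:", checked(p, "peer-1"))
}
```

The bare form reports a generic interface-conversion error, while the checked form names the peer, which is the "better error message" mentioned above.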

if !ok {
evR.Logger.Info("Found peer without PeerState", "peer", peer)
if !ok {
// Peer does not have a state yet. We set it in the consensus reactor, but
Contributor

This is kind of gross. Wish we had something better to co-ordinate this.
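
For context, the coordination pattern being criticised here is roughly "poll until another reactor has set the peer state". A hedged sketch of that pattern follows; the `peer` type, `waitForState`, `peerStateKey`, and the chosen intervals are all hypothetical and are not the code from this PR.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

const peerStateKey = "PeerState" // hypothetical key name

// peer is a hypothetical, concurrency-safe key-value holder.
type peer struct {
	mtx  sync.Mutex
	data map[string]interface{}
}

func (p *peer) Set(key string, v interface{}) {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	p.data[key] = v
}

func (p *peer) Get(key string) interface{} {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	return p.data[key]
}

// waitForState polls the peer until the key is set or attempts run out.
func waitForState(p *peer, key string, interval time.Duration, attempts int) (interface{}, bool) {
	for i := 0; i < attempts; i++ {
		if v := p.Get(key); v != nil {
			return v, true
		}
		time.Sleep(interval)
	}
	return nil, false
}

// demo simulates another reactor setting the state slightly later.
func demo() bool {
	p := &peer{data: map[string]interface{}{}}
	go func() {
		time.Sleep(5 * time.Millisecond)
		p.Set(peerStateKey, "state")
	}()
	_, ok := waitForState(p, peerStateKey, time.Millisecond, 2000)
	return ok
}

func main() {
	fmt.Println("state found:", demo())
}
```

Polling works but couples the two reactors through timing, which is presumably why the reviewer wishes for an explicit coordination mechanism.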

a.ourAddrs[addr.String()] = struct{}{}
a.mtx.Unlock()
Contributor

I feel like if the function is more than one line, and especially if it's not on a hot path, we should just use defer

Contributor Author

I disagree. I think we should only use defer with mutexes when a) there are multiple return conditions, b) we call other functions while locked and they can panic, or c) objects protected by the lock are accessed throughout the whole function body.

https://twitter.com/akaliaev/status/1058384359844204544

Contributor

Ah OK, this answers my 1st question, too.

Contributor

I guess my concern is that the function might change over time, new return statements be added, and the Unlock will be forgotten about. If we use defer, adding new returns will not be a problem.

Contributor

@jaekwon jaekwon Dec 16, 2018

addr.String() is a function that can panic (if not now, in the future), so we should be using defer here as well.
It's too easy to forget to unlock, or to mutate a line to call a function, which may panic now or in the future...
So I'd prefer that we just use defers whenever possible.

Contributor Author

addr.String() is a function that can panic

fair point

ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel()
- deliverTxResCh := make(chan interface{})
+ deliverTxResCh := make(chan interface{}, 1)
Contributor

Why does it need the buffer?

Contributor Author

because a) we don't want to block the synchronous pubsub, and b) we drain it if there is no result or on any error (see the defer below; you can't drain an unbuffered channel)
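
A small sketch of why the buffer matters, using hypothetical helpers `trySend` and `drain` (the real pubsub does a blocking send; `trySend` only makes the "would this block?" question observable). With no receiver ready, a send on an unbuffered channel cannot complete, while a channel of capacity 1 accepts one item that can later be drained without blocking.

```go
package main

import "fmt"

// trySend reports whether a send could complete immediately.
func trySend(ch chan interface{}, v interface{}) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// drain removes all pending items without blocking and returns the count.
func drain(ch chan interface{}) int {
	n := 0
	for {
		select {
		case <-ch:
			n++
		default:
			return n
		}
	}
}

func main() {
	unbuffered := make(chan interface{})
	buffered := make(chan interface{}, 1)

	fmt.Println("unbuffered send ok:", trySend(unbuffered, "res")) // no receiver, would block
	fmt.Println("buffered send ok:  ", trySend(buffered, "res"))
	fmt.Println("drained from buffered:", drain(buffered))
}
```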

// TODO: configurable?
timer := time.NewTimer(60 * 2 * time.Second)
// Wait for the tx to be included in a block or timeout.
var deliverTxTimeout = 10 * time.Second // TODO: configurable?
Contributor

This might be too low. We should open a new issue about exposing this as a parameter. I think we would want a default set in the config.toml, but also maybe a url parameter so it can be overridden (within reason) in the http request (?).

Contributor Author

120 sec. was too much. But I agree, it should be configurable.

also maybe a url parameter so it can be overridden (within reason) in the http request

this is an attack vector

Contributor

this is an attack vector

Hence within reason. We can put a reasonable limit on what that value can be, and have a low default. Not sure if it's really a good idea though.
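
One way to read "within reason" is a clamped override. The sketch below is purely illustrative: `effectiveTimeout` and `maxTimeout` are hypothetical names and values, and only the 10 second default comes from the diff above.

```go
package main

import (
	"fmt"
	"time"
)

const (
	defaultTimeout = 10 * time.Second // default taken from the diff above
	maxTimeout     = 60 * time.Second // hypothetical upper bound
)

// effectiveTimeout returns the timeout to use for a request: the default
// when nothing valid was requested, otherwise the request capped at the max.
func effectiveTimeout(requested time.Duration) time.Duration {
	if requested <= 0 {
		return defaultTimeout
	}
	if requested > maxTimeout {
		return maxTimeout
	}
	return requested
}

func main() {
	fmt.Println(effectiveTimeout(0))
	fmt.Println(effectiveTimeout(30 * time.Second))
	fmt.Println(effectiveTimeout(24 * time.Hour))
}
```

The cap bounds how long a single request can hold a subscription open, which addresses the attack-vector concern only partially; many concurrent requests at the maximum would still add load.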

@ebuchman
Contributor

@melekes can you answer

  1. Under heavy load, the BroadcastTxCommit endpoint saturates the pubsub and eventually deadlocks as subscription channels are sometimes not fully drained. Is it true that this only happens when CheckTx fails, and so the method returns before we get to select over deliverTxResCh?

@melekes
Contributor Author

melekes commented Nov 13, 2018

Is it true that this only happens when CheckTx fails, and so the method returns before we get to select over deliverTxResCh?

Yes

app.Callback = cb
app.mtx.Unlock()
Contributor

@liamsi liamsi Nov 13, 2018

Is this not deferred because the line above simply can't panic? NVM

@@ -293,9 +300,9 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
switch msg := msg.(type) {
case *VoteMessage:
cs := conR.conS
- cs.mtx.Lock()
+ cs.mtx.RLock()
Contributor

👍
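
A short sketch of what switching from `Lock` to `RLock` buys, assuming the guarded section only reads state (the `concurrentReaders` helper is hypothetical). Two readers can hold a `sync.RWMutex` at once, whereas a second `Lock` on a plain mutex would wait.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// concurrentReaders reports whether a second reader can acquire the read
// lock while the first reader still holds it.
func concurrentReaders() bool {
	var mtx sync.RWMutex
	mtx.RLock() // first reader
	defer mtx.RUnlock()

	done := make(chan struct{})
	go func() {
		mtx.RLock() // second reader: not blocked, since no writer is waiting
		mtx.RUnlock()
		close(done)
	}()

	select {
	case <-done:
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	fmt.Println("readers overlap:", concurrentReaders())
}
```

Readers still wait while a writer holds or is waiting for the lock, as the first stack trace in this PR shows for `GetRoundState`.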

@@ -26,3 +26,6 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi

### BUG FIXES:

- [abci] unlock mutex in localClient so even when app panics (e.g. during CheckTx), consensus continue working
Contributor

I'm overdue to write a proper description of how to do changelogs (and ultimately a linter!) but this should include:

  • capitalize the start of the entry
  • include PR number

@@ -53,8 +61,9 @@ func (app *localClient) EchoAsync(msg string) *ReqRes {

func (app *localClient) InfoAsync(req types.RequestInfo) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()
Contributor

I get that we need defer to catch app panics, but are we sure we want the lock to be held during the callback?

@ebuchman ebuchman merged commit 5a6822c into develop Nov 13, 2018
@ebuchman ebuchman deleted the 2721-unresponsive-tm branch November 13, 2018 16:32
@@ -178,10 +178,10 @@ func (a *addrBook) OurAddress(addr *p2p.NetAddress) bool {

func (a *addrBook) AddPrivateIDs(IDs []string) {
a.mtx.Lock()
defer a.mtx.Unlock()
for _, id := range IDs {
a.privateIDs[p2p.ID(id)] = struct{}{}
Contributor

same here

addrs := []*knownAddress{}
a.mtx.Lock()
for _, addr := range a.addrLookup {
addrs = append(addrs, addr.copy())
Contributor

same here

defer eventBus.Unsubscribe(context.Background(), "mempool", q)
defer func() {
// drain deliverTxResCh to make sure we don't block
LOOP:
Contributor

Don't you want to drain after Unsubscribe, thus guaranteeing that no further items can be sent to the channel? In between draining here and Unsubscribe, another item may have been sent to the channel. (which is actually fine as long as only 1 item is ever sent to the channel because the capacity is 1, and such channels (even those that have items pending) get garbage collected if there is no receiver anymore).

Because unused channels are supposed to be garbage collected anyways, draining is often not the right thing to do.


6 participants