pubsub: failure to drain channel after unsubscribe can halt entire pubsub #1811

Closed
silasdavis opened this issue Jun 24, 2018 · 5 comments
Labels
C:rpc Component: JSON RPC, gRPC S:waiting Status: Waiting for response T:bug Type Bug (Confirmed)

Comments

@silasdavis
Contributor

silasdavis commented Jun 24, 2018

The pubsub server performs a blocking send to each subscriber channel (https://github.com/tendermint/tendermint/blob/master/libs/pubsub/pubsub.go#L347):

func (state *state) send(msg interface{}, tags TagMap) {
	for q, clientToChannelMap := range state.queries {
		if q.Matches(tags) {
			for _, ch := range clientToChannelMap {
				ch <- msg
			}
		}
	}
}

If one subscriber stops receiving, the send to its channel blocks and the entire pubsub server stops. In some settings this seems like bad behaviour for a pubsub system (i.e. some subscribers starving others). In Tendermint we may not be able to use a non-blocking send here if it means some component misses vital messages. However, a non-blocking send would fix this:

func (state *state) send(msg interface{}, tags TagMap) {
	for q, clientToChannelMap := range state.queries {
		if q.Matches(tags) {
			for _, ch := range clientToChannelMap {
				select {
				case ch <- msg:
				default:
					continue
				}
			}
		}
	}
}
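To make the trade-off concrete, here is a minimal standalone sketch of the same non-blocking pattern. It does not use the Tendermint pubsub package; `trySend`, `broadcast`, and the subscriber names are hypothetical and exist only for illustration. A subscriber that cannot accept the message right now simply misses it, while the others are still served.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether msg was delivered.
// (Hypothetical helper, not part of the Tendermint pubsub package.)
func trySend(ch chan<- interface{}, msg interface{}) bool {
	select {
	case ch <- msg:
		return true
	default:
		// No receiver is ready and there is no buffer space: drop the message.
		return false
	}
}

// broadcast offers msg to every subscriber and returns the names of the
// subscribers that missed it.
func broadcast(subs map[string]chan interface{}, msg interface{}) []string {
	var dropped []string
	for name, ch := range subs {
		if !trySend(ch, msg) {
			dropped = append(dropped, name)
		}
	}
	return dropped
}

func main() {
	fast := make(chan interface{}, 1) // has room for one message
	stuck := make(chan interface{})   // unbuffered and nobody is reading
	subs := map[string]chan interface{}{"fast": fast, "stuck": stuck}

	dropped := broadcast(subs, "block 1")
	fmt.Println("dropped for:", dropped)
	fmt.Println("fast received:", <-fast)
}
```

The cost is visible in the return value of `broadcast`: the stuck subscriber silently loses the message, which is exactly the concern raised above for components that must not miss events.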

A particularly bad consequence of the current behaviour looks like this:

	s := pubsub.Server{}
	s.Subscribe(ctx, sub, qry, out)
	defer s.UnsubscribeAll(ctx, sub)
	for msg := range out {
		// Do stuff
		if err != nil {
			// Return from error doing stuff hoping the defer unsubscribe cleans things up for you
			return err
		}
	}

In this scenario we exit from our subscribing loop on an error, and unsubscribing in a defer alone is not enough to clean up. In fact if we get any more messages we will freeze up the entire pubsub server. To clean up properly we need to do this:

	s.Subscribe(ctx, sub, qry, out)
	defer func() {
		s.UnsubscribeAll(ctx, sub)
		for range out {
			// drain out to make sure we don't block
		}
	}() 
	for msg := range out {
		// Do stuff
		if err != nil {
			// Return from error doing stuff hoping the defer unsubscribe cleans things up for you
			return err
		}
	}

This feels like having to know too much about the internals of pubsub, is easy to forget, and may not immediately cause a problem until some unrelated subscriptions stop working.

I think the pubsub server should make draining the channel part of its unsubscribe routine, so that callers know unsubscribing is all they need to do to avoid blocking other subscribers. For my usage a non-blocking send would also make sense, so that subscribers that forget to clean up don't halt the system. I also think Subscribe should just return a chan interface{} rather than take one as an argument: it is usually the channel's creator that is responsible for closing it, yet pubsub currently closes a channel it did not create (and so a panic occurs if you unsubscribe and also close your channel). The channel buffer size could be another argument if that is needed.
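As a rough illustration of that proposal, the sketch below shows a server that creates the channel itself, returns only the receive side, and is the only party that ever closes it. Everything here is hypothetical: `toyServer`, its method signatures, and the `capacity` parameter are assumptions for the example and are not the Tendermint pubsub API. It also uses the non-blocking send discussed above.

```go
package main

import (
	"fmt"
	"sync"
)

// toyServer is a hypothetical stand-in, not the Tendermint pubsub.Server.
type toyServer struct {
	mu   sync.Mutex
	subs map[string]chan interface{}
}

func newToyServer() *toyServer {
	return &toyServer{subs: make(map[string]chan interface{})}
}

// Subscribe creates the channel and returns only its receive side, so the
// caller cannot close it and the server remains the sole closer.
func (s *toyServer) Subscribe(client string, capacity int) <-chan interface{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	ch := make(chan interface{}, capacity)
	s.subs[client] = ch
	return ch
}

// Unsubscribe removes the client and closes its channel. Calling it a second
// time is a no-op, so there is no double-close panic.
func (s *toyServer) Unsubscribe(client string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if ch, ok := s.subs[client]; ok {
		delete(s.subs, client)
		close(ch)
	}
}

// Publish uses a non-blocking send: a full subscriber misses the message
// instead of stalling everyone else.
func (s *toyServer) Publish(msg interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, ch := range s.subs {
		select {
		case ch <- msg:
		default:
		}
	}
}

func main() {
	s := newToyServer()
	out := s.Subscribe("client-1", 2)
	s.Publish("a")
	s.Publish("b")
	s.Publish("c") // buffer is full, so this one is dropped
	s.Unsubscribe("client-1")
	for msg := range out { // buffered messages are still readable after close
		fmt.Println(msg)
	}
}
```

Because the caller only ever holds a `<-chan interface{}`, the compiler rejects an attempt to close it, which removes the double-close panic by construction.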

@ebuchman ebuchman added T:bug Type Bug (Confirmed) C:rpc Component: JSON RPC, gRPC prelaunch labels Jun 28, 2018
@ebuchman ebuchman added this to the launch milestone Jun 28, 2018
@melekes
Contributor

melekes commented Jul 4, 2018

This should be discussed/done together with #951.

@xla xla removed the prelaunch label Jul 18, 2018
@xla xla changed the title from "[Pubsub] failure to drain channel after unsubscribe can halt entire pubsub" to "pubsub: failure to drain channel after unsubscribe can halt entire pubsub" Jul 18, 2018
@ebuchman ebuchman modified the milestones: v1.0, launch Sep 21, 2018
@melekes
Contributor

melekes commented Oct 2, 2018

> In fact if we get any more messages we will freeze up the entire pubsub server. To clean up properly we need to do this:
>
> 	s.Subscribe(ctx, sub, qry, out)
> 	defer func() {
> 		s.UnsubscribeAll(ctx, sub)
> 		for range out {
> 			// drain out to make sure we don't block
> 		}
> 	}()

That can't be right. UnsubscribeAll closes the out channel, and the implementation makes sure we won't put anything else onto this channel (by removing it from the internal structures and closing the channel).

@melekes
Contributor

melekes commented Oct 2, 2018

Am I missing something?

@xla xla added this to Queued in Launch Oct 3, 2018
@melekes melekes added the S:waiting Status: Waiting for response label Oct 4, 2018
@melekes
Contributor

melekes commented Oct 8, 2018

On second thought, there is a way to freeze the entire pubsub server: the client stops reading messages before it unsubscribes.

	s.Subscribe(ctx, sub, qry, out)
	defer func() {
		s.UnsubscribeAll(ctx, sub)
		for range out {
			// unreachable since UnsubscribeAll will block (depending on the ctx of course)
		}
	}()
	for msg := range out {
		// Do stuff
		if err != nil {
			// if pubsub gets one more message (`Publish`) before defer is called, it can block forever
			return err
		}
	}

But draining the channel after Unsubscribe won't solve the problem, because Unsubscribe will block waiting for Publish to finish. Draining the channel before unsubscribing, however, will work.

(b)

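	s.Subscribe(ctx, sub, qry, out)
	defer func() {
		// Start draining before unsubscribing so a pending Publish cannot block on out.
		// The drain runs in its own goroutine because out is only closed by
		// UnsubscribeAll; a plain `for range out` here would never return.
		go func() {
			for range out {
			}
		}()
		s.UnsubscribeAll(ctx, sub)
	}()
	for msg := range out {
		// Do stuff
		if err != nil {
			// if pubsub gets one more message (`Publish`) before the defer runs, the drain above unblocks it
			return err
		}
	}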

Unfortunately, this can't be incorporated into pubsub itself (because pubsub is implemented with a blocking send). Clients must ensure that either (a) they don't exit the reading loop themselves, but instead call Unsubscribe/UnsubscribeAll(), or (b) they drain the channel before unsubscribing.

(a)

	s.Subscribe(ctx, sub, qry, out)
	go func() {
		for msg := range out {
			// will exit when out is closed by Unsubscribe
		}
	}()
	s.UnsubscribeAll(ctx, sub)
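For readers who want to try option (a) in isolation, here is a small runnable sketch. It assumes a toy, single-subscriber server whose event loop does a blocking send, loosely modelled on the behaviour described in this thread. `blockingServer`, `command`, and `runOptionA` are hypothetical names and not the Tendermint implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// command is either a publish request or an unsubscribe request.
type command struct {
	msg   interface{}
	unsub bool
}

// blockingServer is a hypothetical single-subscriber server that handles all
// commands in one loop and delivers messages with a blocking send.
type blockingServer struct {
	cmds chan command
	out  chan interface{}
}

func startBlockingServer(out chan interface{}) *blockingServer {
	s := &blockingServer{cmds: make(chan command), out: out}
	go func() {
		for c := range s.cmds {
			if c.unsub {
				close(s.out) // the server closes the subscriber's channel
				return
			}
			s.out <- c.msg // blocking send: stalls the loop if nobody reads
		}
	}()
	return s
}

func (s *blockingServer) Publish(msg interface{}) { s.cmds <- command{msg: msg} }
func (s *blockingServer) Unsubscribe()            { s.cmds <- command{unsub: true} }

// runOptionA publishes n messages to a reader that never leaves its loop on
// its own, then unsubscribes. It returns how many messages the reader saw.
func runOptionA(n int) int {
	out := make(chan interface{})
	s := startBlockingServer(out)

	var wg sync.WaitGroup
	received := 0
	wg.Add(1)
	go func() {
		defer wg.Done()
		for range out { // keeps reading until Unsubscribe closes out
			received++
		}
	}()

	for i := 0; i < n; i++ {
		s.Publish(i)
	}
	s.Unsubscribe()
	wg.Wait() // the reader exits only after out has been closed
	return received
}

func main() {
	fmt.Println("messages received:", runOptionA(3))
}
```

The key property is that the reader's loop ends only when the channel is closed, so the server's blocking send always has a receiver until the unsubscribe has been processed.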

@melekes
Contributor

melekes commented Oct 15, 2018

Added docs in #2641

@melekes melekes closed this as completed Oct 15, 2018
Launch automation moved this from Queued to Done Oct 15, 2018
kfangw pushed a commit to kfangw/blockchain that referenced this issue Jun 10, 2019

4 participants