Skip to content

Commit

Permalink
Merge pull request #1136 from andyxning/add_max_channel_client_connec…
Browse files Browse the repository at this point in the history
…tion_count

nsqd: add max_channel_client_connection_count
  • Loading branch information
ploxiln committed Feb 22, 2019
2 parents cbdcd54 + 95221cd commit fd1cde1
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 5 deletions.
1 change: 1 addition & 0 deletions apps/nsqd/nsqd.go
Expand Up @@ -119,6 +119,7 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.Duration("max-output-buffer-timeout", opts.MaxOutputBufferTimeout, "maximum client configurable duration of time between flushing to a client")
flagSet.Duration("min-output-buffer-timeout", opts.MinOutputBufferTimeout, "minimum client configurable duration of time between flushing to a client")
flagSet.Duration("output-buffer-timeout", opts.OutputBufferTimeout, "default duration of time between flushing data to clients")
flagSet.Int("max-channel-consumers", opts.MaxChannelConsumers, "maximum channel consumer connection count per nsqd instance (default 0, i.e., unlimited)")

// statsd integration options
flagSet.String("statsd-address", opts.StatsdAddress, "UDP <addr>:<port> of a statsd daemon for pushing stats")
Expand Down
11 changes: 9 additions & 2 deletions nsqd/channel.go
Expand Up @@ -389,15 +389,22 @@ func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Dura
}

// AddClient adds a client to the Channel's client list
func (c *Channel) AddClient(clientID int64, client Consumer) {
func (c *Channel) AddClient(clientID int64, client Consumer) error {
c.Lock()
defer c.Unlock()

_, ok := c.clients[clientID]
if ok {
return
return nil
}

maxChannelConsumers := c.ctx.nsqd.getOpts().MaxChannelConsumers
if maxChannelConsumers != 0 && len(c.clients) >= maxChannelConsumers {
return errors.New("E_TOO_MANY_CHANNEL_CONSUMERS")
}

c.clients[clientID] = client
return nil
}

// RemoveClient removes a client from the Channel's client list
Expand Down
29 changes: 28 additions & 1 deletion nsqd/channel_test.go
Expand Up @@ -152,7 +152,8 @@ func TestChannelEmptyConsumer(t *testing.T) {
channel := topic.GetChannel("channel")
client := newClientV2(0, conn, &context{nsqd})
client.SetReadyCount(25)
channel.AddClient(client.ID, client)
err := channel.AddClient(client.ID, client)
test.Equal(t, err, nil)

for i := 0; i < 25; i++ {
msg := NewMessage(topic.GenerateID(), []byte("test"))
Expand All @@ -173,6 +174,32 @@ func TestChannelEmptyConsumer(t *testing.T) {
}
}

func TestMaxChannelConsumers(t *testing.T) {
opts := NewOptions()
opts.Logger = test.NewTestLogger(t)
opts.MaxChannelConsumers = 1
tcpAddr, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

conn, _ := mustConnectNSQD(tcpAddr)
defer conn.Close()

topicName := "test_max_channel_consumers" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel")

client1 := newClientV2(1, conn, &context{nsqd})
client1.SetReadyCount(25)
err := channel.AddClient(client1.ID, client1)
test.Equal(t, err, nil)

client2 := newClientV2(2, conn, &context{nsqd})
client2.SetReadyCount(25)
err = channel.AddClient(client2.ID, client2)
test.NotEqual(t, err, nil)
}

func TestChannelHealth(t *testing.T) {
opts := NewOptions()
opts.Logger = test.NewTestLogger(t)
Expand Down
3 changes: 2 additions & 1 deletion nsqd/nsqd_test.go
Expand Up @@ -180,7 +180,8 @@ func TestEphemeralTopicsAndChannels(t *testing.T) {
topic := nsqd.GetTopic(topicName)
ephemeralChannel := topic.GetChannel("ch1#ephemeral")
client := newClientV2(0, nil, &context{nsqd})
ephemeralChannel.AddClient(client.ID, client)
err := ephemeralChannel.AddClient(client.ID, client)
test.Equal(t, err, nil)

msg := NewMessage(topic.GenerateID(), body)
topic.PutMessage(msg)
Expand Down
2 changes: 2 additions & 0 deletions nsqd/options.go
Expand Up @@ -58,6 +58,7 @@ type Options struct {
MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"`
MinOutputBufferTimeout time.Duration `flag:"min-output-buffer-timeout"`
OutputBufferTimeout time.Duration `flag:"output-buffer-timeout"`
MaxChannelConsumers int `flag:"max-channel-consumers"`

// statsd integration
StatsdAddress string `flag:"statsd-address"`
Expand Down Expand Up @@ -134,6 +135,7 @@ func NewOptions() *Options {
MaxOutputBufferTimeout: 30 * time.Second,
MinOutputBufferTimeout: 25 * time.Millisecond,
OutputBufferTimeout: 250 * time.Millisecond,
MaxChannelConsumers: 0,

StatsdPrefix: "nsq.%s",
StatsdInterval: 60 * time.Second,
Expand Down
6 changes: 5 additions & 1 deletion nsqd/protocol_v2.go
Expand Up @@ -615,7 +615,11 @@ func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
for {
topic := p.ctx.nsqd.GetTopic(topicName)
channel = topic.GetChannel(channelName)
channel.AddClient(client.ID, client)
if err := channel.AddClient(client.ID, client); err != nil {
return nil, protocol.NewFatalClientErr(nil, "E_TOO_MANY_CHANNEL_CONSUMERS",
fmt.Sprintf("channel consumers for %s:%s exceeds limit of %d",
topicName, channelName, p.ctx.nsqd.getOpts().MaxChannelConsumers))
}

if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral && topic.Exiting()) {
channel.RemoveClient(client.ID)
Expand Down

0 comments on commit fd1cde1

Please sign in to comment.