diff --git a/apps/nsqd/options.go b/apps/nsqd/options.go index 8aa19f028..b6799066e 100644 --- a/apps/nsqd/options.go +++ b/apps/nsqd/options.go @@ -198,5 +198,12 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet { flagSet.Int("max-deflate-level", opts.MaxDeflateLevel, "max deflate compression level a client can negotiate (> values == > nsqd CPU usage)") flagSet.Bool("snappy", opts.SnappyEnabled, "enable snappy feature negotiation (client compression)") + experiments := app.StringArray{} + var validExperiments []string + for _, e := range nsqd.AllExperiments { + validExperiments = append(validExperiments, fmt.Sprintf("'%s'", string(e))) + } + flagSet.Var(&experiments, "enable-experiment", fmt.Sprintf("enable experimental feature (may be given multiple times) (valid options: %s)", strings.Join(validExperiments, ", "))) + return flagSet } diff --git a/nsqd/channel.go b/nsqd/channel.go index 218790570..f2452ee27 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -48,11 +48,12 @@ type Channel struct { backend BackendQueue - zoneLocalMsgChan chan *Message - regionLocalMsgChan chan *Message - memoryMsgChan chan *Message - exitFlag int32 - exitMutex sync.RWMutex + topologyAwareConsumption bool + zoneLocalMsgChan chan *Message + regionLocalMsgChan chan *Message + memoryMsgChan chan *Message + exitFlag int32 + exitMutex sync.RWMutex // state tracking clients map[int64]Consumer @@ -78,13 +79,14 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD, deleteCallback func(*Channel)) *Channel { c := &Channel{ - topicName: topicName, - name: channelName, - memoryMsgChan: nil, - clients: make(map[int64]Consumer), - deleteCallback: deleteCallback, - nsqd: nsqd, - ephemeral: strings.HasSuffix(channelName, "#ephemeral"), + topicName: topicName, + name: channelName, + memoryMsgChan: nil, + clients: make(map[int64]Consumer), + deleteCallback: deleteCallback, + nsqd: nsqd, + ephemeral: strings.HasSuffix(channelName, "#ephemeral"), + topologyAwareConsumption: nsqd.getOpts().HasExperiment(TopologyAwareConsumption), } if nsqd.getOpts().TopologyRegion != "" { @@ -313,15 +315,17 @@ func (c *Channel) PutMessage(m *Message) error { } func (c *Channel) put(m *Message) error { - select { - case c.zoneLocalMsgChan <- m: - return nil - default: - } - select { - case c.regionLocalMsgChan <- m: - return nil - default: + if c.topologyAwareConsumption { + select { + case c.zoneLocalMsgChan <- m: + return nil + default: + } + select { + case c.regionLocalMsgChan <- m: + return nil + default: + } } select { case c.memoryMsgChan <- m: diff --git a/nsqd/options.go b/nsqd/options.go index 35586eef6..f1505501d 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -86,6 +86,28 @@ type Options struct { DeflateEnabled bool `flag:"deflate"` MaxDeflateLevel int `flag:"max-deflate-level"` SnappyEnabled bool `flag:"snappy"` + + // experimental features + Experiments []string `flag:"enable-experiment" cfg:"enable_experiment"` +} + +type Experiment string + +const ( + TopologyAwareConsumption Experiment = "topology-aware-consumption" +) + +var AllExperiments = []Experiment{ + TopologyAwareConsumption, +} + +func (o Options) HasExperiment(e Experiment) bool { + for _, v := range o.Experiments { + if string(e) == v { + return true + } + } + return false } func NewOptions() *Options { diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index c875cddb5..26358061e 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -312,10 +312,11 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { } msgTimeout = identifyData.MsgTimeout - if identifyData.TopologyZone == p.nsqd.getOpts().TopologyZone { + isToplogyAware := p.nsqd.getOpts().HasExperiment(TopologyAwareConsumption) + if identifyData.TopologyZone == p.nsqd.getOpts().TopologyZone && isToplogyAware { zoneLocal = true } - if identifyData.TopologyRegion == p.nsqd.getOpts().TopologyRegion { + if identifyData.TopologyRegion == p.nsqd.getOpts().TopologyRegion && isToplogyAware { regionLocal = true } case <-heartbeatChan: diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 712a509d8..b20f32356 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -211,6 +211,7 @@ func TestMultipleConsumerV2(t *testing.T) { // if it's message pump is waiting func TestSameZoneConsumerV2(t *testing.T) { opts := NewOptions() + opts.Experiments = []string{string(TopologyAwareConsumption)} opts.Logger = test.NewTestLogger(t) opts.ClientTimeout = 60 * time.Second opts.TopologyRegion = "region"