Skip to content

Commit

Permalink
nsqd --enable-experiment=topology-aware-consumption
Browse files Browse the repository at this point in the history
  • Loading branch information
jehiah authored and zoemccormick committed Oct 19, 2023
1 parent 1952ab9 commit 91517a4
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 23 deletions.
7 changes: 7 additions & 0 deletions apps/nsqd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,5 +198,12 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.Int("max-deflate-level", opts.MaxDeflateLevel, "max deflate compression level a client can negotiate (> values == > nsqd CPU usage)")
flagSet.Bool("snappy", opts.SnappyEnabled, "enable snappy feature negotiation (client compression)")

experiments := app.StringArray{}
var validExperiments []string
for _, e := range nsqd.AllExperiments {
validExperiments = append(validExperiments, fmt.Sprintf("'%s'", string(e)))
}
flagSet.Var(&experiments, "enable-experiment", fmt.Sprintf("enable experimental feature (may be given multiple times) (valid options: %s)", strings.Join(validExperiments, ", ")))

return flagSet
}
46 changes: 25 additions & 21 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ type Channel struct {

backend BackendQueue

zoneLocalMsgChan chan *Message
regionLocalMsgChan chan *Message
memoryMsgChan chan *Message
exitFlag int32
exitMutex sync.RWMutex
topologyAwareConsumption bool
zoneLocalMsgChan chan *Message
regionLocalMsgChan chan *Message
memoryMsgChan chan *Message
exitFlag int32
exitMutex sync.RWMutex

// state tracking
clients map[int64]Consumer
Expand All @@ -78,13 +79,14 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD,
deleteCallback func(*Channel)) *Channel {

c := &Channel{
topicName: topicName,
name: channelName,
memoryMsgChan: nil,
clients: make(map[int64]Consumer),
deleteCallback: deleteCallback,
nsqd: nsqd,
ephemeral: strings.HasSuffix(channelName, "#ephemeral"),
topicName: topicName,
name: channelName,
memoryMsgChan: nil,
clients: make(map[int64]Consumer),
deleteCallback: deleteCallback,
nsqd: nsqd,
ephemeral: strings.HasSuffix(channelName, "#ephemeral"),
topologyAwareConsumption: nsqd.getOpts().HasExperiment(TopologyAwareConsumption),
}

if nsqd.getOpts().TopologyRegion != "" {
Expand Down Expand Up @@ -313,15 +315,17 @@ func (c *Channel) PutMessage(m *Message) error {
}

func (c *Channel) put(m *Message) error {
select {
case c.zoneLocalMsgChan <- m:
return nil
default:
}
select {
case c.regionLocalMsgChan <- m:
return nil
default:
if c.topologyAwareConsumption {
select {
case c.zoneLocalMsgChan <- m:
return nil
default:
}
select {
case c.regionLocalMsgChan <- m:
return nil
default:
}
}
select {
case c.memoryMsgChan <- m:
Expand Down
22 changes: 22 additions & 0 deletions nsqd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,28 @@ type Options struct {
DeflateEnabled bool `flag:"deflate"`
MaxDeflateLevel int `flag:"max-deflate-level"`
SnappyEnabled bool `flag:"snappy"`

// experimental features
Experiments []string `flag:"enable-experiment" cfg:"enable_experiment"`
}

type Experiment string

const (
TopologyAwareConsumption Experiment = "topology-aware-consumption"
)

var AllExperiments = []Experiment{
TopologyAwareConsumption,
}

func (o Options) HasExperiment(e Experiment) bool {
for _, v := range o.Experiments {
if string(e) == v {
return true
}
}
return false
}

func NewOptions() *Options {
Expand Down
5 changes: 3 additions & 2 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,11 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
}

msgTimeout = identifyData.MsgTimeout
if identifyData.TopologyZone == p.nsqd.getOpts().TopologyZone {
isToplogyAware := p.nsqd.getOpts().HasExperiment(TopologyAwareConsumption)
if identifyData.TopologyZone == p.nsqd.getOpts().TopologyZone && isToplogyAware {
zoneLocal = true
}
if identifyData.TopologyRegion == p.nsqd.getOpts().TopologyRegion {
if identifyData.TopologyRegion == p.nsqd.getOpts().TopologyRegion && isToplogyAware {
regionLocal = true
}
case <-heartbeatChan:
Expand Down
1 change: 1 addition & 0 deletions nsqd/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ func TestMultipleConsumerV2(t *testing.T) {
// if it's message pump is waiting
func TestSameZoneConsumerV2(t *testing.T) {
opts := NewOptions()
opts.Experiments = []string{string(TopologyAwareConsumption)}
opts.Logger = test.NewTestLogger(t)
opts.ClientTimeout = 60 * time.Second
opts.TopologyRegion = "region"
Expand Down

0 comments on commit 91517a4

Please sign in to comment.