Skip to content

Commit

Permalink
nsqd: add --topology-region --topology-zone
Browse files Browse the repository at this point in the history
  • Loading branch information
jehiah committed Nov 28, 2020
1 parent 8adb229 commit c67d5ee
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 2 deletions.
2 changes: 2 additions & 0 deletions apps/nsqd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.Var(&lookupdTCPAddrs, "lookupd-tcp-address", "lookupd TCP address (may be given multiple times)")
flagSet.Duration("http-client-connect-timeout", opts.HTTPClientConnectTimeout, "timeout for HTTP connect")
flagSet.Duration("http-client-request-timeout", opts.HTTPClientRequestTimeout, "timeout for HTTP request")
flagSet.String("topology-region", opts.TopologyRegion, "A region represents a larger domain, made up of one or more zones")
flagSet.String("topology-zone", opts.TopologyZone, "A zone represents a logical failure domain")

// diskqueue options
flagSet.String("data-path", opts.DataPath, "path to store disk-backed messages")
Expand Down
18 changes: 16 additions & 2 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,17 @@ type identifyDataV2 struct {
SampleRate int32 `json:"sample_rate"`
UserAgent string `json:"user_agent"`
MsgTimeout int `json:"msg_timeout"`
TopologyRegion string `json:"topology_region"`
TopologyZone string `json:"topology_zone"`
}

type identifyEvent struct {
OutputBufferTimeout time.Duration
HeartbeatInterval time.Duration
SampleRate int32
MsgTimeout time.Duration
TopologyRegion string
TopologyZone string
}

type clientV2 struct {
Expand Down Expand Up @@ -88,8 +92,10 @@ type clientV2 struct {
ReadyStateChan chan int
ExitChan chan int

ClientID string
Hostname string
ClientID string
Hostname string
TopologyRegion string
TopologyZone string

SampleRate int32

Expand Down Expand Up @@ -161,6 +167,8 @@ func (c *clientV2) Identify(data identifyDataV2) error {
c.ClientID = data.ClientID
c.Hostname = data.Hostname
c.UserAgent = data.UserAgent
c.TopologyRegion = data.TopologyRegion
c.TopologyZone = data.TopologyZone
c.metaLock.Unlock()

err := c.SetHeartbeatInterval(data.HeartbeatInterval)
Expand Down Expand Up @@ -188,6 +196,8 @@ func (c *clientV2) Identify(data identifyDataV2) error {
HeartbeatInterval: c.HeartbeatInterval,
SampleRate: c.SampleRate,
MsgTimeout: c.MsgTimeout,
TopologyRegion: c.TopologyRegion,
TopologyZone: c.TopologyZone,
}

// update the client's message pump
Expand All @@ -204,6 +214,8 @@ func (c *clientV2) Stats() ClientStats {
clientID := c.ClientID
hostname := c.Hostname
userAgent := c.UserAgent
topologyZone := c.TopologyZone
topologyRegion := c.TopologyRegion
var identity string
var identityURL string
if c.AuthState != nil {
Expand Down Expand Up @@ -239,6 +251,8 @@ func (c *clientV2) Stats() ClientStats {
AuthIdentity: identity,
AuthIdentityURL: identityURL,
PubCounts: pubCounts,
TopologyZone: topologyZone,
TopologyRegion: topologyRegion,
}
if stats.TLS {
p := prettyConnectionState{c.tlsConn.ConnectionState()}
Expand Down
4 changes: 4 additions & 0 deletions nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,17 @@ func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprou
HTTPPort int `json:"http_port"`
TCPPort int `json:"tcp_port"`
StartTime int64 `json:"start_time"`
TopologyZone string `json:"topology_zone"`
TopologyRegion string `json:"topology_region"`
}{
Version: version.Binary,
BroadcastAddress: s.nsqd.getOpts().BroadcastAddress,
Hostname: hostname,
TCPPort: s.nsqd.RealTCPAddr().Port,
HTTPPort: s.nsqd.RealHTTPAddr().Port,
StartTime: s.nsqd.GetStartTime().Unix(),
TopologyZone: s.nsqd.getOpts().TopologyZone,
TopologyRegion: s.nsqd.getOpts().TopologyRegion,
}, nil
}

Expand Down
2 changes: 2 additions & 0 deletions nsqd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type Options struct {
AuthHTTPAddresses []string `flag:"auth-http-address" cfg:"auth_http_addresses"`
HTTPClientConnectTimeout time.Duration `flag:"http-client-connect-timeout" cfg:"http_client_connect_timeout"`
HTTPClientRequestTimeout time.Duration `flag:"http-client-request-timeout" cfg:"http_client_request_timeout"`
TopologyRegion string `flag:"topology-region"`
TopologyZone string `flag:"topology-zone"`

// diskqueue options
DataPath string `flag:"data-path"`
Expand Down
4 changes: 4 additions & 0 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,8 @@ func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error)
AuthRequired bool `json:"auth_required"`
OutputBufferSize int `json:"output_buffer_size"`
OutputBufferTimeout int64 `json:"output_buffer_timeout"`
TopologyRegion string `json:"topology_region"`
TopologyZone string `json:"topology_zone"`
}{
MaxRdyCount: p.nsqd.getOpts().MaxRdyCount,
Version: version.Binary,
Expand All @@ -437,6 +439,8 @@ func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error)
AuthRequired: p.nsqd.IsAuthEnabled(),
OutputBufferSize: client.OutputBufferSize,
OutputBufferTimeout: int64(client.OutputBufferTimeout / time.Millisecond),
TopologyRegion: p.nsqd.getOpts().TopologyRegion,
TopologyZone: p.nsqd.getOpts().TopologyZone,
})
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error())
Expand Down
2 changes: 2 additions & 0 deletions nsqd/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ type ClientStats struct {
Authed bool `json:"authed,omitempty"`
AuthIdentity string `json:"auth_identity,omitempty"`
AuthIdentityURL string `json:"auth_identity_url,omitempty"`
TopologyZone string `json:"topology_zone"`
TopologyRegion string `json:"topology_region"`

PubCounts []PubCount `json:"pub_counts,omitempty"`

Expand Down

0 comments on commit c67d5ee

Please sign in to comment.