Skip to content

Commit

Permalink
Handle chunking of channel list for gossip reconciliation
Browse files Browse the repository at this point in the history
Reuse code from partitioning by making it generic and putting it in the
util package.
  • Loading branch information
tylertreat committed Nov 13, 2017
1 parent 6aac9ee commit 3f93a85
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 392 deletions.
108 changes: 2 additions & 106 deletions server/partitions.go
Expand Up @@ -24,8 +24,6 @@ const (
channelInterest = 1
// Messages channel size
partitionsMsgChanSize = 65536
// Number of bytes used to encode a channel name
partitionsEncodedChannelLen = 2
// Default wait before checking for channels when notified
// that the NATS cluster topology has changed. This gives a chance
// for the new server joining the cluster to send its subscriptions
Expand Down Expand Up @@ -188,7 +186,7 @@ func (p *partitions) initSubscriptions() error {
func (p *partitions) checkChannelsUniqueInCluster() error {
// We use the subscription on an inbox to get the replies.
// Send our list
if err := p.sendChannelsList(p.inboxSub.Subject); err != nil {
if err := util.SendChannelsList(p.channels, p.sendListSubject, p.inboxSub.Subject, p.nc, p.s.serverID); err != nil {
return fmt.Errorf("unable to send channels list: %v", err)
}
// Since we don't know how many servers are out there, keep
Expand All @@ -212,85 +210,6 @@ func (p *partitions) checkChannelsUniqueInCluster() error {
}
}

// Sends the list of channels to a known subject, possibly splitting the list
// in several requests if it cannot fit in a single message.
func (p *partitions) sendChannelsList(replyInbox string) error {
// Since the NATS message payload is limited, we need to repeat
// requests if all channels can't fit in a request.
maxPayload := int(p.nc.MaxPayload())
// Reuse this request object to send the (possibly many) protocol message(s).
header := &spb.CtrlMsg{
ServerID: p.s.serverID,
MsgType: spb.CtrlMsg_Partitioning,
}
// The Data field (a byte array) will require 1+len(array)+(encoded size of array).
// To be conservative, let's just use a 8 bytes integer
headerSize := header.Size() + 1 + 8
var (
bytes []byte // Reused buffer in which the request is to marshal info
n int // Size of the serialized request in the above buffer
count int // Number of channels added to the request
)
for start := 0; start != len(p.channels); start += count {
bytes, n, count = p.encodeRequest(header, bytes, headerSize, maxPayload, start)
if count == 0 {
return fmt.Errorf("message payload too small to send partitioning channels list")
}
if err := p.nc.PublishRequest(p.sendListSubject, replyInbox, bytes[:n]); err != nil {
return err
}
}
return p.nc.Flush()
}

// Adds as much channels as possible (based on the NATS max message payload) and
// returns a serialized request. The buffer `reqBytes` is passed (and returned) so
// that it can be reused if more than one request is needed. This call will
// expand the size as needed. The number of bytes used in this buffer is returned
// along with the number of encoded channels.
func (p *partitions) encodeRequest(request *spb.CtrlMsg, reqBytes []byte, headerSize, maxPayload, start int) ([]byte, int, int) {
// Each string will be encoded in the form:
// - length (2 bytes)
// - string as a byte array.
var _encodedSize = [partitionsEncodedChannelLen]byte{}
encodedSize := _encodedSize[:]
// We are going to encode the channels in this buffer
chanBuf := make([]byte, 0, maxPayload)
var (
count int // Number of encoded channels
estimatedSize = headerSize // This is not an overestimation of the total size
numBytes int // This is what is returned by MarshalTo
)
for i := start; i < len(p.channels); i++ {
c := []byte(p.channels[i])
cl := len(c)
needed := partitionsEncodedChannelLen + cl
// Check if adding this channel to current buffer makes us go over
if estimatedSize+needed > maxPayload {
// Special case if we cannot even encode 1 channel
if count == 0 {
return reqBytes, 0, 0
}
break
}
// Encoding the channel here. First the size, then the channel name as byte array.
util.ByteOrder.PutUint16(encodedSize, uint16(cl))
chanBuf = append(chanBuf, encodedSize...)
chanBuf = append(chanBuf, c...)
count++
estimatedSize += needed
}
if count > 0 {
request.Data = chanBuf
reqBytes = util.EnsureBufBigEnough(reqBytes, estimatedSize)
numBytes, _ = request.MarshalTo(reqBytes)
if numBytes > maxPayload {
panic(fmt.Errorf("partitioning: request size is %v (max payload is %v)", numBytes, maxPayload))
}
}
return reqBytes, numBytes, count
}

// Decode the incoming partitioning protocol message.
// It can be an HB, in which case, if it is from a new server
// we send our list to the cluster, or it can be a request
Expand All @@ -313,7 +232,7 @@ func (p *partitions) processChannelsListRequests(m *nats.Msg) {
if req.ServerID == p.s.serverID {
return
}
channels, err := decodeChannels(req.Data)
channels, err := util.DecodeChannels(req.Data)
if err != nil {
p.s.log.Errorf("Error processing partitioning request: %v", err)
return
Expand Down Expand Up @@ -352,29 +271,6 @@ func (p *partitions) processChannelsListRequests(m *nats.Msg) {
}
}

// decodes from the given by array the list of channel names and return
// them as an array of strings.
func decodeChannels(data []byte) ([]string, error) {
channels := []string{}
pos := 0
for pos < len(data) {
if pos+2 > len(data) {
return nil, fmt.Errorf("partitioning: unable to decode size, pos=%v len=%v", pos, len(data))
}
cl := int(util.ByteOrder.Uint16(data[pos:]))
pos += partitionsEncodedChannelLen
end := pos + cl
if end > len(data) {
return nil, fmt.Errorf("partitioning: unable to decode channel, pos=%v len=%v max=%v (string=%v)",
pos, cl, len(data), string(data[pos:]))
}
c := string(data[pos:end])
channels = append(channels, c)
pos = end
}
return channels, nil
}

// Notifies all go-routines used by partitioning code that the
// server is shuting down and closes the internal NATS connection.
func (p *partitions) shutdown() {
Expand Down
5 changes: 3 additions & 2 deletions server/partitions_test.go
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/nats-io/go-nats-streaming"
"github.com/nats-io/nats-streaming-server/spb"
"github.com/nats-io/nats-streaming-server/stores"
"github.com/nats-io/nats-streaming-server/util"
)

func setPartitionsVarsForTest() {
Expand Down Expand Up @@ -185,7 +186,7 @@ func TestPartitionsMaxPayload(t *testing.T) {
cb := func(m *nats.Msg) {
req := &spb.CtrlMsg{}
req.Unmarshal(m.Data)
channels, _ := decodeChannels(req.Data)
channels, _ := util.DecodeChannels(req.Data)
for _, c := range channels {
verifyChannels[c] = struct{}{}
count++
Expand Down Expand Up @@ -447,7 +448,7 @@ func TestPartitionsSendListAfterRouteEstablished(t *testing.T) {
return func(m *nats.Msg) {
req := &spb.CtrlMsg{}
req.Unmarshal(m.Data)
channels, _ := decodeChannels(req.Data)
channels, _ := util.DecodeChannels(req.Data)
for _, c := range channels {
mu.Lock()
if c == "foo" && *s != nil && req.ServerID == (*s).serverID {
Expand Down
157 changes: 98 additions & 59 deletions server/server.go
Expand Up @@ -1672,71 +1672,20 @@ func (s *StanServer) start(runningState State) error {
// missing channels from another server and creates them. This should only be
// called if the server is running in clustered mode.
func (s *StanServer) startChannelGossiping() error {
// Setup sub for receiving channel lists for reconciliation.
if _, err := s.ncr.Subscribe(s.getChannelReconcileInbox(), s.reconcileChannels); err != nil {
return err
}

// Setup sub for receiving channel gossip.
gossipInbox := fmt.Sprintf("%s.%s.channels", defaultGossipPrefix, s.opts.ID)
_, err := s.ncr.Subscribe(gossipInbox, func(m *nats.Msg) {
c := &spb.ChannelGossip{}
if err := c.Unmarshal(m.Data); err != nil {
s.log.Errorf("Received invalid channel gossip message: %v", err)
return
}
if c.NodeID == s.opts.Clustering.NodeID {
// Ignore messages from ourselves.
return
}
s.channels.RLock()
count := uint32(len(s.channels.channels))
s.channels.RUnlock()

if count >= c.NumChannels {
return
}

// If our count is less, request the list of channels from the other
// server and reconcile.
resp, err := s.ncr.Request(m.Reply, nil, 5*time.Second)
if err != nil {
s.log.Errorf("Failed to fetch channels from another server: %v", err)
return
}
channelResp := &spb.ChannelResponse{}
if err := channelResp.Unmarshal(resp.Data); err != nil {
s.log.Errorf("Received invalid channels response from server: %v", err)
return
}
// Reconcile list of channels.
for _, channel := range channelResp.Channels {
if s.channels.get(channel) == nil {
if _, err := s.channels.createChannel(s, channel); err != nil {
s.log.Errorf("Failed to create channel %s while reconciling channels: %v", channel, err)
} else {
s.log.Debugf("Created channel %s while reconciling channels", channel)
}
}
}
})
if err != nil {
if _, err := s.ncr.Subscribe(gossipInbox, s.processChannelGossip); err != nil {
return err
}

// Setup sub for receiving channel requests.
reqInbox := fmt.Sprintf("%s.%s.channels.%s", defaultGossipPrefix, s.opts.ID, s.opts.Clustering.NodeID)
_, err = s.ncr.Subscribe(reqInbox, func(m *nats.Msg) {
s.channels.RLock()
channels := make([]string, len(s.channels.channels))
i := 0
for channel, _ := range s.channels.channels {
channels[i] = channel
i++
}
s.channels.RUnlock()
resp, err := (&spb.ChannelResponse{Channels: channels}).Marshal()
if err != nil {
panic(err)
}
s.ncr.Publish(m.Reply, resp)
})
if err != nil {
reqInbox := fmt.Sprintf("%s.%s.channels.request.%s", defaultGossipPrefix, s.opts.ID, s.opts.Clustering.NodeID)
if _, err := s.ncr.Subscribe(reqInbox, s.processChannelListRequest); err != nil {
return err
}

Expand Down Expand Up @@ -1765,6 +1714,96 @@ func (s *StanServer) startChannelGossiping() error {
return nil
}

// reconcileChannels is a NATS handler that handles asynchronous responses from
// a peer after requesting the list of channels. This is done asynchronously
// since the list can exceed the max NATS message size, so we stream the list
// in chunks asynchronously. Reconciling channels should be idempotent.
func (s *StanServer) reconcileChannels(m *nats.Msg) {
// Message cannot be empty, we are supposed to receive
// a spb.CtrlMsg_Partitioning protocol.
if len(m.Data) == 0 {
return
}
req := spb.CtrlMsg{}
if err := req.Unmarshal(m.Data); err != nil {
s.log.Errorf("Error processing channel reconcile request: %v", err)
return
}
// Ignore messages from ourselves.
if req.ServerID == s.serverID {
return
}
channels, err := util.DecodeChannels(req.Data)
if err != nil {
s.log.Errorf("Error processing channel reconcile request: %v", err)
return
}
// Reconcile list of channels.
for _, channel := range channels {
if s.channels.get(channel) == nil {
if _, err := s.channels.createChannel(s, channel); err != nil {
s.log.Errorf("Failed to create channel %s while reconciling channels: %v", channel, err)
} else {
s.log.Debugf("Created channel %s while reconciling channels", channel)
}
}
}
}

// processChannelGossip is a NATS handler that handles gossip messages from
// cluster peers containing the number of channels they have. If we detect
// we're missing channels, request the list of channels from the server to
// reconcile.
func (s *StanServer) processChannelGossip(m *nats.Msg) {
c := &spb.ChannelGossip{}
if err := c.Unmarshal(m.Data); err != nil {
s.log.Errorf("Received invalid channel gossip message: %v", err)
return
}
if c.NodeID == s.opts.Clustering.NodeID {
// Ignore messages from ourselves.
return
}
s.channels.RLock()
count := uint32(len(s.channels.channels))
s.channels.RUnlock()

if count >= c.NumChannels {
return
}

// If our count is less, request the list of channels from the other
// server and reconcile. This is done asynchronously.
if err := s.ncr.PublishRequest(m.Reply, s.getChannelReconcileInbox(), nil); err != nil {
s.log.Errorf("Failed to fetch channels from another server: %v", err)
return
}
}

// processChannelListRequest is a NATS handler that handles requests from
// cluster peers to stream the list of channels to them. This is done by
// streaming the list in chunks to the peer since the list can exceed the max
// NATS message size.
func (s *StanServer) processChannelListRequest(m *nats.Msg) {
s.channels.RLock()
channels := make([]string, len(s.channels.channels))
i := 0
for channel, _ := range s.channels.channels {
channels[i] = channel
i++
}
s.channels.RUnlock()
if err := util.SendChannelsList(channels, m.Reply, "", s.ncr, s.serverID); err != nil {
s.log.Errorf("Failed to send channel list to peer: %v", err)
}
}

// getChannelReconcileInbox returns the NATS subject for streaming channels to
// for reconciliation.
func (s *StanServer) getChannelReconcileInbox() string {
return fmt.Sprintf("%s.%s.channels.reconcile.%s", defaultGossipPrefix, s.opts.ID, s.opts.Clustering.NodeID)
}

// startMetadataRaftNode creates and starts the metadata Raft group. This is
// used to handle replication of connection state and cluster metadata. This
// should only be called if the server is running in clustered mode.
Expand Down

0 comments on commit 3f93a85

Please sign in to comment.