[FIXED] Possible race between connect and pub/sub with partitioning
In partitioning mode, when a client connects, the connect request
may reach several servers, but the first response the client gets
allows it to proceed with either publish or subscribe.
So it is possible for a server running in partitioning mode to
receive a connection request followed by a message or subscription.
Although the connection request comes first on the TCP connection,
the PubMsg or SubscriptionRequest may still be processed first due
to the use of different NATS subscriptions.
To prevent that, when checking if a client exists in this mode, we
may now wait to be notified that the client has been registered.
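
For reference, here is a minimal, self-contained Go sketch of the notify-on-register idiom this fix introduces. The registry type and its names are simplified stand-ins for the server's clientStore, not its actual API; the real changes are in the diff that follows.

package main

import (
    "fmt"
    "sync"
    "time"
)

// registry is a simplified stand-in for the server's clientStore:
// it tracks registered clients and, per client ID, an optional
// channel used to wake up a caller waiting for that registration.
type registry struct {
    sync.Mutex
    clients map[string]struct{}
    waiters map[string]chan struct{}
}

func newRegistry() *registry {
    return &registry{
        clients: make(map[string]struct{}),
        waiters: make(map[string]chan struct{}),
    }
}

// register records the client and notifies any caller currently
// waiting for this ID in isValidWithTimeout.
func (r *registry) register(id string) {
    r.Lock()
    defer r.Unlock()
    r.clients[id] = struct{}{}
    if ch, ok := r.waiters[id]; ok {
        ch <- struct{}{} // buffered channel, never blocks
        delete(r.waiters, id)
    }
}

// isValidWithTimeout reports whether the client is registered,
// waiting up to `timeout` for a concurrent register() to happen.
func (r *registry) isValidWithTimeout(id string, timeout time.Duration) bool {
    r.Lock()
    if _, ok := r.clients[id]; ok {
        r.Unlock()
        return true
    }
    ch := make(chan struct{}, 1)
    r.waiters[id] = ch
    r.Unlock()
    select {
    case <-ch:
        return true
    case <-time.After(timeout):
        // Timed out: clean up the waiter entry and report failure.
        r.Lock()
        delete(r.waiters, id)
        r.Unlock()
        return false
    }
}

func main() {
    r := newRegistry()
    // Simulate the connect request being processed slightly after
    // the publish/subscribe request reaches this server.
    go func() {
        time.Sleep(50 * time.Millisecond)
        r.register("clientA")
    }()
    fmt.Println(r.isValidWithTimeout("clientA", time.Second))          // true
    fmt.Println(r.isValidWithTimeout("clientB", 100*time.Millisecond)) // false
}

A caller that times out simply treats the client as unknown, which matches the pre-existing behavior when no connect request ever arrives.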
kozlovic committed Dec 8, 2017
1 parent 8442674 commit cb4cec9
Showing 3 changed files with 279 additions and 6 deletions.
50 changes: 47 additions & 3 deletions server/client.go
@@ -3,16 +3,18 @@
package server

import (
"github.com/nats-io/nats-streaming-server/stores"
"sync"
"time"

"github.com/nats-io/nats-streaming-server/stores"
)

// This is a proxy to the store interface.
type clientStore struct {
sync.RWMutex
clients map[string]*client
store stores.Store
clients map[string]*client
waitOnRegister map[string]chan struct{}
store stores.Store
}

// client has information needed by the server. A client is also
@@ -53,6 +55,13 @@ func (cs *clientStore) register(ID, hbInbox string) (*client, bool, error) {
}
c = &client{info: sc, subs: make([]*subState, 0, 4)}
cs.clients[ID] = c
if cs.waitOnRegister != nil {
ch := cs.waitOnRegister[ID]
if ch != nil {
ch <- struct{}{}
delete(cs.waitOnRegister, ID)
}
}
return c, true, nil
}

@@ -71,6 +80,9 @@ func (cs *clientStore) unregister(ID string) (*client, error) {
}
c.Unlock()
delete(cs.clients, ID)
if cs.waitOnRegister != nil {
delete(cs.waitOnRegister, ID)
}
err := cs.store.DeleteClient(ID)
return c, err
}
@@ -80,6 +92,38 @@ func (cs *clientStore) isValid(ID string) bool {
return cs.lookup(ID) != nil
}

// isValidWithTimeout returns true if the client is registered,
// false if not.
// When the client is not yet registered, this call sets up a channel
// and waits up to `timeout` for the register() call to signal on that
// channel that the client has been registered.
// On timeout, this call returns false to indicate that the client
// has still not been registered.
func (cs *clientStore) isValidWithTimeout(ID string, timeout time.Duration) bool {
cs.Lock()
c := cs.clients[ID]
if c != nil {
cs.Unlock()
return true
}
if cs.waitOnRegister == nil {
cs.waitOnRegister = make(map[string]chan struct{})
}
ch := make(chan struct{}, 1)
cs.waitOnRegister[ID] = ch
cs.Unlock()
select {
case <-ch:
return true
case <-time.After(timeout):
// We timed out, remove the entry in the map
cs.Lock()
delete(cs.waitOnRegister, ID)
cs.Unlock()
return false
}
}

// Lookup a client
func (cs *clientStore) lookup(ID string) *client {
cs.RLock()
192 changes: 192 additions & 0 deletions server/partitions_test.go
@@ -4,6 +4,8 @@ package server

import (
"fmt"
"github.com/nats-io/go-nats-streaming/pb"
"github.com/nats-io/nuid"
"strings"
"sync"
"testing"
@@ -620,3 +622,193 @@ func TestPartitionsWildcards(t *testing.T) {
t.Fatal("Expected error on subscribe, got none")
}
}

func checkWaitOnRegisterMap(t tLogger, s *StanServer, size int) {
var start time.Time
for {
s.clients.RLock()
m := s.clients.waitOnRegister
mlen := len(m)
s.clients.RUnlock()
if m != nil && mlen == size {
return
}
if start.IsZero() {
start = time.Now()
} else if time.Since(start) > clientCheckTimeout+50*time.Millisecond {
stackFatalf(t, "map should have been created and of size %d, got %v", size, mlen)
}
time.Sleep(15 * time.Millisecond)
}
}

func TestPartitionsRaceOnPub(t *testing.T) {
setPartitionsVarsForTest()
defer resetDefaultPartitionsVars()

clientCheckTimeout = 150 * time.Millisecond
defer func() { clientCheckTimeout = defaultClientCheckTimeout }()

opts := GetDefaultOptions()
opts.Partitioning = true
opts.AddPerChannel("foo", &stores.ChannelLimits{})
s := runServerWithOpts(t, opts, nil)
defer s.Shutdown()

// The stan.Connect() call blocks until it receives the response, so it is
// not possible to publish a message before the server has processed the
// connection request. However, with partitioning, it is possible that
// the Connect() call receives an OK from one of the servers and immediately
// publishes a message. That message, although sent after the connection request
// that goes to another server, may be dispatched first (due to the use of
// different internal subscriptions for connection handling and client publish).
//
// To simulate this here, we use a NATS connection and send a PubMsg manually,
// followed by the regular stan.Connect(). Then we wait for the response to
// the PubMsg; we should not get any error in the PubAck.

// Create a direct NATS connection
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Fatalf("Unable to connect: %v", err)
}
defer nc.Close()

pubSubj := fmt.Sprintf("%s.foo", s.info.Publish)
pubReq := &pb.PubMsg{ClientID: clientName, Subject: "foo", Data: []byte("hello")}
pubNuid := nuid.New()

pubSub, err := nc.SubscribeSync(nats.NewInbox())
if err != nil {
t.Fatalf("Error creating sub on pub response: %v", err)
}

// Repeat the test, because even with the bug, it would be possible
// that the connection request is still processed first, which
// would make the test pass.
for i := 0; i < 5; i++ {
func() {
pubReq.Guid = pubNuid.Next()
pubBytes, _ := pubReq.Marshal()

// First case is to make sure that we get the failure if
// no connection is processed.
resp, err := nc.Request(pubSubj, pubBytes, clientCheckTimeout+50*time.Millisecond)
if err != nil {
t.Fatalf("Error on request: %v", err)
}
pubResp := &pb.PubAck{}
pubResp.Unmarshal(resp.Data)
if pubResp.Error != ErrInvalidPubReq.Error() {
t.Fatalf("Expected error %q, got %q", ErrInvalidPubReq, pubResp.Error)
}
// Ensure that the notification map has been created, but is empty.
checkWaitOnRegisterMap(t, s, 0)

// Now resend a message, but this time don't wait for the response here,
// instead connect, which should cause the PubMsg to be processed correctly.
if err := nc.PublishRequest(pubSubj, pubSub.Subject, pubBytes); err != nil {
t.Fatalf("Error sending PubMsg: %v", err)
}
checkWaitOnRegisterMap(t, s, 1)
sc, err := stan.Connect(clusterName, clientName, stan.NatsConn(nc))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer sc.Close()

// Now we should get the OK for the PubMsg.
resp, err = pubSub.NextMsg(clientCheckTimeout + 100*time.Millisecond)
if err != nil {
t.Fatalf("Error waiting for pub response: %v", err)
}
pubResp = &pb.PubAck{}
pubResp.Unmarshal(resp.Data)
if pubResp.Error != "" {
t.Fatalf("Connection %d - Error on publish: %v", (i + 1), pubResp.Error)
}
checkWaitOnRegisterMap(t, s, 0)
}()
}
}

func TestPartitionsRaceOnSub(t *testing.T) {
setPartitionsVarsForTest()
defer resetDefaultPartitionsVars()

clientCheckTimeout = 150 * time.Millisecond
defer func() { clientCheckTimeout = defaultClientCheckTimeout }()

opts := GetDefaultOptions()
opts.Partitioning = true
opts.AddPerChannel("foo", &stores.ChannelLimits{})
s := runServerWithOpts(t, opts, nil)
defer s.Shutdown()

// See description of the issue in TestPartitionsRaceOnPub.
// This is the same except that we are dealing with subscription requests
// here.

// Create a direct NATS connection
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Fatalf("Unable to connect: %v", err)
}
defer nc.Close()

subSubj := s.info.Subscribe
subReq := &pb.SubscriptionRequest{ClientID: clientName, Subject: "foo", AckWaitInSecs: 30, MaxInFlight: 1}

subSub, err := nc.SubscribeSync(nats.NewInbox())
if err != nil {
t.Fatalf("Error creating sub on sub response: %v", err)
}

// Repeat the test, because even with the bug, it would be possible
// that the connection request is still processed first, which
// would make the test pass.
for i := 0; i < 5; i++ {
func() {
subReq.Inbox = nats.NewInbox()
subBytes, _ := subReq.Marshal()

// First case is to make sure that we get the failure if
// no connection is processed.
resp, err := nc.Request(subSubj, subBytes, clientCheckTimeout+50*time.Millisecond)
if err != nil {
t.Fatalf("Error on request: %v", err)
}
subResp := &pb.SubscriptionResponse{}
subResp.Unmarshal(resp.Data)
if subResp.Error != ErrInvalidSubReq.Error() {
t.Fatalf("Expected error %q, got %q", ErrInvalidSubReq, subResp.Error)
}
// Ensure that the notification map has been created, but is empty.
checkWaitOnRegisterMap(t, s, 0)

// Now resend the subscription, but this time don't wait for the response here,
// instead connect, which should cause the SubscriptionRequest to be processed correctly.
if err := nc.PublishRequest(subSubj, subSub.Subject, subBytes); err != nil {
t.Fatalf("Error sending PubMsg: %v", err)
}
checkWaitOnRegisterMap(t, s, 1)
sc, err := stan.Connect(clusterName, clientName, stan.NatsConn(nc))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer sc.Close()

// Now we should get the OK for the SubscriptionRequest.
resp, err = subSub.NextMsg(clientCheckTimeout + 100*time.Millisecond)
if err != nil {
t.Fatalf("Error waiting for pub response: %v", err)
}
subResp = &pb.SubscriptionResponse{}
subResp.Unmarshal(resp.Data)
if subResp.Error != "" {
t.Fatalf("Connection %d - Error on subscribe: %v", (i + 1), subResp.Error)
}
checkWaitOnRegisterMap(t, s, 0)
}()
}
}
43 changes: 40 additions & 3 deletions server/server.go
@@ -90,6 +90,19 @@ const (
natsInboxFirstChar = '_'
// Length of a NATS inbox
natsInboxLen = 29 // _INBOX.<nuid: 22 characters>

// In partitioning mode, when a client connects, the connect request
// may reach several servers, but the first response the client gets
// allows it to proceed with either publish or subscribe.
// So it is possible for a server running in partitioning mode to
// receive a connection request followed by a message or subscription.
// Although the connection request comes first on the TCP connection,
// the PubMsg or SubscriptionRequest may still be processed first due
// to the use of different NATS subscriptions.
// To prevent that, when checking if a client exists in this mode, we
// may wait to be notified that the client has been registered.
// This is the default duration for this wait.
defaultClientCheckTimeout = 2 * time.Second
)

// Constant to indicate that sendMsgToSub() should check number of acks pending
@@ -125,7 +138,10 @@ var (
// A Regexp is safe for concurrent use by multiple goroutines.
var clientIDRegEx *regexp.Regexp

var testAckWaitIsInMillisecond bool
var (
testAckWaitIsInMillisecond bool
clientCheckTimeout = defaultClientCheckTimeout
)

func computeAckWait(wait int32) time.Duration {
unit := time.Second
@@ -2164,8 +2180,21 @@ func (s *StanServer) processClientPublish(m *nats.Msg) {
// else we will report an error below...
}

// Make sure we have a clientID, guid, etc.
if pm.Guid == "" || !s.clients.isValid(pm.ClientID) || !util.IsChannelNameValid(pm.Subject, false) {
// Make sure we have a guid and a valid channel name
valid := false
if pm.Guid != "" && util.IsChannelNameValid(pm.Subject, false) {
// Check client is valid.
if s.partitions != nil {
// In partitioning mode, it is possible that we get here before
// the connect request is processed. If so, wait for the connect
// request to be processed first.
// See the clientCheckTimeout doc for details.
valid = s.clients.isValidWithTimeout(pm.ClientID, clientCheckTimeout)
} else {
valid = s.clients.isValid(pm.ClientID)
}
}
if !valid {
s.log.Errorf("Received invalid client publish message %v", pm)
s.sendPublishErr(m.Reply, pm.Guid, ErrInvalidPubReq)
return
@@ -3140,6 +3169,14 @@ func (s *StanServer) processSubscriptionRequest(m *nats.Msg) {
if r := s.partitions.sl.Match(sr.Subject); len(r) == 0 {
return
}
// Also check that the connection request has already
// been processed. See the clientCheckTimeout doc for details.
if !s.clients.isValidWithTimeout(sr.ClientID, clientCheckTimeout) {
s.log.Errorf("[Client:%s] Rejecting subscription on %q: connection not created yet",
sr.ClientID, sr.Subject)
s.sendSubscriptionResponseErr(m.Reply, ErrInvalidSubReq)
return
}
}

// Grab channel state, create a new one if needed.
