Experiment with async message processing
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic committed Sep 17, 2021
1 parent 27f1d60 commit cb500e6
Showing 3 changed files with 120 additions and 6 deletions.
6 changes: 6 additions & 0 deletions server/conf.go
@@ -204,6 +204,11 @@ func ProcessConfigFile(configFile string, opts *Options) error {
return err
}
opts.ReplaceDurable = v.(bool)
case "async_msg_processing", "async_message_processing":
if err := checkType(k, reflect.Bool, v); err != nil {
return err
}
opts.AsyncMsgProcessing = v.(bool)
}
}
return nil
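
For reference, a minimal sketch of a configuration file enabling the new option. Either key spelling is accepted by the case added above; the comments and surrounding layout are illustrative assumptions, not part of this commit:

# nats-streaming-server configuration (sketch)
# Process incoming messages (subscription matching and delivery)
# outside of the io loop.
async_msg_processing: true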
@@ -714,6 +719,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
fs.StringVar(&sopts.EncryptionCipher, "encryption_cipher", stores.CryptoCipherAutoSelect, "Encryption cipher. Supported are AES and CHACHA (default is AES)")
fs.StringVar(&encryptionKey, "encryption_key", "", "Encryption Key. It is recommended to specify it through the NATS_STREAMING_ENCRYPTION_KEY environment variable instead")
fs.BoolVar(&sopts.ReplaceDurable, "replace_durable", false, "Replace the existing durable subscription instead of reporting a duplicate durable error")
fs.BoolVar(&sopts.AsyncMsgProcessing, "async_msg_processing", false, "Trigger message processing (find subscription and do delivery) outside of the io loop")

// First, we need to call NATS's ConfigureOptions() with above flag set.
// It will be augmented with NATS specific flags and call fs.Parse(args) for us.
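The same option can be enabled from the command line with the flag registered above. A hedged example; -store and -dir are pre-existing server flags, shown only for context:

nats-streaming-server -store file -dir ./data -async_msg_processing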
1 change: 1 addition & 0 deletions server/conf_test.go
@@ -453,6 +453,7 @@ func TestParseWrongTypes(t *testing.T) {
expectFailureFor(t, "partitioning: 123", wrongTypeErr)
expectFailureFor(t, "syslog_name: 123", wrongTypeErr)
expectFailureFor(t, "replace_durable: 123", wrongTypeErr)
expectFailureFor(t, "async_msg_processing: 123", wrongTypeErr)
expectFailureFor(t, "store_limits:{max_channels:false}", wrongTypeErr)
expectFailureFor(t, "store_limits:{max_msgs:false}", wrongTypeErr)
expectFailureFor(t, "store_limits:{max_bytes:false}", wrongTypeErr)
119 changes: 113 additions & 6 deletions server/server.go
@@ -47,7 +47,7 @@ import (
// Server defaults.
const (
// VERSION is the current version for the NATS Streaming server.
VERSION = "0.22.1"
VERSION = "0.22.2"

DefaultClusterID = "test-cluster"
DefaultDiscoverPrefix = "_STAN.discover"
@@ -765,6 +765,19 @@ type StanServer struct {
serverURLs []string
// If using an external server, capture the URL that was given for return in ClientURL().
providedServerURL string

// This is used when the user does not want the processing of the
// incoming messages (and delivery to matching subscriptions) to be
// done from the ioLoop.
amp asyncMsgProcessing
}

type asyncMsgProcessing struct {
mu sync.Mutex
cond *sync.Cond // signaled when channels are added or on close
channels map[*channel]struct{} // set of channels with newly stored messages to process
inWait bool // true while the processing goroutine is blocked in cond.Wait()
closed bool // set by done() on shutdown so the processing goroutine exits
}

type subsSentAndAckReplication struct {
@@ -1359,6 +1372,18 @@ type Options struct {
Clustering ClusteringOptions
NATSClientOpts []nats.Option
ReplaceDurable bool // If true, the subscription request for a durable subscription will replace the current durable instead of failing with duplicate durable error.

// By default, when the server receives incoming messages, they are replicated
// (when in clustered mode), persisted in their corresponding channel message
// store, and then processed, that is, matching subscriptions are found and
// delivery is initiated, after which publish ACKs are sent back to the
// publishers. All of this executes from the "ioLoop" goroutine.
// When enabled, this option causes the processing of incoming messages,
// that is, the matching of subscriptions and the delivery of messages, to be
// done outside of the "ioLoop" goroutine.
// This may help reduce publishers' latency, but may decrease subscription
// delivery performance, so it should not be enabled without proper consideration.
AsyncMsgProcessing bool
}

// Clone returns a deep copy of the Options object.
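
For embedded servers, the option can also be set programmatically on the Options struct. A minimal sketch, assuming the server package's pre-existing GetDefaultOptions and RunServerWithOpts helpers:

package main

import (
    "log"

    "github.com/nats-io/nats-streaming-server/server"
)

func main() {
    opts := server.GetDefaultOptions()
    opts.ID = "test-cluster"
    // Match subscriptions and deliver messages outside of the ioLoop goroutine.
    opts.AsyncMsgProcessing = true

    s, err := server.RunServerWithOpts(opts, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer s.Shutdown()
}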
@@ -4232,6 +4257,13 @@ func (s *StanServer) startIOLoop() {
// to incoming NATS messages.
ready := &sync.WaitGroup{}
ready.Add(1)
if s.opts.AsyncMsgProcessing {
s.ioChannelWG.Add(1)
ready.Add(1)
s.amp.channels = make(map[*channel]struct{}, 64)
s.amp.cond = sync.NewCond(&s.amp.mu)
go s.asyncMsgProcessing(ready)
}
go s.ioLoop(ready)
ready.Wait()
}
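
The extra ready.Add(1) ensures startIOLoop does not return until both the ioLoop and the new asyncMsgProcessing goroutines are running. A condensed sketch of this startup pattern in isolation:

ready := &sync.WaitGroup{}
ready.Add(2) // one per goroutine to wait for
go func() { ready.Done(); /* asyncMsgProcessing work */ }()
go func() { ready.Done(); /* ioLoop work */ }()
ready.Wait() // both goroutines have signaled readiness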
@@ -4323,6 +4355,8 @@ func (s *StanServer) ioLoop(ready *sync.WaitGroup) {
max = 0
batch = make([]*ioPendingMsg, 0, batchSize)
dciopm *ioPendingMsg
doamp = s.opts.AsyncMsgProcessing
amp = &s.amp
)

synchronizationRequest := func(iopm *ioPendingMsg) {
@@ -4393,17 +4427,26 @@ func (s *StanServer) ioLoop(ready *sync.WaitGroup) {
// If clustered, wait on the result of replication.
storeIOPendingMsgs(batch)

var notifyAMP bool

// flush all the stores with messages written to them...
for c := range storesToFlush {
if err := c.store.Msgs.Flush(); err != nil {
// TODO: Attempt recovery, notify publishers of error.
panic(fmt.Errorf("unable to flush msg store: %v", err))
}
// Call this here, so messages are sent to subscribers,
// which means that msg seq is added to subscription file
s.processMsg(c)
if err := c.store.Subs.Flush(); err != nil {
panic(fmt.Errorf("unable to flush sub store: %v", err))
if doamp {
amp.mu.Lock()
amp.channels[c] = struct{}{}
notifyAMP = true
amp.mu.Unlock()
} else {
// Call this here, so messages are sent to subscribers,
// which means that msg seq is added to subscription file
s.processMsg(c)
if err := c.store.Subs.Flush(); err != nil {
panic(fmt.Errorf("unable to flush sub store: %v", err))
}
}
// Remove entry from map (this is safe in Go)
delete(storesToFlush, c)
@@ -4420,6 +4463,10 @@ func (s *StanServer) ioLoop(ready *sync.WaitGroup) {
pendingMsgs[i] = nil
}

if notifyAMP {
amp.notify()
}

// clear out pending messages
pendingMsgs = pendingMsgs[:0]

Expand All @@ -4435,6 +4482,62 @@ func (s *StanServer) ioLoop(ready *sync.WaitGroup) {
}
}

func (s *StanServer) asyncMsgProcessing(ready *sync.WaitGroup) {
defer s.ioChannelWG.Done()

var closed bool
// Drain into a slice backed by a pre-sized array so that typical
// batches do not reallocate on each iteration.
var channelsa [64]*channel
var channels = channelsa[:0]

a := &s.amp

ready.Done()

for {
a.mu.Lock()
for closed = a.closed; !closed && len(a.channels) == 0; closed = a.closed {
a.inWait = true
a.cond.Wait()
a.inWait = false
}
for c := range a.channels {
channels = append(channels, c)
delete(a.channels, c)
}
a.mu.Unlock()

for _, c := range channels {
s.processMsg(c)
if err := c.store.Subs.Flush(); err != nil {
s.log.Errorf("Error flushing subscription store for channel %q: %v", c.name, err)
}
}
channels = channels[:0]

if closed {
return
}
}
}

func (a *asyncMsgProcessing) notify() {
a.mu.Lock()
if a.inWait {
a.cond.Signal()
}
a.mu.Unlock()
}

func (a *asyncMsgProcessing) done() {
a.mu.Lock()
defer a.mu.Unlock()
if a.closed {
return
}
a.closed = true
a.cond.Signal()
}

func (s *StanServer) logErrAndSendPublishErr(iopm *ioPendingMsg, err error) {
s.log.Errorf("[Client:%s] Error processing message for subject %q: %v",
iopm.pm.ClientID, iopm.m.Subject, err)
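
Taken together, ioLoop and asyncMsgProcessing form a condition-variable hand-off: the ioLoop records channels in a set and signals only when the worker is parked (the inWait flag avoids redundant wakeups, and notify() runs once per batch rather than once per channel), while done() sets closed and signals so the worker drains any remaining work before exiting. Below is a self-contained sketch of the same pattern, with strings standing in for *channel and a print standing in for processMsg and the subscription store flush:

package main

import (
    "fmt"
    "sync"
)

type handoff struct {
    mu     sync.Mutex
    cond   *sync.Cond
    work   map[string]struct{}
    inWait bool
    closed bool
}

func newHandoff() *handoff {
    h := &handoff{work: make(map[string]struct{})}
    h.cond = sync.NewCond(&h.mu)
    return h
}

// enqueue mirrors the notifyAMP path in ioLoop: record the item, then
// signal only if the worker is actually parked in cond.Wait().
func (h *handoff) enqueue(item string) {
    h.mu.Lock()
    h.work[item] = struct{}{}
    if h.inWait {
        h.cond.Signal()
    }
    h.mu.Unlock()
}

// done mirrors asyncMsgProcessing.done: mark closed and wake the worker.
func (h *handoff) done() {
    h.mu.Lock()
    if !h.closed {
        h.closed = true
        h.cond.Signal()
    }
    h.mu.Unlock()
}

// worker mirrors StanServer.asyncMsgProcessing: wait for work or close,
// drain the set under the lock, process outside of it, exit once closed.
func (h *handoff) worker(wg *sync.WaitGroup) {
    defer wg.Done()
    var batch []string
    for {
        h.mu.Lock()
        for !h.closed && len(h.work) == 0 {
            h.inWait = true
            h.cond.Wait()
            h.inWait = false
        }
        for item := range h.work {
            batch = append(batch, item)
            delete(h.work, item)
        }
        closed := h.closed
        h.mu.Unlock()

        for _, item := range batch {
            fmt.Println("processing", item) // stand-in for s.processMsg + Subs.Flush
        }
        batch = batch[:0]

        if closed {
            return
        }
    }
}

func main() {
    h := newHandoff()
    wg := &sync.WaitGroup{}
    wg.Add(1)
    go h.worker(wg)
    h.enqueue("orders")
    h.enqueue("metrics")
    h.done()
    wg.Wait()
}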
@@ -5763,6 +5866,10 @@ func (s *StanServer) Shutdown() {
if s.ioChannel != nil {
// Notify the IO channel that we are shutting down
close(s.ioChannelQuit)
// If async message processing is enabled, make that goroutine stop
if s.opts.AsyncMsgProcessing {
s.amp.done()
}
} else {
waitForIOStoreLoop = false
}
