Large number of ephemeral consumers could exhaust Go runtime's max OS threads. #2764

Merged · 1 commit · Dec 29, 2021
23 changes: 17 additions & 6 deletions server/consumer.go
@@ -635,7 +635,8 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
o.nextMsgSubj = fmt.Sprintf(JSApiRequestNextT, mn, o.name)

if o.isPushMode() {
o.dthresh = JsDeleteWaitTimeDefault
// Add in 1 sec of jitter above and beyond the default of 5s.
o.dthresh = JsDeleteWaitTimeDefault + time.Duration(rand.Int63n(1000))*time.Millisecond
if !o.isDurable() {
// Check if we are not durable that the delivery subject has interest.
// Check in place here for interest. Will setup properly in setLeader.
@@ -1026,8 +1027,8 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
}

func (o *consumer) deleteNotActive() {
// Need to check again if there is not an interest now that the timer fires.
if !o.hasNoLocalInterest() {
// If we have local interest simply return.
if o.hasLocalInterest() {
return
}
o.mu.RLock()
@@ -1055,15 +1056,19 @@ func (o *consumer) deleteNotActive() {
// Check to make sure we went away.
// Don't think this needs to be a monitored go routine.
go func() {
var fs bool
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
js.mu.RLock()
ca := js.consumerAssignment(acc, stream, name)
js.mu.RUnlock()
if ca != nil {
s.Warnf("Consumer assignment not cleaned up, retrying")
meta.ForwardProposal(removeEntry)
if fs {
s.Warnf("Consumer assignment not cleaned up, retrying")
meta.ForwardProposal(removeEntry)
}
fs = true
} else {
return
}
@@ -3014,6 +3019,11 @@ func (o *consumer) isActive() bool {
return active
}

// hasLocalInterest returns if we have local interest.
func (o *consumer) hasLocalInterest() bool {
return !o.hasNoLocalInterest()
}

// hasNoLocalInterest returns true if we have no local interest.
func (o *consumer) hasNoLocalInterest() bool {
o.mu.RLock()
@@ -3239,7 +3249,7 @@ func validFilteredSubject(filteredSubject string, subjects []string) bool {
return false
}

// SetInActiveDeleteThreshold sets the delete threshold for how long to wait
// setInActiveDeleteThreshold sets the delete threshold for how long to wait
// before deleting an inactive ephemeral consumer.
func (o *consumer) setInActiveDeleteThreshold(dthresh time.Duration) error {
o.mu.Lock()
@@ -3253,6 +3263,7 @@ func (o *consumer) setInActiveDeleteThreshold(dthresh time.Duration) error {
}
deleteWasRunning := o.dtmr != nil
stopAndClearTimer(&o.dtmr)
// Do not add jitter if set via here.
o.dthresh = dthresh
if deleteWasRunning {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
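A minimal, standalone sketch of the jitter idea added above (not the server's actual code; the `deleteWaitDefault` constant and the `inactivityTimer`/`cleanup` names are illustrative): each ephemeral consumer's inactivity timer gets up to one second of random offset on top of the 5s default, so a burst of consumers created together does not fire thousands of deleteNotActive callbacks in the same instant.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Mirrors the 5s JsDeleteWaitTimeDefault mentioned in the diff; assumed here for illustration.
const deleteWaitDefault = 5 * time.Second

// inactivityTimer is a hypothetical stand-in for the consumer's dtmr: it fires the
// cleanup callback after the default wait plus up to 1s of random jitter, so a burst
// of ephemeral consumers does not all expire (and hit disk) at the same instant.
func inactivityTimer(cleanup func()) *time.Timer {
	jitter := time.Duration(rand.Int63n(1000)) * time.Millisecond
	return time.AfterFunc(deleteWaitDefault+jitter, cleanup)
}

func main() {
	done := make(chan int, 10)
	start := time.Now()
	for i := 0; i < 10; i++ {
		i := i
		inactivityTimer(func() { done <- i })
	}
	for n := 0; n < 10; n++ {
		i := <-done
		fmt.Printf("consumer %d cleaned up after %v\n", i, time.Since(start).Round(time.Millisecond))
	}
}
```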
39 changes: 34 additions & 5 deletions server/filestore.go
@@ -30,6 +30,7 @@ import (
"net"
"os"
"path"
"runtime"
"sort"
"sync"
"sync/atomic"
@@ -5129,6 +5130,26 @@ func (o *consumerFileStore) encryptState(buf []byte) []byte {
return o.aek.Seal(nonce, nonce, buf, nil)
}

// Used to limit number of disk IO calls in flight since they could all be blocking an OS thread.
// https://github.com/nats-io/nats-server/issues/2742
var dios chan struct{}

// Used to setup our simplistic counting semaphore using buffered channels.
// golang.org's semaphore seemed a bit heavy.
func init() {
// Minimum for blocking disk IO calls.
const minNIO = 4
nIO := runtime.GOMAXPROCS(0)
Contributor:

I think we recommend a value of around 4 for this, don't we? Is that really enough under heavy load?

I am not sure how the raft layer behaves exactly, but this presumably will not slow down the general keep-alive handling of raft membership, right? It could, however, significantly slow down committing data there, I presume. Would that increase leader elections under very stressed situations?

Member Author:

This is usually set to the number of cores/CPUs, so on my machine it's 20 (hyper-threads), but on smaller machines we can use up to 4 instead of setting it to 1 or 2. I played with increasing this in some tests, but it makes no difference since the machine can only really be doing N things in parallel.

This only affects the consumer stores, not the stream stores, which are what the nrg/raft layer uses.

In a loaded test I ran yesterday, performance was much better when lots of ephemeral consumers were trying to do all of those things at the same time, and of course the crazy number of threads took its toll on the OS. I tested both darwin and linux, so I feel pretty good about the performance after spending yesterday on it.

Contributor:

Yes, I know the defaults, but we often say the nats server works best on roughly 4 cores and recommend setting this to 4; that is what I was thinking about.

if nIO < minNIO {
nIO = minNIO
}
dios = make(chan struct{}, nIO)
// Fill it up to start.
for i := 0; i < nIO; i++ {
dios <- struct{}{}
}
}

func (o *consumerFileStore) writeState(buf []byte) error {
// Check if we have the index file open.
o.mu.Lock()
@@ -5147,8 +5168,10 @@ func (o *consumerFileStore) writeState(buf []byte) error {
ifn := o.ifn
o.mu.Unlock()

// Lock not held here.
// Lock not held here but we do limit number of outstanding calls that could block OS threads.
<-dios
err := ioutil.WriteFile(ifn, buf, defaultFilePerms)
dios <- struct{}{}

o.mu.Lock()
if err != nil {
@@ -5458,7 +5481,9 @@ func (o *consumerFileStore) Stop() error {

if len(buf) > 0 {
o.waitOnFlusher()
<-dios
err = ioutil.WriteFile(ifn, buf, defaultFilePerms)
dios <- struct{}{}
}
return err
}
@@ -5498,15 +5523,19 @@ func (o *consumerFileStore) delete(streamDeleted bool) error {
}

var err error
// If our stream was deleted it will remove the directories.
if o.odir != _EMPTY_ && !streamDeleted {
err = os.RemoveAll(o.odir)
}
odir := o.odir
o.odir = _EMPTY_
o.closed = true
fs := o.fs
o.mu.Unlock()

// If our stream was not deleted this will remove the directories.
if odir != _EMPTY_ && !streamDeleted {
<-dios
err = os.RemoveAll(odir)
dios <- struct{}{}
}

if !streamDeleted {
fs.removeConsumer(o)
}
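For readers unfamiliar with the pattern, here is a self-contained sketch of the dios-style counting semaphore used above; the `writeFileLimited` helper is hypothetical and not the server's API. The buffered channel is pre-filled with tokens, and each blocking disk call takes a token before it starts and returns it when it finishes, so at most cap(dio) such calls, and therefore roughly that many blocked OS threads, are in flight at once; the rest wait in user space on the channel receive.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"runtime"
	"sync"
)

// dios-style semaphore: a buffered channel pre-filled with tokens.
var dio chan struct{}

func init() {
	// Minimum for blocking disk IO calls, as in the diff above.
	const minNIO = 4
	n := runtime.GOMAXPROCS(0)
	if n < minNIO {
		n = minNIO
	}
	dio = make(chan struct{}, n)
	for i := 0; i < n; i++ {
		dio <- struct{}{}
	}
}

// writeFileLimited acquires a token, does the blocking write, and releases the token.
// Callers beyond the limit wait here in user space instead of each parking a blocked
// OS thread inside the write syscall.
func writeFileLimited(name string, data []byte) error {
	<-dio
	defer func() { dio <- struct{}{} }()
	return os.WriteFile(name, data, 0644)
}

func main() {
	dir, _ := os.MkdirTemp("", "dios")
	defer os.RemoveAll(dir)

	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fn := filepath.Join(dir, fmt.Sprintf("o%d", i))
			_ = writeFileLimited(fn, []byte("state"))
		}(i)
	}
	wg.Wait()
	fmt.Println("all writes done with at most", cap(dio), "in flight")
}
```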
51 changes: 51 additions & 0 deletions server/norace_test.go
@@ -3973,3 +3973,54 @@ func TestNoRaceJetStreamMemstoreWithLargeInteriorDeletes(t *testing.T) {
t.Fatalf("Bad State: %+v", ss)
}
}

// This is related to an issue reported where we were exhausting threads by trying to
// cleanup too many consumers at the same time.
// https://github.com/nats-io/nats-server/issues/2742
func TestNoRaceConsumerFileStoreConcurrentDiskIO(t *testing.T) {
storeDir := createDir(t, JetStreamStoreDir)
defer removeDir(t, storeDir)

// Artificially adjust our environment for this test.
gmp := runtime.GOMAXPROCS(32)
defer runtime.GOMAXPROCS(gmp)

maxT := debug.SetMaxThreads(64)
defer debug.SetMaxThreads(maxT)

fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "MT", Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

startCh := make(chan bool)
var wg sync.WaitGroup
var swg sync.WaitGroup

ts := time.Now().UnixNano()

// Create 1000 consumerStores
n := 1000
swg.Add(n)

for i := 1; i <= n; i++ {
name := fmt.Sprintf("o%d", i)
o, err := fs.ConsumerStore(name, &ConsumerConfig{AckPolicy: AckExplicit})
require_NoError(t, err)
wg.Add(1)
swg.Done()

go func() {
defer wg.Done()
// Will make everyone run concurrently.
<-startCh
o.UpdateDelivered(22, 22, 1, ts)
buf, _ := o.(*consumerFileStore).encodeState()
o.(*consumerFileStore).writeState(buf)
o.Delete()
}()
}

swg.Wait()
close(startCh)
wg.Wait()
}
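As a rough way to observe the failure mode this test guards against, here is a sketch (not part of the PR, and assuming the runtime's "threadcreate" profile is an adequate proxy for OS thread usage): issue many concurrent blocking writes and watch how many OS threads the process has created. Without a limiter like dios, the count climbs toward the debug.SetMaxThreads ceiling, at which point the runtime aborts the process.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"runtime/pprof"
	"sync"
)

// threadsCreated reports how many OS threads the process has created so far,
// via the runtime's "threadcreate" profile (the count only ever grows).
func threadsCreated() int {
	return pprof.Lookup("threadcreate").Count()
}

func main() {
	dir, _ := os.MkdirTemp("", "threads")
	defer os.RemoveAll(dir)

	fmt.Println("threads created before:", threadsCreated())

	// Many concurrent blocking writes; each one that blocks in a syscall can
	// pin an OS thread, which is what exhausted the runtime's thread limit.
	var wg sync.WaitGroup
	for i := 0; i < 500; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fn := filepath.Join(dir, fmt.Sprintf("f%d", i))
			_ = os.WriteFile(fn, make([]byte, 1<<20), 0644)
		}(i)
	}
	wg.Wait()

	fmt.Println("threads created after:", threadsCreated())
}
```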