diff --git a/server/filestore.go b/server/filestore.go index 62d04d4020..4ab51b013b 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -6104,6 +6104,16 @@ func (fs *fileStore) Delete() error { if fs.isClosed() { // Always attempt to remove since we could have been closed beforehand. os.RemoveAll(fs.fcfg.StoreDir) + // Since we did remove, if we did have anything remaining make sure to + // call into any storage updates that had been registered. + fs.mu.Lock() + cb, msgs, bytes := fs.scb, int64(fs.state.Msgs), int64(fs.state.Bytes) + // Guard against double accounting if called twice. + fs.state.Msgs, fs.state.Bytes = 0, 0 + fs.mu.Unlock() + if msgs > 0 && cb != nil { + cb(-msgs, -bytes, 0, _EMPTY_) + } return ErrStoreClosed } fs.Purge() diff --git a/server/jetstream.go b/server/jetstream.go index 56e2a6fe99..8f4a807c2c 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -2582,9 +2582,13 @@ func (jsa *jsAccount) checkTemplateOwnership(tname, sname string) bool { return false } +type Number interface { + int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64 | float32 | float64 +} + // friendlyBytes returns a string with the given bytes int64 // represented as a size, such as 1KB, 10MB, etc... -func friendlyBytes(bytes int64) string { +func friendlyBytes[T Number](bytes T) string { fbytes := float64(bytes) base := 1024 pre := []string{"K", "M", "G", "T", "P", "E"} diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index db90a57ab5..b0e1704202 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3883,3 +3883,56 @@ func TestJetStreamClusterStreamNodeShutdownBugOnStop(t *testing.T) { t.Fatalf("RAFT nodes after stream stop incorrect: %d vs %d", numNodesStart, numNodes) } } + +func TestJetStreamClusterStreamAccountingOnStoreError(t *testing.T) { + c := createJetStreamClusterWithTemplate(t, jsClusterMaxBytesAccountLimitTempl, "NATS", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + MaxBytes: 1 * 1024 * 1024 * 1024, + Replicas: 3, + }) + require_NoError(t, err) + + msg := strings.Repeat("Z", 32*1024) + for i := 0; i < 100; i++ { + sendStreamMsg(t, nc, "foo", msg) + } + s := c.randomServer() + acc, err := s.LookupAccount("$U") + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + mset.mu.Lock() + mset.store.Stop() + sjs := mset.js + mset.mu.Unlock() + + // Now delete the stream + js.DeleteStream("TEST") + + // Wait for this to propgate. + // The bug will have us not release reserved resources properly. + time.Sleep(time.Second) + info, err := js.AccountInfo() + require_NoError(t, err) + + // Default tier + if info.Store != 0 { + t.Fatalf("Expected store to be 0 but got %v", friendlyBytes(info.Store)) + } + + // Now check js from server directly regarding reserved. + sjs.mu.RLock() + reserved := sjs.storeReserved + sjs.mu.RUnlock() + // Under bug will show 1GB + if reserved != 0 { + t.Fatalf("Expected store reserved to be 0 after stream delete, got %v", friendlyBytes(reserved)) + } +} diff --git a/server/stream.go b/server/stream.go index 46981340e3..89739b1ade 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4646,9 +4646,8 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { if deleteFlag { if store != nil { - if err := store.Delete(); err != nil { - return err - } + // Ignore errors. + store.Delete() } // Release any resources. js.releaseStreamResources(&mset.cfg) @@ -4658,9 +4657,8 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { os.Remove(filepath.Join(accDir, streamsDir)) os.Remove(accDir) } else if store != nil { - if err := store.Stop(); err != nil { - return err - } + // Ignore errors. + store.Stop() } return nil