Skip to content

Commit

Permalink
Merge 10ef65d into 7ceff6c
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Aug 2, 2022
2 parents 7ceff6c + 10ef65d commit f3ef7c4
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 34 deletions.
2 changes: 1 addition & 1 deletion server/clustering_test.go
Expand Up @@ -6915,7 +6915,7 @@ func TestClusteringRestoreSnapshotWithDifferentVersionsOfSameChannel(t *testing.
}

func TestClusteringSQLMsgStoreFlushed(t *testing.T) {
if !doSQL {
if persistentStoreType != stores.TypeSQL {
t.SkipNow()
}

Expand Down
9 changes: 9 additions & 0 deletions server/ft_test.go
Expand Up @@ -336,6 +336,9 @@ func TestFTPartition(t *testing.T) {
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if err := syncNC.Flush(); err != nil {
t.Fatalf("Error on flush: %v", err)
}

// Start NATS server independently
ns := natsdTest.RunServer(&nOpts)
Expand Down Expand Up @@ -364,6 +367,9 @@ func TestFTPartition(t *testing.T) {
"-test.run=TestFTPartition$",
"-persistent_store", persistentStoreType,
}
if !doSQL {
params = append(params, "-sql=false")
}
// Start a process that will be the standby
if persistentStoreType == stores.TypeSQL {
params = append(params,
Expand Down Expand Up @@ -483,6 +489,9 @@ func TestFTPartitionReversed(t *testing.T) {
"-test.run=TestFTPartitionReversed$",
"-persistent_store", persistentStoreType,
}
if !doSQL {
params = append(params, "-sql=false")
}
// Start a process that will act as the active server
if persistentStoreType == stores.TypeSQL {
params = append(params,
Expand Down
31 changes: 22 additions & 9 deletions server/server.go
Expand Up @@ -1976,22 +1976,31 @@ func (s *StanServer) start(runningState State) error {
s.state = runningState

var (
err error
recoveredState *stores.RecoveredState
recoveredSubs []*subState
callStoreInit bool
err error
recoveredState *stores.RecoveredState
recoveredSubs []*subState
callStoreInit bool
initThenRecover bool
)

RECOVER:
// Recover the state.
s.log.Noticef("Recovering the state...")
recoveredState, err = s.store.Recover()
if err != nil {
return err
}
if recoveredState != nil {
s.log.Noticef("Recovered %v channel(s)", len(recoveredState.Channels))
if err == stores.ErrNoSrvButChannels {
err = nil
initThenRecover = true
s.log.Warnf("Server information was not recovered, however channels are present. Will initialize server and restart recovery!")
} else {
return err
}
} else {
s.log.Noticef("No recovered state")
if recoveredState != nil {
s.log.Noticef("Recovered %v channel(s)", len(recoveredState.Channels))
} else {
s.log.Noticef("No recovered state")
}
}
// We used to use a NUID as part of the internal subjects in standalone mode
// without channel partitioning. In all other cases, we used the cluster ID.
Expand Down Expand Up @@ -2067,6 +2076,10 @@ func (s *StanServer) start(runningState State) error {
if err := s.store.Init(&s.info); err != nil {
return fmt.Errorf("unable to initialize the store: %v", err)
}
if initThenRecover {
callStoreInit = false
goto RECOVER
}
}

// We don't do the check if we are running FT and/or if
Expand Down
12 changes: 5 additions & 7 deletions server/server_delivery_test.go
Expand Up @@ -262,20 +262,18 @@ func TestPersistentStoreSQLSubsPendingRows(t *testing.T) {
t.SkipNow()
}
source := testSQLSource
sourceAdmin := testSQLSourceAdmin
// If not running tests with `-persistent_store sql`,
// initialize few things and default to MySQL.
if persistentStoreType != stores.TypeSQL {
// If not running tests with `-persistent_store sql`,
// initialize few things and default to MySQL.
source = testDefaultMySQLSource
sourceAdmin := testDefaultMySQLSourceAdmin
sourceAdmin = testDefaultMySQLSourceAdmin
if err := test.CreateSQLDatabase(testSQLDriver, sourceAdmin,
source, testSQLDatabaseName); err != nil {
t.Fatalf("Error setting up test for SQL: %v", err)
}
defer test.DeleteSQLDatabase(testSQLDriver, sourceAdmin, testSQLDatabaseName)
}

cleanupDatastore(t)
defer cleanupDatastore(t)
defer test.DeleteSQLDatabase(testSQLDriver, sourceAdmin, testSQLDatabaseName)

ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()
Expand Down
121 changes: 121 additions & 0 deletions server/server_storefailures_test.go
Expand Up @@ -14,14 +14,18 @@
package server

import (
"database/sql"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

"github.com/nats-io/nats-streaming-server/spb"
"github.com/nats-io/nats-streaming-server/stores"
"github.com/nats-io/nats-streaming-server/test"
"github.com/nats-io/stan.go"
"github.com/nats-io/stan.go/pb"
)
Expand Down Expand Up @@ -756,3 +760,120 @@ func TestCreateChannelError(t *testing.T) {
t.Fatal("Did not log the expected error")
}
}

// noSrvStateLogger is a test logger whose Warnf method forwards warnings
// containing "not recovered" to ch, so that a test can check that such a
// warning was emitted.
type noSrvStateLogger struct {
// Embedded logger; presumably supplies the remaining logger methods
// (TODO confirm against dummyLogger's definition).
dummyLogger
// ch receives the text of each captured warning.
ch chan string
}

// Warnf formats the warning and, if the resulting text contains
// "not recovered", offers it to l.ch without blocking. Any other message is
// ignored, and a matching message is dropped when the channel cannot accept
// it right away.
func (l *noSrvStateLogger) Warnf(format string, args ...interface{}) {
	text := fmt.Sprintf(format, args...)
	if !strings.Contains(text, "not recovered") {
		return
	}
	// Non-blocking send: the caller must never stall on an undrained channel.
	select {
	case l.ch <- text:
	default:
	}
}

// TestFileStoreServerStateMissing checks that a file store whose "server.dat"
// file has been removed, while channel data is still on disk, is still usable
// after a restart: the channel "foo" must be present again and a warning
// containing "not recovered" must have been logged.
func TestFileStoreServerStateMissing(t *testing.T) {
	l := &noSrvStateLogger{ch: make(chan string, 1)}

	// Force the storage to be FILE, regardless of persistent_store value.
	opts := GetDefaultOptions()
	opts.CustomLogger = l
	opts.StoreType = stores.TypeFile
	opts.FilestoreDir = defaultDataStore
	opts.FileStoreOpts.BufferSize = 1024
	if err := os.RemoveAll(defaultDataStore); err != nil {
		t.Fatalf("Error cleaning up datastore: %v", err)
	}
	defer func() {
		if err := os.RemoveAll(defaultDataStore); err != nil {
			t.Fatalf("Error cleaning up datastore: %v", err)
		}
	}()
	s := runServerWithOpts(t, opts, nil)
	defer s.Shutdown()

	// Create a channel so that there is channel state left behind on disk.
	if _, err := s.channels.createChannel(s, "foo"); err != nil {
		t.Fatalf("Error creating channel: %v", err)
	}

	// Stop the server and delete only the server state file. The removal is
	// the precondition of this test, so a failure here must not go unnoticed.
	s.Shutdown()
	if err := os.Remove(filepath.Join(defaultDataStore, "server.dat")); err != nil {
		t.Fatalf("Error removing server state file: %v", err)
	}

	s = runServerWithOpts(t, opts, nil)
	defer s.Shutdown()

	if c := s.channels.get("foo"); c == nil {
		t.Fatal("Expected channel to be recovered, was not")
	}

	// The warning is logged during startup, so it must already be queued.
	select {
	case <-l.ch: // OK
	default:
		t.Fatal("Did not get warning about non recovered server state")
	}
}

// TestSQLStoreServerStateMissing checks that an SQL store whose ServerInfo
// table has been emptied, while channel data is still present, is still usable
// after a restart: the channel "foo" must be present again and a warning
// containing "not recovered" must have been logged.
func TestSQLStoreServerStateMissing(t *testing.T) {
	// If can't run any SQL test, bail out
	if !doSQL {
		t.SkipNow()
	}

	l := &noSrvStateLogger{ch: make(chan string, 1)}

	// Force the storage to be SQL
	source := testSQLSource
	sourceAdmin := testSQLSourceAdmin
	// If not running tests with `-persistent_store sql`,
	// initialize few things and default to MySQL.
	if persistentStoreType != stores.TypeSQL {
		source = testDefaultMySQLSource
		sourceAdmin = testDefaultMySQLSourceAdmin
		if err := test.CreateSQLDatabase(testSQLDriver, sourceAdmin,
			source, testSQLDatabaseName); err != nil {
			t.Fatalf("Error setting up test for SQL: %v", err)
		}
	}
	defer test.DeleteSQLDatabase(testSQLDriver, sourceAdmin, testSQLDatabaseName)

	opts := GetDefaultOptions()
	opts.CustomLogger = l
	opts.StoreType = stores.TypeSQL
	opts.SQLStoreOpts.Driver = testSQLDriver
	opts.SQLStoreOpts.Source = source
	s := runServerWithOpts(t, opts, nil)
	defer s.Shutdown()

	// Create a channel so that there is channel state left in the database.
	if _, err := s.channels.createChannel(s, "foo"); err != nil {
		t.Fatalf("Error creating channel: %v", err)
	}

	s.Shutdown()

	// With the server stopped, wipe only the server information.
	db, err := sql.Open(testSQLDriver, source)
	if err != nil {
		t.Fatalf("Error opening database: %v", err)
	}
	// Covers the early-exit path below; the explicit Close further down
	// releases the connection before the server is restarted.
	defer db.Close()
	if _, err := db.Exec("DELETE FROM ServerInfo"); err != nil {
		t.Fatalf("Error deleting server info: %v", err)
	}
	db.Close()

	s = runServerWithOpts(t, opts, nil)
	defer s.Shutdown()

	if c := s.channels.get("foo"); c == nil {
		t.Fatal("Expected channel to be recovered, was not")
	}

	// The warning is logged during startup, so it must already be queued.
	select {
	case <-l.ch: // OK
	default:
		t.Fatal("Did not get warning about non recovered server state")
	}
}
6 changes: 2 additions & 4 deletions server/server_test.go
Expand Up @@ -97,9 +97,7 @@ func TestMain(m *testing.M) {
)
flag.StringVar(&bst, "bench_store", "", "store type for bench tests (mem, file)")
flag.StringVar(&pst, "persistent_store", "", "store type for server recovery related tests (file)")
// This one is added here so that if we want to disable sql for stores tests
// we can use the same param for all packages as in "go test -v ./... -sql=false"
flag.Bool("sql", false, "Not used for server tests")
flag.BoolVar(&doSQL, "sql", true, "Enable/disable SQL tests in server package")
// Those 2 sql related flags are handled here, not in AddSQLFlags
flag.BoolVar(&sqlCreateDb, "sql_create_db", true, "create sql database on startup")
flag.BoolVar(&sqlDeleteDb, "sql_delete_db", true, "delete sql database on exit")
Expand Down Expand Up @@ -131,7 +129,7 @@ func TestMain(m *testing.M) {

// If either (or both) bench or tests select an SQL store, we need to do
// so initializing and cleaning at the end of the test.
doSQL = benchStoreType == stores.TypeSQL || persistentStoreType == stores.TypeSQL
doSQL = doSQL || benchStoreType == stores.TypeSQL || persistentStoreType == stores.TypeSQL

if doSQL {
defaultSources := make(map[string][]string)
Expand Down
2 changes: 1 addition & 1 deletion stores/common_msg_test.go
Expand Up @@ -638,7 +638,7 @@ func TestCSFirstAndLastMsg(t *testing.T) {
defer s.Close()

limit := testDefaultStoreLimits
limit.MaxAge = 100 * time.Millisecond
limit.MaxAge = 250 * time.Millisecond
if err := s.SetLimits(&limit); err != nil {
t.Fatalf("Error setting limits: %v", err)
}
Expand Down
39 changes: 32 additions & 7 deletions stores/filestore.go
Expand Up @@ -1361,8 +1361,22 @@ func (fs *FileStore) Recover() (*RecoveredState, error) {
if err != nil {
return nil, fmt.Errorf("unable to recover server file %q: %v", fs.serverFile.name, err)
}

// Get the channels (they are subdirectories of rootDir).
// We used to do that after checking for serverInfo and recovering clients.
// However, we now need to know whether there are channels when serverInfo
// is nil. To preserve the point at which a read error used to be returned,
// capture that error in a separate variable and return it (if any) at the
// original location.
channels, cerr := fs.getChannelsDir()

// If the server file is empty, then we are done
if serverInfo == nil {
// If there is no server info stored, we expect no state to be stored,
// so if there are possible channels, return an error.
if len(channels) > 0 {
return nil, ErrNoSrvButChannels
}
// We return the file store instance, but no recovered state.
return nil, nil
}
Expand All @@ -1373,19 +1387,15 @@ func (fs *FileStore) Recover() (*RecoveredState, error) {
return nil, fmt.Errorf("unable to recover client file %q: %v", fs.clientsFile.name, err)
}

// Get the channels (there are subdirectories of rootDir)
channels, err = ioutil.ReadDir(fs.fm.rootDir)
if err != nil {
// If there was an error reading the channels, return the error now.
if cerr != nil {
return nil, err
}
if len(channels) > 0 {
wg, poolCh, errCh, recoverCh := initParalleRecovery(fs.opts.ParallelRecovery, len(channels))
ctx := &channelRecoveryCtx{wg: wg, poolCh: poolCh, errCh: errCh, recoverCh: recoverCh}
for _, c := range channels {
// Channels are directories. Ignore simple files
if !c.IsDir() {
continue
}
// We know that c is a directory (see getChannelsDir())
channel := c.Name()
channelDirName := filepath.Join(fs.fm.rootDir, channel)
limits := fs.genericStore.getChannelLimits(channel)
Expand Down Expand Up @@ -1432,6 +1442,21 @@ func (fs *FileStore) Recover() (*RecoveredState, error) {
return recoveredState, nil
}

// getChannelsDir reads fs.fm.rootDir and returns the entries that are
// directories, in the order reported by ioutil.ReadDir; regular files are
// left out. If the directory cannot be read, the error is returned unchanged
// together with a nil slice.
func (fs *FileStore) getChannelsDir() ([]os.FileInfo, error) {
	entries, err := ioutil.ReadDir(fs.fm.rootDir)
	if err != nil {
		return nil, err
	}
	dirs := make([]os.FileInfo, 0, len(entries))
	for i := range entries {
		if entry := entries[i]; entry.IsDir() {
			dirs = append(dirs, entry)
		}
	}
	return dirs, nil
}

func initParalleRecovery(maxGoRoutines, foundChannels int) (*sync.WaitGroup, chan struct{}, chan error, chan *recoveredChannel) {
wg := sync.WaitGroup{}
poolCh := make(chan struct{}, maxGoRoutines)
Expand Down
7 changes: 7 additions & 0 deletions stores/sqlstore.go
Expand Up @@ -793,6 +793,13 @@ func (s *SQLStore) Recover() (*RecoveredState, error) {
// If there is no row, that means nothing to recover. Return nil for the
// state and no error.
if err == sql.ErrNoRows {
// If there are channels, we should return an error.
var maxChannelID int64
r := s.db.QueryRow(sqlStmts[sqlRecoverMaxChannelID])
r.Scan(&maxChannelID)
if maxChannelID > 0 {
return nil, ErrNoSrvButChannels
}
return nil, nil
}
return nil, sqlStmtError(sqlRecoverServerInfo, err)
Expand Down
11 changes: 6 additions & 5 deletions stores/store.go
Expand Up @@ -34,11 +34,12 @@ const (

// Errors.
var (
ErrTooManyChannels = errors.New("too many channels")
ErrTooManySubs = errors.New("too many subscriptions per channel")
ErrNotSupported = errors.New("not supported")
ErrAlreadyExists = errors.New("already exists")
ErrNotFound = errors.New("not found")
ErrTooManyChannels = errors.New("too many channels")
ErrTooManySubs = errors.New("too many subscriptions per channel")
ErrNotSupported = errors.New("not supported")
ErrAlreadyExists = errors.New("already exists")
ErrNotFound = errors.New("not found")
ErrNoSrvButChannels = errors.New("no server state recovered but channels present")
)

// StoreLimits define limits for a store.
Expand Down

0 comments on commit f3ef7c4

Please sign in to comment.