diff --git a/server/filestore.go b/server/filestore.go index 7836db016a5..69bc641b2ec 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -174,6 +174,7 @@ type fileStore struct { hh hash.Hash64 qch chan struct{} cfs []ConsumerStore + lfd *os.File sips int closed bool fip bool @@ -387,6 +388,10 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim qch: make(chan struct{}), } + if err := fs.lockFileSystem(); err != nil { + return nil, err + } + // Set flush in place to AsyncFlush which by default is false. fs.fip = !fcfg.AsyncFlush @@ -6633,6 +6638,8 @@ func (fs *fileStore) Stop() error { fs.cancelSyncTimer() fs.cancelAgeChk() + fs.lfd.Close() + // We should update the upper usage layer on a stop. cb, bytes := fs.scb, int64(fs.state.Bytes) @@ -7015,6 +7022,18 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) { } } +func (fs *fileStore) lockFileSystem() error { + var err error + lpath := filepath.Join(fs.fcfg.StoreDir, "LOCK") + if fs.lfd, err = os.Create(lpath); err != nil { + return fmt.Errorf("could not create `%s': %v", lpath, err) + } + if err = lockFile(fs.lfd); err != nil { + return err + } + return nil +} + //////////////////////////////////////////////////////////////////////////////// // Consumers //////////////////////////////////////////////////////////////////////////////// diff --git a/server/filestore_test.go b/server/filestore_test.go index cccde441f19..60bd29663cd 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -5714,3 +5714,27 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) { // Make sure it was update properly. require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false}) } + +func TestFileStoreLockFileSystem(t *testing.T) { + dir := t.TempDir() + + fs0, err0 := newFileStore(FileStoreConfig{StoreDir: dir}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}) + require_NoError(t, err0) + fs0.Stop() + + fs1, err1 := newFileStore(FileStoreConfig{StoreDir: dir}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}) + require_NoError(t, err1) + fs1.Stop() +} + +func TestFileStoreLockFileSystemConflict(t *testing.T) { + dir := t.TempDir() + + fs0, err0 := newFileStore(FileStoreConfig{StoreDir: dir}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}) + require_NoError(t, err0) + + _, err1 := newFileStore(FileStoreConfig{StoreDir: dir}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}) + require_Error(t, err1) + + fs0.Stop() +} diff --git a/server/filestore_unix.go b/server/filestore_unix.go new file mode 100644 index 00000000000..c5b07b6781a --- /dev/null +++ b/server/filestore_unix.go @@ -0,0 +1,33 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !windows +// +build !windows + +package server + +import ( + "fmt" + "os" + "syscall" +) + +// Acquire filesystem-level lock for the filestore to ensure exclusive access +// for this server instance. +func lockFile(f *os.File) error { + err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX | syscall.LOCK_NB) + if err != nil { + return fmt.Errorf("lock `%s': %v", f.Name(), err) + } + return nil +} diff --git a/server/filestore_windows.go b/server/filestore_windows.go new file mode 100644 index 00000000000..4aef381ce1d --- /dev/null +++ b/server/filestore_windows.go @@ -0,0 +1,42 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build windows +// +build windows + +package server + +import ( + "fmt" + "os" + "syscall" + "unsafe" +) + +// Acquire filesystem-level lock for the filestore to ensure exclusive access +// for this server instance. +func lockFile(f *os.File) error { + var ( + modkernel32 = syscall.NewLazyDLL("kernel32.dll") + prLockFileEx = modkernel32.NewProc("LockFileEx") + ol = unsafe.Pointer(new(syscall.Overlapped)) + ) + + a := prLockFileEx.Addr() + h := syscall.Handle(f.Fd()) + r, _, e := syscall.Syscall6(a, 6, uintptr(h), 0x3, 0, ^uintptr(0), ^uintptr(0), uintptr(ol)) + if r == 0 { + return fmt.Errorf("lock `%s': %v", f.Name(), error(e)) + } + return nil +}