Skip to content

Commit

Permalink
storagenode/blobstore/filestore: by default turn off fsync
Browse files Browse the repository at this point in the history
Change-Id: I4e3b46ef6248eebaa0b6ad1f9a5bbc71e286577d
  • Loading branch information
jtolio authored and zeebo committed May 6, 2024
1 parent ed819e9 commit 491c019
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 26 deletions.
6 changes: 4 additions & 2 deletions storagenode/blobstore/filestore/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,20 @@ type blobWriter struct {
formatVersion blobstore.FormatVersion
buffer *bufio.Writer
fh *os.File
sync bool

track leak.Ref
}

func newBlobWriter(track leak.Ref, ref blobstore.BlobRef, store *blobStore, formatVersion blobstore.FormatVersion, file *os.File, bufferSize int) *blobWriter {
func newBlobWriter(track leak.Ref, ref blobstore.BlobRef, store *blobStore, formatVersion blobstore.FormatVersion, file *os.File, bufferSize int, sync bool) *blobWriter {
return &blobWriter{
ref: ref,
store: store,
closed: false,
formatVersion: formatVersion,
buffer: bufio.NewWriterSize(file, bufferSize),
fh: file,
sync: sync,

track: track,
}
Expand Down Expand Up @@ -133,7 +135,7 @@ func (blob *blobWriter) Commit(ctx context.Context) (err error) {
return err
}

err = blob.store.dir.Commit(ctx, blob.fh, blob.ref, blob.formatVersion)
err = blob.store.dir.Commit(ctx, blob.fh, blob.sync, blob.ref, blob.formatVersion)
return Error.Wrap(errs.Combine(err, blob.track.Close()))
}

Expand Down
55 changes: 39 additions & 16 deletions storagenode/blobstore/filestore/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.uber.org/zap"
"golang.org/x/exp/slices"

"storj.io/common/experiment"
"storj.io/common/storj"
"storj.io/storj/storagenode/blobstore"
)
Expand Down Expand Up @@ -186,6 +185,29 @@ func (dir *Dir) CreateTemporaryFile(ctx context.Context) (_ *os.File, err error)
return file, nil
}

// CreateNamedFile creates a preallocated file in the correct destination directory.
func (dir *Dir) CreateNamedFile(ctx context.Context, ref blobstore.BlobRef, formatVersion blobstore.FormatVersion) (file *os.File, err error) {
path, err := dir.blobToBasePath(ref)
if err != nil {
return nil, err
}
path = blobPathForFormatVersion(path, formatVersion)

file, err = os.Create(path)
if err != nil {
mkdirErr := os.MkdirAll(filepath.Dir(path), dirPermission)
if mkdirErr != nil {
return nil, Error.Wrap(errs.Combine(err, mkdirErr))
}
file, err = os.Create(path)
if err != nil {
return nil, err
}
}

return file, nil
}

// DeleteTemporary deletes a temporary file.
func (dir *Dir) DeleteTemporary(ctx context.Context, file *os.File) (err error) {
defer mon.Task()(&ctx)(&err)
Expand Down Expand Up @@ -259,10 +281,10 @@ func blobPathForFormatVersion(path string, formatVersion blobstore.FormatVersion
}

// Commit commits the temporary file to permanent storage.
func (dir *Dir) Commit(ctx context.Context, file *os.File, ref blobstore.BlobRef, formatVersion blobstore.FormatVersion) (err error) {
func (dir *Dir) Commit(ctx context.Context, file *os.File, sync bool, ref blobstore.BlobRef, formatVersion blobstore.FormatVersion) (err error) {
defer mon.Task()(&ctx)(&err)
var syncErr error
if !experiment.Has(ctx, "nosync") {
if sync {
syncErr = file.Sync()
}

Expand All @@ -280,20 +302,21 @@ func (dir *Dir) Commit(ctx context.Context, file *os.File, ref blobstore.BlobRef
}
path = blobPathForFormatVersion(path, formatVersion)

mkdirErr := os.MkdirAll(filepath.Dir(path), dirPermission)
if os.IsExist(mkdirErr) {
mkdirErr = nil
}

if mkdirErr != nil {
removeErr := os.Remove(file.Name())
return errs.Combine(mkdirErr, removeErr)
}
if file.Name() != path {
mkdirErr := os.MkdirAll(filepath.Dir(path), dirPermission)
if os.IsExist(mkdirErr) {
mkdirErr = nil
}
if mkdirErr != nil {
removeErr := os.Remove(file.Name())
return errs.Combine(mkdirErr, removeErr)
}

renameErr := rename(file.Name(), path)
if renameErr != nil {
removeErr := os.Remove(file.Name())
return errs.Combine(renameErr, removeErr)
renameErr := rename(file.Name(), path)
if renameErr != nil {
removeErr := os.Remove(file.Name())
return errs.Combine(renameErr, removeErr)
}
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion storagenode/blobstore/filestore/dir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func writeTestBlob(ctx context.Context, t *testing.T, dir *Dir, ref blobstore.Bl
require.NoError(t, err)
_, err = f.Write(contents)
require.NoError(t, err)
err = dir.Commit(ctx, f, ref, FormatV1)
err = dir.Commit(ctx, f, false, ref, FormatV1)
require.NoError(t, err)
}

Expand Down
13 changes: 10 additions & 3 deletions storagenode/blobstore/filestore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ func MonFileInTrash(namespace []byte) *monkit.Meter {
// Config is configuration for the blob store.
type Config struct {
WriteBufferSize memory.Size `help:"in-memory buffer for uploads" default:"128KiB"`
ForceSync bool `help:"if true, force disk synchronization and atomic writes" default:"false"`
}

// DefaultConfig is the default value for Config.
var DefaultConfig = Config{
WriteBufferSize: 128 * memory.KiB,
ForceSync: false,
}

// blobStore implements a blob store.
Expand Down Expand Up @@ -192,11 +194,16 @@ func (store *blobStore) EmptyTrash(ctx context.Context, namespace []byte, trashe
// Create creates a new blob that can be written.
func (store *blobStore) Create(ctx context.Context, ref blobstore.BlobRef) (_ blobstore.BlobWriter, err error) {
defer mon.Task()(&ctx)(&err)
file, err := store.dir.CreateTemporaryFile(ctx)
var file *os.File
if store.config.ForceSync {
file, err = store.dir.CreateTemporaryFile(ctx)
} else {
file, err = store.dir.CreateNamedFile(ctx, ref, MaxFormatVersionSupported)
}
if err != nil {
return nil, Error.Wrap(err)
}
return newBlobWriter(store.track.Child("blobWriter", 1), ref, store, MaxFormatVersionSupported, file, store.config.WriteBufferSize.Int()), nil
return newBlobWriter(store.track.Child("blobWriter", 1), ref, store, MaxFormatVersionSupported, file, store.config.WriteBufferSize.Int(), store.config.ForceSync), nil
}

// SpaceUsedForBlobs adds up the space used in all namespaces for blob storage.
Expand Down Expand Up @@ -333,7 +340,7 @@ func (store *blobStore) TestCreateV0(ctx context.Context, ref blobstore.BlobRef)
if err != nil {
return nil, Error.Wrap(err)
}
return newBlobWriter(store.track.Child("blobWriter", 1), ref, store, FormatV0, file, store.config.WriteBufferSize.Int()), nil
return newBlobWriter(store.track.Child("blobWriter", 1), ref, store, FormatV0, file, store.config.WriteBufferSize.Int(), store.config.ForceSync), nil
}

// CreateVerificationFile creates a file to be used for storage directory verification.
Expand Down
4 changes: 0 additions & 4 deletions storagenode/blobstore/filestore/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,6 @@ func TestDeleteWhileReading(t *testing.T) {
_, err = writer.Write(data)
require.NoError(t, err)

// loading uncommitted file should fail
_, err = store.Open(ctx, ref)
require.Error(t, err, "loading uncommitted file should fail")

// commit the file
err = writer.Commit(ctx)
require.NoError(t, err, "commit the file")
Expand Down
1 change: 1 addition & 0 deletions storagenode/pieces/lazyfilewalker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (config *Config) Args() []string {
"--pieces", config.Pieces,
"--driver", config.Driver,
"--filestore.write-buffer-size", config.Filestore.WriteBufferSize.String(),
fmt.Sprintf("--filestore.force-sync=%v", config.Filestore.ForceSync),
// set log output to stderr, so it doesn't interfere with the output of the command
"--log.output", "stderr",
// use the json formatter in the subprocess, so we could read lines and re-log them in the main process
Expand Down

1 comment on commit 491c019

@storjrobot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit has been mentioned on Storj Community Forum (official). There might be relevant details there:

https://forum.storj.io/t/huge-amount-of-trash-data/26168/213

Please sign in to comment.