From 3ee13b6952e32733b18a2c3aff56d7b3e5bc91b1 Mon Sep 17 00:00:00 2001 From: Nathan Burke Date: Fri, 13 Oct 2023 09:38:39 -0600 Subject: [PATCH] [extension/storage/filestorage] Add bbolt FSync as a config option (#27459) Description: Exposes bbolt fsync as a configuration option Link to tracking Issue: [20266](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/20266) Testing: Manual Testing, Updated unit tests for factory and client Documentation: Added change-log and documentation comments in config.go --------- Co-authored-by: Daniel Jaglowski --- .chloggen/feat_fsync_option_filestorage.yaml | 18 ++++++++++ extension/storage/filestorage/README.md | 3 ++ extension/storage/filestorage/client.go | 10 +++--- extension/storage/filestorage/client_test.go | 36 +++++++++---------- extension/storage/filestorage/config.go | 3 ++ extension/storage/filestorage/config_test.go | 1 + extension/storage/filestorage/extension.go | 2 +- extension/storage/filestorage/factory.go | 1 + extension/storage/filestorage/factory_test.go | 1 + .../storage/filestorage/testdata/config.yaml | 1 + 10 files changed, 52 insertions(+), 24 deletions(-) create mode 100644 .chloggen/feat_fsync_option_filestorage.yaml diff --git a/.chloggen/feat_fsync_option_filestorage.yaml b/.chloggen/feat_fsync_option_filestorage.yaml new file mode 100644 index 0000000000000..34b3dbed80d99 --- /dev/null +++ b/.chloggen/feat_fsync_option_filestorage.yaml @@ -0,0 +1,18 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: 'extension/storage/filestorage' + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). 
+note: 'Add support for setting bbolt fsync option' + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [20266] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/extension/storage/filestorage/README.md b/extension/storage/filestorage/README.md index 6c432438c4d66..24d72e93b1936 100644 --- a/extension/storage/filestorage/README.md +++ b/extension/storage/filestorage/README.md @@ -25,6 +25,8 @@ The default directory is `%ProgramData%\Otelcol\FileStorage` on Windows and `/va `timeout` is the maximum time to wait for a file lock. This value does not need to be modified in most circumstances. The default timeout is `1s`. +`fsync`, when set, will force the database to perform an fsync after each write. This helps to ensure database integrity if there is an interruption to the database process, but at the cost of performance. See [DB.NoSync](https://pkg.go.dev/go.etcd.io/bbolt#DB) for more information. + ## Compaction `compaction` defines how and when files should be compacted. 
There are two modes of compaction available (both of which can be set concurrently): - `compaction.on_start` (default: false), which happens when collector starts @@ -78,6 +80,7 @@ extensions: on_start: true directory: /tmp/ max_transaction_size: 65_536 + fsync: false service: extensions: [file_storage, file_storage/all_settings] diff --git a/extension/storage/filestorage/client.go b/extension/storage/filestorage/client.go index 24a1776cf38f1..a2819a5f2988f 100644 --- a/extension/storage/filestorage/client.go +++ b/extension/storage/filestorage/client.go @@ -37,17 +37,17 @@ type fileStorageClient struct { closed bool } -func bboltOptions(timeout time.Duration) *bbolt.Options { +func bboltOptions(timeout time.Duration, fSync bool) *bbolt.Options { return &bbolt.Options{ Timeout: timeout, - NoSync: true, + NoSync: !fSync, NoFreelistSync: true, FreelistType: bbolt.FreelistMapType, } } -func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compactionCfg *CompactionConfig) (*fileStorageClient, error) { - options := bboltOptions(timeout) +func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compactionCfg *CompactionConfig, fSync bool) (*fileStorageClient, error) { + options := bboltOptions(timeout, fSync) db, err := bbolt.Open(filePath, 0600, options) if err != nil { return nil, err @@ -172,7 +172,7 @@ func (c *fileStorageClient) Compact(compactionDirectory string, timeout time.Dur }() // use temporary file as compaction target - options := bboltOptions(timeout) + options := bboltOptions(timeout, false) c.compactionMutex.Lock() defer c.compactionMutex.Unlock() diff --git a/extension/storage/filestorage/client_test.go b/extension/storage/filestorage/client_test.go index 157b6eb3c5330..8717f553b90ed 100644 --- a/extension/storage/filestorage/client_test.go +++ b/extension/storage/filestorage/client_test.go @@ -21,7 +21,7 @@ import ( func TestClientOperations(t *testing.T) { dbFile := filepath.Join(t.TempDir(), "my_db") - client, 
err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, client.Close(context.TODO())) @@ -59,7 +59,7 @@ func TestClientBatchOperations(t *testing.T) { tempDir := t.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, client.Close(context.TODO())) @@ -180,7 +180,7 @@ func TestNewClientTransactionErrors(t *testing.T) { tempDir := t.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, timeout, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, timeout, &CompactionConfig{}, false) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, client.Close(context.TODO())) @@ -204,7 +204,7 @@ func TestNewClientErrorsOnInvalidBucket(t *testing.T) { tempDir := t.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.Error(t, err) require.Nil(t, client) @@ -259,7 +259,7 @@ func TestClientReboundCompaction(t *testing.T) { CheckInterval: checkInterval, ReboundNeededThresholdMiB: testCase.reboundNeededThresholdMiB, ReboundTriggerThresholdMiB: testCase.reboundTriggerThresholdMiB, - }) + }, false) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, client.Close(context.TODO())) @@ -348,7 +348,7 @@ func TestClientConcurrentCompaction(t *testing.T) { CheckInterval: stepInterval * 2, ReboundNeededThresholdMiB: 1, ReboundTriggerThresholdMiB: 5, - }) + }, false) require.NoError(t, err) t.Cleanup(func() { @@ -408,7 +408,7 @@ func 
BenchmarkClientGet(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -428,7 +428,7 @@ func BenchmarkClientGet100(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -451,7 +451,7 @@ func BenchmarkClientSet(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -471,7 +471,7 @@ func BenchmarkClientSet100(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -493,7 +493,7 @@ func BenchmarkClientDelete(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -519,7 +519,7 @@ func BenchmarkClientSetLargeDB(b *testing.B) 
{ tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -556,7 +556,7 @@ func BenchmarkClientInitLargeDB(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -575,7 +575,7 @@ func BenchmarkClientInitLargeDB(b *testing.B) { var tempClient *fileStorageClient b.ResetTimer() for n := 0; n < b.N; n++ { - tempClient, err = newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + tempClient, err = newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.StopTimer() err = tempClient.Close(ctx) @@ -593,7 +593,7 @@ func BenchmarkClientCompactLargeDBFile(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -620,7 +620,7 @@ func BenchmarkClientCompactLargeDBFile(b *testing.B) { testDbFile := filepath.Join(tempDir, fmt.Sprintf("my_db%d", n)) err = os.Link(dbFile, testDbFile) require.NoError(b, err) - client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}) + client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.StartTimer() require.NoError(b, client.Compact(tempDir, time.Second, 
65536)) @@ -637,7 +637,7 @@ func BenchmarkClientCompactDb(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -664,7 +664,7 @@ func BenchmarkClientCompactDb(b *testing.B) { testDbFile := filepath.Join(tempDir, fmt.Sprintf("my_db%d", n)) err = os.Link(dbFile, testDbFile) require.NoError(b, err) - client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}) + client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.StartTimer() require.NoError(b, client.Compact(tempDir, time.Second, 65536)) diff --git a/extension/storage/filestorage/config.go b/extension/storage/filestorage/config.go index fcc866d8226d6..d71bbe0234fc7 100644 --- a/extension/storage/filestorage/config.go +++ b/extension/storage/filestorage/config.go @@ -17,6 +17,9 @@ type Config struct { Timeout time.Duration `mapstructure:"timeout,omitempty"` Compaction *CompactionConfig `mapstructure:"compaction,omitempty"` + + // FSync specifies that fsync should be called after each database write + FSync bool `mapstructure:"fsync,omitempty"` } // CompactionConfig defines configuration for optional file storage compaction. 
diff --git a/extension/storage/filestorage/config_test.go b/extension/storage/filestorage/config_test.go index 73f3be89f9875..11d898a55f179 100644 --- a/extension/storage/filestorage/config_test.go +++ b/extension/storage/filestorage/config_test.go @@ -47,6 +47,7 @@ func TestLoadConfig(t *testing.T) { CheckInterval: time.Second * 5, }, Timeout: 2 * time.Second, + FSync: true, }, }, } diff --git a/extension/storage/filestorage/extension.go b/extension/storage/filestorage/extension.go index 34ff703796709..fbac0e1ee7a60 100644 --- a/extension/storage/filestorage/extension.go +++ b/extension/storage/filestorage/extension.go @@ -64,7 +64,7 @@ func (lfs *localFileStorage) GetClient(_ context.Context, kind component.Kind, e rawName = sanitize(rawName) } absoluteName := filepath.Join(lfs.cfg.Directory, rawName) - client, err := newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction) + client, err := newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction, lfs.cfg.FSync) if err != nil { return nil, err diff --git a/extension/storage/filestorage/factory.go b/extension/storage/filestorage/factory.go index 6d7f0355cfecc..ef3e04e9d3c7c 100644 --- a/extension/storage/filestorage/factory.go +++ b/extension/storage/filestorage/factory.go @@ -47,6 +47,7 @@ func createDefaultConfig() component.Config { CheckInterval: defaultCompactionInterval, }, Timeout: time.Second, + FSync: false, } } diff --git a/extension/storage/filestorage/factory_test.go b/extension/storage/filestorage/factory_test.go index dc77d1a59c355..c022d76ba9218 100644 --- a/extension/storage/filestorage/factory_test.go +++ b/extension/storage/filestorage/factory_test.go @@ -29,6 +29,7 @@ func TestFactory(t *testing.T) { require.Equal(t, expected, cfg.Directory) } require.Equal(t, time.Second, cfg.Timeout) + require.Equal(t, false, cfg.FSync) tests := []struct { name string diff --git a/extension/storage/filestorage/testdata/config.yaml 
b/extension/storage/filestorage/testdata/config.yaml index dad1ef9bb50e5..4a923aee71fe7 100644 --- a/extension/storage/filestorage/testdata/config.yaml +++ b/extension/storage/filestorage/testdata/config.yaml @@ -13,3 +13,4 @@ file_storage/all_settings: rebound_needed_threshold_mib: 128 max_transaction_size: 2048 timeout: 2s + fsync: true