Skip to content

Commit

Permalink
[extension/storage/filestorage] Add bbolt FSync as a config option (o…
Browse files Browse the repository at this point in the history
…pen-telemetry#27459)

Description: Exposes bbolt fsync as a configuration option

Link to tracking Issue:
[20266](open-telemetry#20266)

Testing: Manual Testing, Updated unit tests for factory and client

Documentation: Added change-log and documentation comments in config.go

---------

Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
  • Loading branch information
2 people authored and JaredTan95 committed Oct 18, 2023
1 parent ca28b62 commit 3ee13b6
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 24 deletions.
18 changes: 18 additions & 0 deletions .chloggen/feat_fsync_option_filestorage.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: 'entension/storage/filestorage'

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: 'Add support for setting bbolt fsync option'

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [20266]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
3 changes: 3 additions & 0 deletions extension/storage/filestorage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ The default directory is `%ProgramData%\Otelcol\FileStorage` on Windows and `/va
`timeout` is the maximum time to wait for a file lock. This value does not need to be modified in most circumstances.
The default timeout is `1s`.

`fsync` when set, will force the database to perform an fsync after each write. This helps to ensure database integretity if there is an interruption to the database process, but at the cost of performance. See [DB.NoSync](https://pkg.go.dev/go.etcd.io/bbolt#DB) for more information.

## Compaction
`compaction` defines how and when files should be compacted. There are two modes of compaction available (both of which can be set concurrently):
- `compaction.on_start` (default: false), which happens when collector starts
Expand Down Expand Up @@ -78,6 +80,7 @@ extensions:
on_start: true
directory: /tmp/
max_transaction_size: 65_536
fsync: false
service:
extensions: [file_storage, file_storage/all_settings]
Expand Down
10 changes: 5 additions & 5 deletions extension/storage/filestorage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ type fileStorageClient struct {
closed bool
}

func bboltOptions(timeout time.Duration) *bbolt.Options {
func bboltOptions(timeout time.Duration, fSync bool) *bbolt.Options {
return &bbolt.Options{
Timeout: timeout,
NoSync: true,
NoSync: !fSync,
NoFreelistSync: true,
FreelistType: bbolt.FreelistMapType,
}
}

func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compactionCfg *CompactionConfig) (*fileStorageClient, error) {
options := bboltOptions(timeout)
func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compactionCfg *CompactionConfig, fSync bool) (*fileStorageClient, error) {
options := bboltOptions(timeout, fSync)
db, err := bbolt.Open(filePath, 0600, options)
if err != nil {
return nil, err
Expand Down Expand Up @@ -172,7 +172,7 @@ func (c *fileStorageClient) Compact(compactionDirectory string, timeout time.Dur
}()

// use temporary file as compaction target
options := bboltOptions(timeout)
options := bboltOptions(timeout, false)

c.compactionMutex.Lock()
defer c.compactionMutex.Unlock()
Expand Down
36 changes: 18 additions & 18 deletions extension/storage/filestorage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
func TestClientOperations(t *testing.T) {
dbFile := filepath.Join(t.TempDir(), "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, client.Close(context.TODO()))
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestClientBatchOperations(t *testing.T) {
tempDir := t.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, client.Close(context.TODO()))
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestNewClientTransactionErrors(t *testing.T) {
tempDir := t.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, timeout, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, timeout, &CompactionConfig{}, false)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, client.Close(context.TODO()))
Expand All @@ -204,7 +204,7 @@ func TestNewClientErrorsOnInvalidBucket(t *testing.T) {
tempDir := t.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.Error(t, err)
require.Nil(t, client)

Expand Down Expand Up @@ -259,7 +259,7 @@ func TestClientReboundCompaction(t *testing.T) {
CheckInterval: checkInterval,
ReboundNeededThresholdMiB: testCase.reboundNeededThresholdMiB,
ReboundTriggerThresholdMiB: testCase.reboundTriggerThresholdMiB,
})
}, false)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, client.Close(context.TODO()))
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestClientConcurrentCompaction(t *testing.T) {
CheckInterval: stepInterval * 2,
ReboundNeededThresholdMiB: 1,
ReboundTriggerThresholdMiB: 5,
})
}, false)
require.NoError(t, err)

t.Cleanup(func() {
Expand Down Expand Up @@ -408,7 +408,7 @@ func BenchmarkClientGet(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -428,7 +428,7 @@ func BenchmarkClientGet100(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -451,7 +451,7 @@ func BenchmarkClientSet(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -471,7 +471,7 @@ func BenchmarkClientSet100(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -493,7 +493,7 @@ func BenchmarkClientDelete(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -519,7 +519,7 @@ func BenchmarkClientSetLargeDB(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand Down Expand Up @@ -556,7 +556,7 @@ func BenchmarkClientInitLargeDB(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -575,7 +575,7 @@ func BenchmarkClientInitLargeDB(b *testing.B) {
var tempClient *fileStorageClient
b.ResetTimer()
for n := 0; n < b.N; n++ {
tempClient, err = newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
tempClient, err = newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.StopTimer()
err = tempClient.Close(ctx)
Expand All @@ -593,7 +593,7 @@ func BenchmarkClientCompactLargeDBFile(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -620,7 +620,7 @@ func BenchmarkClientCompactLargeDBFile(b *testing.B) {
testDbFile := filepath.Join(tempDir, fmt.Sprintf("my_db%d", n))
err = os.Link(dbFile, testDbFile)
require.NoError(b, err)
client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{})
client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.StartTimer()
require.NoError(b, client.Compact(tempDir, time.Second, 65536))
Expand All @@ -637,7 +637,7 @@ func BenchmarkClientCompactDb(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -664,7 +664,7 @@ func BenchmarkClientCompactDb(b *testing.B) {
testDbFile := filepath.Join(tempDir, fmt.Sprintf("my_db%d", n))
err = os.Link(dbFile, testDbFile)
require.NoError(b, err)
client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{})
client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.StartTimer()
require.NoError(b, client.Compact(tempDir, time.Second, 65536))
Expand Down
3 changes: 3 additions & 0 deletions extension/storage/filestorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type Config struct {
Timeout time.Duration `mapstructure:"timeout,omitempty"`

Compaction *CompactionConfig `mapstructure:"compaction,omitempty"`

// FSync specifies that fsync should be called after each database write
FSync bool `mapstructure:"fsync,omitempty"`
}

// CompactionConfig defines configuration for optional file storage compaction.
Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TestLoadConfig(t *testing.T) {
CheckInterval: time.Second * 5,
},
Timeout: 2 * time.Second,
FSync: true,
},
},
}
Expand Down
2 changes: 1 addition & 1 deletion extension/storage/filestorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (lfs *localFileStorage) GetClient(_ context.Context, kind component.Kind, e
rawName = sanitize(rawName)
}
absoluteName := filepath.Join(lfs.cfg.Directory, rawName)
client, err := newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction)
client, err := newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction, lfs.cfg.FSync)

if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func createDefaultConfig() component.Config {
CheckInterval: defaultCompactionInterval,
},
Timeout: time.Second,
FSync: false,
}
}

Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestFactory(t *testing.T) {
require.Equal(t, expected, cfg.Directory)
}
require.Equal(t, time.Second, cfg.Timeout)
require.Equal(t, false, cfg.FSync)

tests := []struct {
name string
Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ file_storage/all_settings:
rebound_needed_threshold_mib: 128
max_transaction_size: 2048
timeout: 2s
fsync: true

0 comments on commit 3ee13b6

Please sign in to comment.