Skip to content

Commit

Permalink
feat: default retention period set to 7 days for rudder backups (#3038)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Mar 16, 2023
1 parent 61d9275 commit 0d9af35
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 10 deletions.
1 change: 1 addition & 0 deletions config/backend-config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type DataRetention struct {
UseSelfStorage bool `json:"useSelfStorage"`
StorageBucket StorageBucket `json:"storageBucket"`
StoragePreferences StoragePreferences `json:"storagePreferences"`
RetentionPeriod string `json:"retentionPeriod"`
}

type StorageBucket struct {
Expand Down
35 changes: 27 additions & 8 deletions services/fileuploader/fileuploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type StorageSettings struct {
Preferences backendconfig.StoragePreferences
}

var noStorageForWorkspaceErrorString string = "no storage settings found for workspace: %s"

// Provider is an interface that provides file managers and storage preferences for a given workspace.
type Provider interface {
// Gets a file manager for the given workspace.
Expand Down Expand Up @@ -54,14 +56,25 @@ func NewDefaultProvider() Provider {
type provider struct {
onceInit sync.Once
init chan struct{}
mu sync.RWMutex
storageSettings map[string]StorageSettings
}

func (p *provider) GetFileManager(workspaceID string) (filemanager.FileManager, error) {
func (p *provider) getStorageSettings(workspaceID string) (StorageSettings, error) {
<-p.init
p.mu.RLock()
defer p.mu.RUnlock()
settings, ok := p.storageSettings[workspaceID]
if !ok {
return nil, fmt.Errorf("no storage settings found for workspace: %s", workspaceID)
return StorageSettings{}, fmt.Errorf(noStorageForWorkspaceErrorString, workspaceID)
}
return settings, nil
}

func (p *provider) GetFileManager(workspaceID string) (filemanager.FileManager, error) {
settings, err := p.getStorageSettings(workspaceID)
if err != nil {
return nil, err
}
return filemanager.DefaultFileManagerFactory.New(&filemanager.SettingsT{
Provider: settings.Bucket.Type,
Expand All @@ -70,11 +83,10 @@ func (p *provider) GetFileManager(workspaceID string) (filemanager.FileManager,
}

func (p *provider) GetStoragePreferences(workspaceID string) (backendconfig.StoragePreferences, error) {
<-p.init
var prefs backendconfig.StoragePreferences
settings, ok := p.storageSettings[workspaceID]
if !ok {
return prefs, fmt.Errorf("no storage settings found for workspace: %s", workspaceID)
settings, err := p.getStorageSettings(workspaceID)
if err != nil {
return prefs, err
}
return settings.Preferences, nil
}
Expand All @@ -83,9 +95,8 @@ func (p *provider) GetStoragePreferences(workspaceID string) (backendconfig.Stor
func (p *provider) updateLoop(ctx context.Context, backendConfig backendconfig.BackendConfig) {
ch := backendConfig.Subscribe(ctx, backendconfig.TopicBackendConfig)

settings := make(map[string]StorageSettings)

for ev := range ch {
settings := make(map[string]StorageSettings)
configs := ev.Data.(map[string]backendconfig.ConfigT)
for workspaceId, c := range configs {

Expand All @@ -98,6 +109,12 @@ func (p *provider) updateLoop(ctx context.Context, backendConfig backendconfig.B
bucket = overrideWithSettings(defaultBucket.Config, settings, workspaceId)
} else {
bucket = getDefaultBucket(ctx, config.GetString("JOBS_BACKUP_STORAGE_PROVIDER", "S3"))
switch c.Settings.DataRetention.RetentionPeriod {
case "default":
bucket.Config["prefix"] = config.GetString("JOBS_BACKUP_DEFAULT_PREFIX", "7dayretention")
case "full":
default:
}
}
// bucket type and configuration must not be empty
if bucket.Type != "" && len(bucket.Config) > 0 {
Expand All @@ -108,7 +125,9 @@ func (p *provider) updateLoop(ctx context.Context, backendConfig backendconfig.B
Preferences: preferences,
}
}
p.mu.Lock()
p.storageSettings = settings
p.mu.Unlock()
p.onceInit.Do(func() {
close(p.init)
})
Expand Down
41 changes: 39 additions & 2 deletions services/fileuploader/fileuploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fileuploader

import (
"context"
"fmt"
"sync"
"testing"

Expand Down Expand Up @@ -54,6 +55,10 @@ func TestFileUploaderUpdatingWithConfigBackend(t *testing.T) {
ready.Wait()
Expect(preferences).To(BeEquivalentTo(backendconfig.StoragePreferences{}))

t.Setenv("JOBS_BACKUP_STORAGE_PROVIDER", "S3") // default rudder storage provider
t.Setenv("JOBS_BACKUP_DEFAULT_PREFIX", "defaultPrefixWithStorageTTL")
t.Setenv("JOBS_BACKUP_PREFIX", "fullStoragePrefixWithNoTTL")

// When user has not configured any storage
configCh <- pubsub.DataEvent{
Data: map[string]backendconfig.ConfigT{
Expand All @@ -62,11 +67,14 @@ func TestFileUploaderUpdatingWithConfigBackend(t *testing.T) {
Settings: backendconfig.Settings{
DataRetention: backendconfig.DataRetention{
UseSelfStorage: false,
StorageBucket: backendconfig.StorageBucket{},
StorageBucket: backendconfig.StorageBucket{
Type: "S3",
},
StoragePreferences: backendconfig.StoragePreferences{
ProcErrors: true,
GatewayDumps: false,
},
RetentionPeriod: "full",
},
},
},
Expand All @@ -86,10 +94,39 @@ func TestFileUploaderUpdatingWithConfigBackend(t *testing.T) {
},
},
},
"testWorkspaceId-3": {
WorkspaceID: "testWorkspaceId-3",
Settings: backendconfig.Settings{
DataRetention: backendconfig.DataRetention{
UseSelfStorage: false,
StorageBucket: backendconfig.StorageBucket{
Type: "S3",
Config: map[string]interface{}{},
},
StoragePreferences: backendconfig.StoragePreferences{
ProcErrors: false,
GatewayDumps: false,
},
RetentionPeriod: "default",
},
},
},
},
Topic: string(backendconfig.TopicBackendConfig),
}

fm1, err := fileUploaderProvider.GetFileManager("testWorkspaceId-1")
Expect(err).To(BeNil())
Expect(fm1.GetConfiguredPrefix()).To(Equal("fullStoragePrefixWithNoTTL"))

fm3, err := fileUploaderProvider.GetFileManager("testWorkspaceId-3")
Expect(err).To(BeNil())
Expect(fm3.GetConfiguredPrefix()).To(Equal("defaultPrefixWithStorageTTL"))

fm0, err := fileUploaderProvider.GetFileManager("testWorkspaceId-0")
Expect(err).To(Equal(fmt.Errorf(noStorageForWorkspaceErrorString, "testWorkspaceId-0")))
Expect(fm0).To(BeNil())

storageSettings.Wait()
Expect(preferences).To(Equal(
backendconfig.StoragePreferences{
Expand All @@ -99,7 +136,7 @@ func TestFileUploaderUpdatingWithConfigBackend(t *testing.T) {
))

preferences, err = fileUploaderProvider.GetStoragePreferences("testWorkspaceId-0")
Expect(err).To(HaveOccurred())
Expect(err).To(Equal(fmt.Errorf(noStorageForWorkspaceErrorString, "testWorkspaceId-0")))
Expect(preferences).To(BeEquivalentTo(backendconfig.StoragePreferences{}))

preferences, err = fileUploaderProvider.GetStoragePreferences("testWorkspaceId-2")
Expand Down

0 comments on commit 0d9af35

Please sign in to comment.