Skip to content

Commit

Permalink
chore: backup support for new multitenant system (#2549)
Browse files Browse the repository at this point in the history
  • Loading branch information
BonapartePC committed Nov 4, 2022
1 parent 00ba231 commit 4741989
Show file tree
Hide file tree
Showing 24 changed files with 1,237 additions and 206 deletions.
12 changes: 10 additions & 2 deletions app/apphandlers/embeddedAppHandler.go
Expand Up @@ -25,6 +25,7 @@ import (
destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"
transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"
fileuploader "github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/multitenant"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/services/transientsource"
Expand Down Expand Up @@ -77,6 +78,9 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
prebackupHandlers := []prebackup.Handler{
prebackup.DropSourceIds(transientSources.SourceIdsSupplier()),
}

fileUploaderProvider := fileuploader.NewProvider(ctx, backendconfig.DefaultBackendConfig)

rsourcesService, err := NewRsourcesService(deploymentType)
if err != nil {
return err
Expand All @@ -90,6 +94,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&gatewayDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer gwDBForProcessor.Close()
routerDB := jobsdb.NewForReadWrite(
Expand All @@ -98,6 +103,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&routerDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer routerDB.Close()
batchRouterDB := jobsdb.NewForReadWrite(
Expand All @@ -106,6 +112,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&batchRouterDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer batchRouterDB.Close()
errDB := jobsdb.NewForReadWrite(
Expand All @@ -114,6 +121,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&processorDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)

var tenantRouterDB jobsdb.MultiTenantJobsDB
Expand Down Expand Up @@ -150,7 +158,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
return fmt.Errorf("unsupported deployment type: %q", deploymentType)
}

proc := processor.New(ctx, &options.ClearDB, gwDBForProcessor, routerDB, batchRouterDB, errDB, multitenantStats, reportingI, transientSources, rsourcesService)
proc := processor.New(ctx, &options.ClearDB, gwDBForProcessor, routerDB, batchRouterDB, errDB, multitenantStats, reportingI, transientSources, fileUploaderProvider, rsourcesService)
rtFactory := &router.Factory{
Reporting: reportingI,
Multitenant: multitenantStats,
Expand Down Expand Up @@ -223,7 +231,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
var replayDB jobsdb.HandleT
err := replayDB.Setup(
jobsdb.ReadWrite, options.ClearDB, "replay",
true, prebackupHandlers,
true, prebackupHandlers, fileUploaderProvider,
)
if err != nil {
return fmt.Errorf("could not setup replayDB: %w", err)
Expand Down
4 changes: 4 additions & 0 deletions app/apphandlers/gatewayAppHandler.go
Expand Up @@ -16,6 +16,7 @@ import (
ratelimiter "github.com/rudderlabs/rudder-server/rate-limiter"
"github.com/rudderlabs/rudder-server/services/db"
sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"
fileuploader "github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
"github.com/rudderlabs/rudder-server/utils/types/servermode"
Expand Down Expand Up @@ -48,11 +49,14 @@ func (gatewayApp *GatewayApp) StartRudderCore(ctx context.Context, options *app.

sourcedebugger.Setup(backendconfig.DefaultBackendConfig)

fileUploaderProvider := fileuploader.NewProvider(ctx, backendconfig.DefaultBackendConfig)

gatewayDB := jobsdb.NewForWrite(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithStatusHandler(),
jobsdb.WithDSLimit(&gatewayDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer gatewayDB.Close()
if err := gatewayDB.Start(); err != nil {
Expand Down
10 changes: 9 additions & 1 deletion app/apphandlers/processorAppHandler.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/rudderlabs/rudder-server/services/db"
destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"
fileuploader "github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/multitenant"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/services/transientsource"
Expand Down Expand Up @@ -110,6 +111,9 @@ func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app
prebackupHandlers := []prebackup.Handler{
prebackup.DropSourceIds(transientSources.SourceIdsSupplier()),
}

fileUploaderProvider := fileuploader.NewProvider(ctx, backendconfig.DefaultBackendConfig)

rsourcesService, err := NewRsourcesService(deploymentType)
if err != nil {
return err
Expand All @@ -121,6 +125,7 @@ func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&gatewayDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer gwDBForProcessor.Close()
gatewayDB = gwDBForProcessor
Expand All @@ -130,6 +135,7 @@ func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&routerDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer routerDB.Close()
batchRouterDB := jobsdb.NewForReadWrite(
Expand All @@ -138,6 +144,7 @@ func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&batchRouterDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer batchRouterDB.Close()
errDB := jobsdb.NewForReadWrite(
Expand All @@ -146,6 +153,7 @@ func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&processorDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
var tenantRouterDB jobsdb.MultiTenantJobsDB
var multitenantStats multitenant.MultiTenantI
Expand Down Expand Up @@ -181,7 +189,7 @@ func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app
return fmt.Errorf("unsupported deployment type: %q", deploymentType)
}

p := proc.New(ctx, &options.ClearDB, gwDBForProcessor, routerDB, batchRouterDB, errDB, multitenantStats, reportingI, transientSources, rsourcesService)
p := proc.New(ctx, &options.ClearDB, gwDBForProcessor, routerDB, batchRouterDB, errDB, multitenantStats, reportingI, transientSources, fileUploaderProvider, rsourcesService)

rtFactory := &router.Factory{
Reporting: reportingI,
Expand Down
3 changes: 2 additions & 1 deletion app/cluster/integration_test.go
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-server/enterprise/reporting"
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/rsources"
"github.com/rudderlabs/rudder-server/services/transientsource"

Expand Down Expand Up @@ -197,7 +198,7 @@ func TestDynamicClusterManager(t *testing.T) {
"batch_rt": &jobsdb.MultiTenantLegacy{HandleT: brtDB},
})

processor := processor.New(ctx, &clearDb, gwDB, rtDB, brtDB, errDB, mockMTI, &reporting.NOOP{}, transientsource.NewEmptyService(), rsources.NewNoOpService())
processor := processor.New(ctx, &clearDb, gwDB, rtDB, brtDB, errDB, mockMTI, &reporting.NOOP{}, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), rsources.NewNoOpService())
processor.BackendConfig = mockBackendConfig
processor.Transformer = mockTransformer
mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
Expand Down
39 changes: 39 additions & 0 deletions config/backend-config/types.go
Expand Up @@ -88,6 +88,45 @@ type ConfigT struct {
Sources []SourceT `json:"sources"`
Libraries LibrariesT `json:"libraries"`
ConnectionFlags ConnectionFlags `json:"flags"`
Settings Settings `json:"settings"`
}

type Settings struct {
DataRetention DataRetention `json:"dataRetention"`
}

type DataRetention struct {
UseSelfStorage bool `json:"useSelfStorage"`
StorageBucket StorageBucket `json:"storageBucket"`
StoragePreferences StoragePreferences `json:"storagePreferences"`
}

type StorageBucket struct {
Type string `json:"type"`
Config map[string]interface{}
}

type StoragePreferences struct {
ProcErrors bool `json:"procErrors"`
GatewayDumps bool `json:"gatewayDumps"`
ProcErrorDumps bool `json:"procErrorDumps"`
RouterDumps bool `json:"routerDumps"`
BatchRouterDumps bool `json:"batchRouterDumps"`
}

func (sp StoragePreferences) Backup(tableprefix string) bool {
switch tableprefix {
case "gw":
return sp.GatewayDumps
case "rt":
return sp.RouterDumps
case "batch_rt":
return sp.BatchRouterDumps
case "proc_error":
return sp.ProcErrorDumps
default:
return false
}
}

type ConnectionFlags struct {
Expand Down
6 changes: 6 additions & 0 deletions enterprise/reporting/setup_test.go
Expand Up @@ -26,4 +26,10 @@ func TestFeatureSetup(t *testing.T) {
require.Equal(t, instanceA, instanceB)
require.Equal(t, instanceB, instanceC)
require.Equal(t, instanceC, instanceD)

f = &Factory{}
instanceE := f.Setup(&backendconfig.NOOP{})
instanceF := f.GetReportingInstance()
require.Equal(t, instanceE, instanceF)
require.NotEqual(t, instanceE, backendconfig.NOOP{})
}

0 comments on commit 4741989

Please sign in to comment.